# Finally Working: [Dask and Castra](http://blaze.pydata.org/blog/2015/09/08/reddit-comments/)

**This cell shows the fix for the `metadata.json` file:** basically, we use the library `ast` for properly encoding the JSON file, this means a slower encoding but anyway it works fine.

In [32]:
import gzip
import ast

metadata_gz = "C:\\dtu\\ctbd\\amazon_dataset\\metadata.json.gz"
fivecore = 'C:\\dtu\\ctbd\\amazon_dataset\\kcore_5.json.gz'

with gzip.open(metadata_gz, 'rb') as f:
    for i,l in enumerate(f):
        try:
            k = ast.literal_eval(l)
        except ValueError, e: 
            print '\nERR\n',e,'\n' , l
            break
        if i % 1e5 == 0 and i>0:
            print i,
        
        #Just not to process the whole file, we stop the computation after 1.000.000 lines
        if i % 1e6 == 0 and i>0:
            break

print i
print "Done"

100000 200000 300000 400000 500000 600000 700000 800000 900000 1000000 1000000
Done


Now, the final version of the working implementation, step by step:

All the **imports**:

In [1]:
import ujson
import gzip
import ast
import pandas as pd
from pandas import DataFrame
from toolz import dissoc
from toolz import dissoc, partition_all
from castra import Castra
import time
import datetime
import dask.dataframe as dd
import dask.bag as db
from dask.diagnostics import ProgressBar
import numpy as np

Initializing variables like **paths**, **column names** and **chunk size**:

In [2]:
path_to = "C:\\dtu\\ctbd\\amazon_dataset\\"
f = 'Musical_Instruments_5.json'
reviews = "kcore_5.json.gz"
metadata = "metadata.json.gz"
reviews_columns = [('asin',np.dtype(np.string_)),
                   ('reviewerID',np.dtype(np.string_)),
                   ('reviewerName',np.dtype(np.string_)),
                   ('overall',np.dtype(float)),
                   ('summary',np.dtype(np.string_)),
                   ('reviewText',np.dtype(np.string_)),
                   ('reviewTime',np.dtype(np.string_)),
                   ('unixReviewTime',np.dtype(int))]
metadata_columns = ['asin','title','price','imUrl','related','also_bought','also_viewed','bought_together','salesRank','brand','categories']
chunksize = 5000

Implementation of the user defined **functions** we used in the script:

In [3]:
#Convert a line of JSON into a cleaned up dict.
def to_json(line):
    return ujson.loads(line)

#Convert a not proper line of JSON (due to single quotes) into a cleaned up dict.
def fix_json(line):
    return ast.literal_eval(line)

#Convert a list of JSON strings into a DataFrame
def to_df(batch,filename):
    if filename == 'metadata':
        blobs = map(fix_json,batch)
        df = DataFrame.from_records(blobs, columns=metadata_columns)
    else:
        blobs = map(to_json, batch)
        df = DataFrame(blobs)
    print df.dtypes
    return df

#Create the castra dataset for improved I/O operations with Dask DataFrames
#We can work properly on compressed GZ files with gzip library
#The chunk size is 5000, which means that 5000 lines per time will be processed
def create_castra(fullpath,chunksize):
    win_filename = fullpath.split('\\')[-1].split('.')[0] #Used later
    mac_filename = fullpath.split('/')[-1].split('.')[0]
    filename = win_filename
    with open(fullpath,'r') as f:
        batches = partition_all(chunksize, f)
        castra = None
        for batch in batches:
            df = to_df(batch,filename)
            if castra == None:
                castra = Castra(path_to+filename+'.castra', template=df)
            castra.extend(df)

**IMPORTANT!!! DON'T RUN TWICE!** | **Execution of the script**: this may take a while... 

In [8]:
print 'Starting the creation of the Castra files...'

#Creating the castra file for metadata
print 'Processing compressed metadata...'
start = time.time()
create_castra(path_to+metadata,chunksize)
end = time.time()
print "Done! Metadata processed in:",datetime.timedelta(seconds=(end-start))

Starting the creation of the Castra files...
Processing compressed metadata...
Done! Metadata processed in: 1:17:59.844894


In [4]:
#Creating the castra file for the 5_cores
print 'Processing compressed 5_cores...'
start = time.time()
create_castra(path_to+f,chunksize)
end = time.time()
print "Done! Reviews data processed in:",datetime.timedelta(seconds=(end-start))

Processing compressed 5_cores...
asin               object
helpful            object
overall           float64
reviewText         object
reviewTime         object
reviewerID         object
reviewerName       object
summary            object
unixReviewTime      int64
dtype: object
asin               object
helpful            object
overall           float64
reviewText         object
reviewTime         object
reviewerID         object
reviewerName       object
summary            object
unixReviewTime      int64
dtype: object
asin               object
helpful            object
overall           float64
reviewText         object
reviewTime         object
reviewerID         object
reviewerName       object
summary            object
unixReviewTime      int64
dtype: object
Done! Reviews data processed in: 0:00:00.329000


After the creation of the castra files, we can start playing with the Dask DataFrames:

In [5]:
# Start a progress bar for all computations
pbar = ProgressBar(minimum=3.0,dt=0.5)
pbar.register()

# Load data into a dask dataframe:
path_to_castra = 'C:\\dtu\\ctbd\\amazon_dataset\\Musical_Instruments_5.castra'
df = dd.from_castra(path_to_castra)

ValueError: Expected iterable of tuples of (name, dtype), got [u'asin', u'helpful', u'overall', u'reviewText', u'reviewTime', u'reviewerID', u'reviewerName', u'summary', u'unixReviewTime']

In [9]:
result = df.groupby('asin').mean().reset_index().compute()
result['index']

Exception: Data must be 1-dimensional

Traceback
---------
  File "C:\Users\ricky\Anaconda2\lib\site-packages\dask\async.py", line 267, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\dask\async.py", line 249, in _execute_task
    return func(*args2)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\castra\core.py", line 281, in load_partition
    index=self.load_index(name))
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\frame.py", line 224, in __init__
    mgr = self._init_dict(data, index, columns, dtype=dtype)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\frame.py", line 360, in _init_dict
    return _arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\frame.py", line 5236, in _arrays_to_mgr
    arrays = _homogenize(arrays, index, dtype)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\frame.py", line 5546, in _homogenize
    raise_cast_failure=False)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\series.py", line 2920, in _sanitize_array
    raise Exception('Data must be 1-dimensional')


In [23]:
df.count().compute()

Exception: Data must be 1-dimensional

Traceback
---------
  File "C:\Users\ricky\Anaconda2\lib\site-packages\dask\async.py", line 267, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\dask\async.py", line 249, in _execute_task
    return func(*args2)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\castra\core.py", line 281, in load_partition
    index=self.load_index(name))
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\frame.py", line 224, in __init__
    mgr = self._init_dict(data, index, columns, dtype=dtype)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\frame.py", line 360, in _init_dict
    return _arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\frame.py", line 5236, in _arrays_to_mgr
    arrays = _homogenize(arrays, index, dtype)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\frame.py", line 5546, in _homogenize
    raise_cast_failure=False)
  File "C:\Users\ricky\Anaconda2\lib\site-packages\pandas\core\series.py", line 2920, in _sanitize_array
    raise Exception('Data must be 1-dimensional')


In [22]:
df.asin.value_counts().nlargest(10).compute()

B003VWJ2K8    163
B0002E1G5C    143
B0002F7K7Y    116
B003VWKPHC    114
B0002H0A3S     93
B0002CZVXM     74
B0006NDF8A     71
B0009G1E0K     69
B0002E2KPC     68
B0002GLDQM     67
Name: asin, dtype: int64