Write NYCTaxi to Parquet
=================

In [1]:
from dask.distributed import Client, progress

# Connect to the scheduler at the given address; the bare name on the last
# line makes the notebook display the client summary.
scheduler_address = '127.0.0.1:8786'
c = Client(scheduler_address)
c

<Client: scheduler="127.0.0.1:8786" processes=56 cores=56>

In [2]:
import dask.dataframe as dd

## Read CSV

In [3]:
import dask.dataframe as dd

# Explicit per-column dtypes passed to read_csv.
# Each column must appear exactly once: a repeated key in a dict literal
# silently keeps only the last value, so 'payment_type' is listed a single
# time with the dtype that is actually applied ('category').
dtype = {
    'VendorID': 'category',
    'passenger_count': 'uint8',
    'RateCodeID': 'uint8',
    'payment_type': 'category',
    'store_and_fwd_flag': 'category',
    'trip_distance': 'float32',
    'fare_amount': 'float32',
    'extra': 'float32',
    'mta_tax': 'float32',
    'tip_amount': 'float32',
    'tolls_amount': 'float32',
    'improvement_surcharge': 'float32',
    'total_amount': 'float32',
}

# Lazily read every 2015 CSV from the bucket with anonymous access, parsing
# the pickup/dropoff columns as datetimes.
# A larger block size was tried at some point: blocksize=400*2**20
df = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv', dtype=dtype,
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 storage_options={'anon': True})


In [4]:
# Ask the cluster (via the client) to start materialising the CSV-backed
# frame and rebind `df` to the persisted collection.
df = c.persist(df)

## Index by datetime column

In [5]:
# Re-index the frame by pickup timestamp and persist the indexed result on
# the cluster in one step.
df = c.persist(df.set_index('tpep_pickup_datetime'))

In [None]:
# lags before starting, perhaps to fetch the first block.
%time df.passenger_count.sum()

In [None]:
# Display the column dtypes to check them against the `dtype` mapping given to read_csv.
df.dtypes

In [None]:
# Preview the first rows of the frame indexed by pickup time.
df.head()

## Write to Parquet

In [None]:
from dask.dataframe.io.parquet import to_parquet

In [None]:
import s3fs
# S3 filesystem handle; later cells reuse `fs` for size checks and raw reads.
fs = s3fs.S3FileSystem()

# Optional cleanup, left disabled on purpose: uncomment to delete a previously
# written Parquet dataset (recursively) before writing a new one.
#if fs.exists('dask-data/nyc-taxi/2015/parquet'):
#    fs.rm('dask-data/nyc-taxi/2015/parquet', recursive=True)

In [None]:
# Write the indexed frame to S3 as a Parquet dataset.
# NOTE(review): has_nulls=False presumably asserts that no column contains
# missing values — confirm this holds for the taxi data.
to_parquet(
    's3://dask-data/nyc-taxi/2015/parquet',
    df,
    has_nulls=False,
    object_encoding='utf8',
)

In [None]:
# Drop the notebook's reference to the persisted frame.
# NOTE(review): presumably so the cluster can release that memory before the
# read-back benchmarks below — confirm.
del df

## Read from Parquet

In [None]:
import s3fs

fs = s3fs.S3FileSystem()


def _size_in_gib(path):
    """Return the total `du` size of everything under ``path``, divided by 2**30."""
    return fs.du(path, deep=True, total=True) / 2**30


# NOTE(review): the '.gz' dataset is not written by any cell visible in this
# notebook — presumably produced by a separate compressed run; confirm it exists.
plain = _size_in_gib('dask-data/nyc-taxi/2015/parquet')
gzip = _size_in_gib('dask-data/nyc-taxi/2015/parquet.gz')

# Sizes of both datasets and the plain/compressed ratio.
plain, gzip, plain / gzip

In [None]:
import fastparquet

# Open the uncompressed dataset directly with fastparquet, reading through the
# S3 filesystem handle, then show the dtypes recorded in the Parquet metadata.
pf = fastparquet.ParquetFile(
    'dask-data/nyc-taxi/2015/parquet',
    open_with=fs.open,
)
pf.dtypes

In [None]:
# Display the summary information fastparquet reports for the dataset.
pf.info

In [None]:
# raw download speed
%%time
with fs.open('dask-data/nyc-taxi/2015/parquet/part.1.parquet', 'rb') as f:
    print(len(f.read()) / 2**20)

In [None]:
from dask.dataframe.io.parquet import read_parquet

# Lazily open the gzip-compressed dataset, restoring the datetime index and
# the categorical columns.
# NOTE(review): the '.gz' dataset is not written by any cell visible in this
# notebook — confirm it exists before running.
df3 = read_parquet(
    's3://dask-data/nyc-taxi/2015/parquet.gz',
    index='tpep_pickup_datetime',
    categories=['VendorID', 'payment_type', 'store_and_fwd_flag'],
)

In [None]:
%%time
# Time fetching the first rows of the Parquet-backed frame.
df3.head()

In [None]:
%%time
# Time counting the rows of the whole frame.
df3
# NOTE(review): placeholder removed below

In [None]:
%%time
# Time counting rows through a single column.
# NOTE(review): presumably compared with len(df3) to see the benefit of
# reading only one column — confirm.
len(df3.passenger_count)

In [None]:
%%time
# Time a full reduction over one column read from Parquet.
df3.passenger_count.sum().compute()

In [None]:
%%prun -D out.stats
# Profile the same reduction and dump the profiler statistics to out.stats.
# The computation is run with dask's `get_sync` getter — presumably the
# single-threaded scheduler, so the work happens in this process where the
# profiler can see it (confirm).
# NOTE(review): `async` is a reserved word from Python 3.7 onwards, so
# `dask.async` will not parse there, and newer dask releases expose the
# synchronous scheduler and option setting under different names — check
# against the installed Python/dask versions before running.
import dask
with dask.set_options(get=dask.async.get_sync):
    df3.passenger_count.sum().compute()

In [None]:
# Restart the cluster through the client.
# NOTE(review): presumably to clear worker state/memory after the benchmarks — confirm.
c.restart()