# CSV to Parquet Conversion

Parquet files are typically much smaller than the equivalent CSV, which reduces storage space and speeds up loading the data into memory. This conversion matters most for out-of-core (larger-than-memory) computation on large datasets, where I/O dominates. Because Parquet is a column-based format, operations that touch only some columns can read just those columns, which makes column-based operations particularly efficient.

## Initialization

In [1]:
import dask.dataframe as dd
import numpy as np
import dask.delayed
import pyarrow

In [2]:
from pathlib import PurePath

# Source CSV location and parsing options.
input_directory = "../data/"
filename = '2018_Yellow_Taxi_Trip_Data'
extension = '.csv'
csv_sep = ','
input_file = PurePath(input_directory) / f'{filename}{extension}'

# The Parquet dataset is written as a directory named after the dataset,
# inside the same input directory as the CSV.
output_filename_base = filename
output_directory = PurePath(input_directory) / output_filename_base

## Start local Dask client

In [3]:
from dask.distributed import Client, LocalCluster

# Reuse the client if this cell has already run in the current kernel;
# otherwise start a new local cluster. Only a missing ``client`` name means
# "no client yet" -- any other error (e.g. a failing restart) is surfaced
# instead of being hidden by a bare ``except`` that would silently start a
# second cluster.
try:
    client
except NameError:
    client = None

if client is not None:
    print('Restarting client')
    client.restart()
else:
    # To cap per-worker memory, pass e.g. memory_limit='4G' to LocalCluster.
    cluster = LocalCluster(dashboard_address=':20100')
    print('Setting new client')
    client = Client(cluster)
    print(client)
client

Setting new client
<Client: 'tcp://127.0.0.1:44203' processes=5 threads=10, memory=16.39 GB>


0,1
Client  Scheduler: tcp://127.0.0.1:44203  Dashboard: http://127.0.0.1:20100/status,Cluster  Workers: 5  Cores: 10  Memory: 16.39 GB


## Get all available columns

In [4]:
# Read the raw CSV header and list every available column with its position.
ddf = dd.read_csv(input_file, sep=csv_sep)
columns = ddf.columns.values
for position, name in enumerate(columns):
    print(f'{position}: {name}')

0: VendorID
1: tpep_pickup_datetime
2: tpep_dropoff_datetime
3: passenger_count
4: trip_distance
5: RatecodeID
6: store_and_fwd_flag
7: PULocationID
8: DOLocationID
9: payment_type
10: fare_amount
11: extra
12: mta_tax
13: tip_amount
14: tolls_amount
15: improvement_surcharge
16: total_amount


## Data Interface

In [5]:
# Columns holding codes/labels: loaded as pandas categoricals.
categorical_features = [
    'VendorID',
    'RatecodeID',
    'PULocationID',
    'DOLocationID',
    'payment_type',
    'store_and_fwd_flag',
]
# Timestamp columns: converted via ``parse_dates`` rather than a dtype entry.
datetime_features = [
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
]
# Continuous measurements: loaded as float64.
numerical_features = [
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
]

# Explicit dtype mapping handed to read_csv so column types are declared up
# front instead of inferred. Datetime columns are intentionally not listed.
dtypes = dict.fromkeys(categorical_features, 'category')
dtypes.update(dict.fromkeys(numerical_features, 'float64'))

In [6]:
# Every column the analysis needs, grouped by feature type.
columns_to_load = [*categorical_features, *datetime_features, *numerical_features]

In [7]:
# Reload the CSV with an explicit schema: only the needed columns, the
# declared dtypes, and parsed timestamp columns. A ``blocksize`` option
# (e.g. 32e6) could be added here to control partition size.
read_options = dict(
    usecols=columns_to_load,
    dtype=dtypes,
    sep=csv_sep,
    parse_dates=datetime_features,
)
ddf = dd.read_csv(input_file, **read_options)

## Write the typed DataFrame to Parquet

In [8]:
def create_parquet_file(ddf, output_filepath):
    """Write the Dask DataFrame ``ddf`` as a Parquet dataset at ``output_filepath``.

    Uses Dask's default Parquet engine and options; returns nothing.
    """
    ddf.to_parquet(output_filepath)

In [None]:
# Time the full CSV -> Parquet conversion: runs create_parquet_file on the
# typed DataFrame and writes the result to output_directory.
%time create_parquet_file(ddf, output_directory)

distributed.utils - ERROR - 'start'
Traceback (most recent call last):
  File "/home/justin/.conda/envs/ml_env/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/home/justin/.conda/envs/ml_env/lib/python3.7/site-packages/distributed/dashboard/components/shared.py", line 312, in update
    ts = metadata["keys"][self.key]
KeyError: 'start'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7fa388914690>>, <Future finished exception=KeyError('start')>)
Traceback (most recent call last):
  File "/home/justin/.conda/envs/ml_env/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/home/justin/.conda/envs/ml_env/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/home/justin/.conda/envs/ml_env/lib/python3.7/site-packages/tornado/ge

## Test Dask read performance for both file types

In [21]:
import fastparquet  # presumably imported to confirm the fastparquet engine is installed
import dask.dataframe as dd

# Sanity check that the Parquet dataset written above can be read back.
# NOTE: dask.dataframe must not be imported under the alias ``ddf`` here --
# that would overwrite the ``ddf`` DataFrame defined earlier in the notebook.
test_ddf = dd.read_parquet(output_directory)

## Parquet vs CSV reading speed test

In [10]:
# Load the same columns from both formats for the speed comparison.
# CSV needs the explicit dtypes. Parquet already stores its schema, and
# ``read_parquet`` selects columns via ``columns=``. It has no
# ``usecols``/``dtype`` parameters; those were previously forwarded to the
# engine as unrecognised kwargs.
csv_ddf = dd.read_csv(input_file, usecols=columns_to_load, dtype=dtypes, sep=csv_sep)
pq_ddf = dd.read_parquet(output_directory, columns=columns_to_load)

In [11]:
def average_result(ddf):
    """Return the mean ``trip_distance`` per ``VendorID``.

    Parameters
    ----------
    ddf : dask or pandas DataFrame
        Must contain ``VendorID`` and ``trip_distance`` columns. It is not
        modified.

    Returns
    -------
    Series (lazy when ``ddf`` is a Dask DataFrame) indexed by ``VendorID``,
    named ``trip_distance``, holding the float64 mean trip distance.
    """
    # Average in float64 to avoid overflow / inf results when the column is
    # stored with a narrower float type. ``assign`` returns a new frame, so
    # the caller's DataFrame keeps its original column and dtype.
    distances = ddf.assign(trip_distance=ddf['trip_distance'].astype(np.float64))
    return distances.groupby('VendorID')['trip_distance'].mean()

### CSV 

In [12]:
# Baseline: run the per-vendor mean on the CSV-backed DataFrame.
# CSV file size is ~10 GB. %time covers only the compute line.
%time result = average_result(csv_ddf).compute()    
print(result)

CPU times: user 5.59 s, sys: 565 ms, total: 6.16 s
Wall time: 1min 26s
VendorID
1    2.791033
2    3.031729
4    2.703563
Name: trip_distance, dtype: float64


### Parquet

In [13]:
# Same per-vendor mean on the Parquet-backed DataFrame.
# Parquet directory size is ~2.9 GB. %time covers only the compute line.
%time result = average_result(pq_ddf).compute()    
print(result)

KeyboardInterrupt: 

VendorID
1    2.791033
2    3.031729
4    2.703563
Name: trip_distance, dtype: float64


In [14]:
# Parquet
import dask.dataframe as dd

# Load two Parquet copies of the dataset. The paths are derived from the
# configured input_directory / filename so they stay in sync with the config
# cell. One is the default output directory; the other is a variant whose
# directory name carries a "_float64" suffix.
ddf_reduced_float = dd.read_parquet(output_directory)
ddf_full_float = dd.read_parquet(PurePath(input_directory, filename + '_float64'))

In [15]:
# Column dtypes of the default Parquet output.
ddf_reduced_float.dtypes

VendorID                       category
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                     category
store_and_fwd_flag             category
PULocationID                   category
DOLocationID                   category
payment_type                   category
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
dtype: object

In [16]:
# Column dtypes of the "_float64" Parquet variant, for comparison.
ddf_full_float.dtypes

VendorID                       category
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                     category
store_and_fwd_flag               object
PULocationID                   category
DOLocationID                   category
payment_type                   category
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
dtype: object



Interestingly, the reduced float size seems to increase the file size (by more than 2x), so the reduction is possibly not worth doing. Let's check the memory footprint.

In [None]:
# get_partition(0) alone is still a lazy Dask DataFrame. Compute it into a
# pandas DataFrame so memory_usage='deep' reports the real in-memory size of
# one partition, including object/category contents.
df_reduced_float = ddf_reduced_float.get_partition(0).compute()
df_reduced_float.info(memory_usage='deep')

In [None]:
# Same measurement for the "_float64" variant. Materialize the first
# partition as pandas so memory_usage='deep' measures actual memory use.
df_full_float = ddf_full_float.get_partition(0).compute()
df_full_float.info(memory_usage='deep')

In [None]:
# Run the same per-vendor mean benchmark on the "_float64" Parquet variant.
%time result = average_result(ddf_full_float).compute()    
print(result)