# Bquery/Bcolz Taxi Set Performance

Based on the great work by Matthew Rocklin, see http://matthewrocklin.com/blog/work/2016/02/22/dask-distributed-part-2

NB: The auto-caching features will make the second (and subsequent) runs faster for multi-column groupings, which is reflected in the scores below.

In [1]:
import os
import urllib
import glob
import pandas as pd
from bquery import ctable
import bquery
import bcolz
from multiprocessing import Pool, cpu_count
from collections import OrderedDict
import contextlib
import time
import load
from pathlib import Path
import dask.dataframe as dd

# do not forget to install numexpr
# os.environ["BLOSC_NOLOCK"] = "1"
bcolz.set_nthreads(1)
workdir = os.path.join(Path.home(), "Documents/taxi")

In [2]:
# create dir
if not os.path.exists(workdir):
    os.makedirs(workdir)
csv_files = load.download_data(workdir, year_list=[2015], month_list=[1, 2])

url: https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-01.csv
output file: /home/ylebon/Documents/taxi/yellow_tripdata_2015-01.csv
url: https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-02.csv
output file: /home/ylebon/Documents/taxi/yellow_tripdata_2015-02.csv


In [3]:
elapsed_times = OrderedDict()

@contextlib.contextmanager
def ctime(message=None):
    "Counts the time spent in some context"
    assert message is not None
    global elapsed_times
    t_elapsed = 0.0
    print('\n')
    t = time.time()
    yield
    if message:
        print (message + ":  ") 
    t_elapsed = time.time() - t
    print (round(t_elapsed, 4), "sec")
    elapsed_times[message] = t_elapsed

In [4]:
# create bcolz
with ctime(message='Generating bcolz from CSV files'):
    create_bcolz=False
    if create_bcolz:
        load.create_bcolz(workdir)
    bcolz_workdir = os.path.join(workdir, 'bcolz')



Generating bcolz from CSV files:  
0.0 sec


In [5]:
# create bcolz chunks
with ctime(message='Generating bcolz chunks from CSV files'):
    create_bcolz_chunks=False
    if create_bcolz_chunks:
        load.create_bcolz_chunks(workdir)
bcolz_chunks_workdir = os.path.join(workdir, 'bcolz_chunks')
ct_list = glob.glob("{0}/*".format(bcolz_chunks_workdir))
ct_list



Generating bcolz chunks from CSV files:  
0.0 sec


['/home/ylebon/Documents/taxi/bcolz_chunks/taxi_0',
 '/home/ylebon/Documents/taxi/bcolz_chunks/taxi_1']

In [6]:
# create parquet
with ctime(message='Generating parquet partitions from CSV files'):
    create_parquet=True
    if create_parquet:
        outputdir_list = load.create_parquet(workdir)




csv search pattern: /home/ylebon/Documents/taxi/yellow_tripdata_*.csv
loading: /home/ylebon/Documents/taxi/yellow_tripdata_2015-01.csv
loading: /home/ylebon/Documents/taxi/yellow_tripdata_2015-02.csv
Generating parquet partitions from CSV files:  
318.3448 sec


In [7]:
def sub_query(input_args):
    rootdir = input_args['rootdir']
    group_cols = input_args['group_cols']
    measure_cols = input_args['measure_cols']
    ct = ctable(rootdir=rootdir, mode='a')
    result = ct.groupby(group_cols, measure_cols)
    result_df = result.todataframe()
    return result_df.to_msgpack()


def execute_query(ct_list, group_cols, measure_cols):
    p = Pool(cpu_count())
    query_list = [{
            'rootdir': rootdir, 
            'group_cols': group_cols,
            'measure_cols': measure_cols} for rootdir in ct_list]
    result_list = p.map(sub_query, query_list)
    p.close()
    result_list = [pd.read_msgpack(x) for x in result_list]
    result_df = pd.concat(result_list, ignore_index=True)
    result_df = result_df.groupby(group_cols)[measure_cols].sum()
    return result_df


In [8]:
ct = ctable(rootdir=bcolz_workdir, mode='a')
measure_list = ['extra',
                'fare_amount',
                'improvement_surcharge',
                'mta_tax',
                'nr_rides',
                'passenger_count',
                'tip_amount',
                'tolls_amount',
                'total_amount',
                'trip_distance']

## BQUERY: Single Process

In [13]:
with ctime(message='CT payment_type nr_rides sum, single process'):
    r = ct.groupby(['payment_type'], ['nr_rides'])
    print("result:")
    print(r)
    
with ctime(message='CT yearmonth nr_rides sum, single process'):
    r = ct.groupby(['pickup_yearmonth'], ['nr_rides'])
    print("result:")
    print(r)

with ctime(message='CT yearmonth + payment_type nr_rides sum, single process'):
    ct.groupby(['pickup_yearmonth', 'payment_type'], ['nr_rides'])
    print("result:")
    print(r)



result:
[(1, 15866142) (2, 9230837) (3, 77953) (4, 24568) (5, 7)]
CT payment_type nr_rides sum, single process:  
2.0781 sec


result:
[(201501, 12748986) (201502, 12450521)]
CT yearmonth nr_rides sum, single process:  
1.9324 sec


result:
[(201501, 12748986) (201502, 12450521)]
CT yearmonth + payment_type nr_rides sum, single process:  
3.6863 sec


## DASK: Single Process

In [15]:
#parquet_files = glob.glob("{0}/parquet".format(workdir), recursive=True)
ddf = dd.concat([dd.read_parquet(x, engine='fastparquet') for x in outputdir_list])

with ctime(message='CT payment_type nr_rides sum, single process'):
    r = ddf.groupby('payment_type')['nr_rides'].sum().compute()
    print("result:")
    print(r)
    
with ctime(message='CT yearmonth nr_rides sum, single process'):
    r = ddf.groupby('pickup_yearmonth')['nr_rides'].sum().compute()
    print("result:")
    print(r)

with ctime(message='CT yearmonth + payment_type nr_rides sum, single process'):
    r = ddf.groupby(['pickup_yearmonth', 'payment_type'])['nr_rides'].sum().compute()
    print("result:")
    print(r)



result:
payment_type
1    15866142
2     9230837
3       77953
4       24568
5           7
Name: nr_rides, dtype: int64
CT payment_type nr_rides sum, single process:  
5.579 sec


result:
pickup_yearmonth
201501    12748986
201502    12450521
Name: nr_rides, dtype: int64
CT yearmonth nr_rides sum, single process:  
5.4784 sec


result:
pickup_yearmonth  payment_type
201501            1               7881388
                  2               4816992
                  3                 38632
                  4                 11972
                  5                     2
201502            1               7984754
                  2               4413845
                  3                 39321
                  4                 12596
                  5                     5
Name: nr_rides, dtype: int64
CT yearmonth + payment_type nr_rides sum, single process:  
5.861 sec


## BQUERY: Multi Process

In [11]:
with ctime(message='CT payment_type nr_rides sum, ' + str(cpu_count()) + ' processors'):
    execute_query(ct_list, ['payment_type'], ['nr_rides'])

with ctime(message='CT yearmonth nr_rides sum, ' + str(cpu_count()) + ' processors'):
    execute_query(ct_list, ['pickup_yearmonth'], ['nr_rides'])

with ctime(message='CT yearmonth + payment_type nr_rides sum, ' + str(cpu_count()) + ' processors'):
    execute_query(ct_list, ['pickup_yearmonth', 'payment_type'], ['nr_rides'])





AttributeError: type object 'DataFrame' has no attribute 'from_items'

## BQUERY: Single Process, All Measures

In [None]:
with ctime(message='CT payment_type all measure sum, single process'):
    ct.groupby(['payment_type'], measure_list)

with ctime(message='CT yearmonth all measure sum, single process'):
    ct.groupby(['pickup_yearmonth'], measure_list)

with ctime(message='CT yearmonth + payment_type all measure sum, single process'):
    ct.groupby(['pickup_yearmonth', 'payment_type'], measure_list)

## BQUERY: Multi Process, All Measures

In [None]:
with ctime(message='CT payment_type all measure sum, ' + str(cpu_count()) + ' processors'):
    execute_query(ct_list, ['payment_type'], measure_list)

with ctime(message='CT yearmonth  all measure sum, ' + str(cpu_count()) + ' processors'):
    execute_query(ct_list, ['pickup_yearmonth'], measure_list)

with ctime(message='CT yearmonth + payment_type  all measure sum, ' + str(cpu_count()) + ' processors'):
    execute_query(ct_list, ['pickup_yearmonth', 'payment_type'], measure_list)