## Aim
This notebook aims to get a first idea of basic Dask DataFrame performance for:
- reading in Parquet files
- counting the number of rows
- setting an index

This was run on a Kubernetes cluster with 5 nodes (3 master nodes on t2.micro instances and 2 worker nodes on t2.large instances), plus 1 bastion host.
Larger master nodes and more worker nodes might give better results.
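The timings in this notebook come from the `%%time` cell magic. To collect timings outside Jupyter, or to keep them together for comparing runs, a small context manager around `time.perf_counter` records the same wall-clock time. This is a minimal sketch: `timed` and `timings` are hypothetical names, and the summed range is only a stand-in for a real step such as `len(df)`.

```python
import time
from contextlib import contextmanager

# Wall-clock timings in seconds, keyed by a label chosen by the caller
timings = {}

@contextmanager
def timed(label):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start

# Stand-in workload; in the notebook this would be e.g. len(df) or df.persist()
with timed("count_rows"):
    n = sum(1 for _ in range(1_000_000))

print(f"count_rows: {timings['count_rows']:.3f} s")
```

Because Dask builds its task graph lazily, wrap the calls that trigger execution (`len(df)`, `.compute()`, `.persist()`); wrapping only the construction of a lazy expression mostly measures graph building, not the work.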

## Conclusion
In these tests, Dask was slower than Spark at operations such as reading in files and counting rows.
I recommend using Dask on smaller dataframes, after Spark has done the heavy aggregation work (computing means, etc.).


In [1]:
import numpy as np
import pandas as pd
import dask
import boto3
import os
import io
import dask.dataframe as dd
import s3fs
import pyarrow



Set up AWS credentials as environment variables:

In [2]:
# insert your access key credentials
os.environ["AWS_ACCESS_KEY_ID"] = "xxxxxx"
os.environ["AWS_SECRET_ACCESS_KEY"] = "xxxxxx"

In [1]:
from dask.distributed import Client, progress
c = Client()
c

Client
  Scheduler: tcp://lolling-bronco-dask-scheduler:8786
  Dashboard: http://lolling-bronco-dask-scheduler:8787/status
Cluster
  Workers: 6
  Cores: 12
  Memory: 50.25 GB
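The Client summary reports totals for the whole cluster. Dividing them by the worker count gives a rough per-worker budget, which is what each partition task, and any data persisted on that worker, has to fit into. A small arithmetic sketch, assuming the totals are split evenly across workers (the summary alone does not show this); `per_worker` is a hypothetical helper.

```python
def per_worker(workers, cores, memory_gb):
    """Split the cluster totals from the Client summary evenly across workers."""
    return cores / workers, memory_gb / workers

# Totals reported by the Client above
cores_each, mem_each = per_worker(workers=6, cores=12, memory_gb=50.25)
print(f"{cores_each:.0f} cores and {mem_each:.1f} GB per worker")  # 2 cores and 8.4 GB per worker
```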


In [3]:
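%%time
# test reading a single gzipped text file from S3 into a pandas DataFrame
# note: the file is pipe-delimited with no header row, so sep='|', header=None would be needed to split the fields
s3 = boto3.client('s3')
bucket = 'datathon-2018'
key = 'raw/MelbDatathon2018/Samp_0/ScanOnTransaction/2015/Week27/QID3530815_20180713_20515_0.txt.gz'
obj = s3.get_object(Bucket=bucket, Key=key)
df_pd = pd.read_csv(io.BytesIO(obj['Body'].read()), compression='gzip')

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 6.2 µs
(This timing was recorded with a timing magic that had nothing after it on its own line, so it measured an empty statement rather than the download and parse.)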
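The `head()` output of this file shows each record landing in a single column, because the raw files are pipe-delimited with no header row while `pd.read_csv` defaults to commas and a header. The stdlib sketch below parses the same format from gzipped bytes; the payload is synthetic, built from the first record shown in that output, and `parse_pipe_gzip` is a hypothetical helper.

```python
import csv
import gzip
import io

def parse_pipe_gzip(raw_bytes):
    """Decompress gzipped bytes and split each line on '|' (no header row)."""
    text = gzip.decompress(raw_bytes).decode("utf-8")
    return list(csv.reader(io.StringIO(text), delimiter="|"))

# Synthetic payload: the first record from the head() output of this file
sample = b"1|2015-07-02|2015-07-02 13:08:13|1524480|9|1222|415|10883|15084\n"
rows = parse_pipe_gzip(gzip.compress(sample))
print(len(rows[0]), rows[0][2])  # 9 2015-07-02 13:08:13
```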


Let's list the Parquet files in the bucket:

In [33]:
# build the list of Parquet files to read from
s3 = boto3.resource('s3')
bucket_name = 'datathon-2018'
bucket = s3.Bucket(bucket_name)

# alternative: list the raw gzipped text files instead
# file_list = [obj.key for obj in bucket.objects.filter(Prefix='raw/MelbDatathon2018/Samp_0/')
#              if obj.key.endswith(".txt.gz")]

# objects.filter pages through the bucket listing automatically
file_list = [obj.key for obj in bucket.objects.filter(Prefix='parquet/taps/')
             if obj.key.endswith(".parquet")]
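`file_list` is built here, but the cells below pass glob patterns to `dd.read_parquet` instead. `dd.read_parquet` also accepts a list of paths, so an explicit selection (for example a subset of weeks) can be turned into full `s3://` URLs first. A minimal sketch: `parquet_urls` is a hypothetical helper and the keys are made up for illustration. With the real list, something like `dd.read_parquet(parquet_urls(bucket_name, file_list))` should read just those files, assuming each key is a Parquet dataset Dask can open.

```python
def parquet_urls(bucket, keys, suffix=".parquet"):
    """Turn S3 object keys into s3:// URLs, keeping only keys that end with `suffix`."""
    return [f"s3://{bucket}/{key}" for key in keys if key.endswith(suffix)]

# Hypothetical keys standing in for file_list
keys = [
    "parquet/taps/2015_week27.parquet",
    "parquet/taps/2015_week28.parquet",
    "parquet/taps/notes.txt",
]
urls = parquet_urls("datathon-2018", keys)
print(urls)
# ['s3://datathon-2018/parquet/taps/2015_week27.parquet', 's3://datathon-2018/parquet/taps/2015_week28.parquet']
```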

In [None]:
#for file in file_list:
#    print(file)

In [7]:
%%time
df_pd.head()

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 295 µs


Unnamed: 0,1|2015-07-02|2015-07-02 13:08:13|1524480|9|1222|415|10883|15084
0,3|2015-07-02|2015-07-02 07:33:58|1756270|2|109...
1,1|2015-07-02|2015-07-02 16:42:38|10560630|2|14...
2,1|2015-07-01|2015-07-01 11:07:08|11812440|2|28...
3,1|2015-07-01|2015-07-01 17:28:14|12272500|2|28...
4,1|2015-07-03|2015-07-03 18:08:59|10806820|2|22...


In [6]:
%%time
len(df_pd)  # 368,266

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 19.6 µs


368266

In [9]:
%%time
# test reading a single Parquet file into a Dask DataFrame
df = dd.read_parquet('s3://datathon-2018/parquet/taps/2015_week27.parquet')  # ~5 s for one file


CPU times: user 76 ms, sys: 12 ms, total: 88 ms
Wall time: 4.99 s


In [14]:
df.head()

Unnamed: 0,index,mode,business_date,datetime,card_id,card_type,vehicle_id,parent_route,route_id,stop_id,tap
0,0,1,2015-07-02,2015-07-02 13:08:13,1524480,9,1222,415,10883,15084,on
1,1,3,2015-07-02,2015-07-02 07:33:58,1756270,2,1091,24,15296,18566,on
2,2,1,2015-07-02,2015-07-02 16:42:38,10560630,2,1469,862,10227,19824,on
3,3,1,2015-07-01,2015-07-01 11:07:08,11812440,2,2886,458,8591,21184,on
4,4,1,2015-07-01,2015-07-01 17:28:14,12272500,2,2853,670,16447,21296,on


In [10]:
%%time
len(df) #1,287,268   # 30 seconds to get the length

CPU times: user 872 ms, sys: 408 ms, total: 1.28 s
Wall time: 31.6 s


1287268

In [16]:
%%time
df['index'].count().compute()  # ~16 s, faster than len(df) above


CPU times: user 820 ms, sys: 524 ms, total: 1.34 s
Wall time: 16.6 s


1287268
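Both cells return 1,287,268, but they compute different things: `len(df)` counts rows, while `df['index'].count()` counts the non-missing values in one column. Both are evaluated partition by partition and the per-partition results are summed, so they agree here only because `index` has no missing values. The toy partitions below are hypothetical and only illustrate that difference.

```python
# Hypothetical partitions of one column; None marks a missing value
partitions = [
    [0, 1, 2],
    [3, None, 5],
    [6, 7],
]

# Like len(df): sum of the row counts of every partition
n_rows = sum(len(part) for part in partitions)

# Like df[col].count(): number of non-missing values in that column
n_non_null = sum(sum(value is not None for value in part) for part in partitions)

print(n_rows, n_non_null)  # 8 7
```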

In [20]:
%%time
# test reading the Parquet files matching week3* (around 10 files) into a Dask DataFrame
df = dd.read_parquet('s3://datathon-2018/parquet/taps/2015_week3*.parquet')  # ~15 s
#len(df) # 5,666,495


CPU times: user 240 ms, sys: 4 ms, total: 244 ms
Wall time: 14.9 s
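The pattern `2015_week3*` matches any name starting with `2015_week3`, so besides weeks 30 to 39 it would also pick up week 3 if that file exists; the file count depends on which weeks are present. The check below uses `fnmatch` on a hypothetical full year of weekly names. Note that `fnmatch`'s `*` also matches `/`, unlike S3 globs in s3fs, which makes no difference for these flat names.

```python
from fnmatch import fnmatch

# Hypothetical weekly file names for 2015, week1 to week52
names = [f"2015_week{week}.parquet" for week in range(1, 53)]

matched = [name for name in names if fnmatch(name, "2015_week3*.parquet")]
print(len(matched))  # 11: week3 plus week30 to week39
```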


In [46]:
#%%time
#df['index'].count().compute()  # 2min 29s 

In [23]:
%%time
df = df.set_index('datetime')  # sorts the data by datetime, which takes a while: 3 min 33 s

CPU times: user 1min 3s, sys: 10.5 s, total: 1min 14s
Wall time: 3min 33s
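`set_index` has to sort the whole dataset by `datetime`. Roughly, Dask first estimates partition boundaries ("divisions") from approximate quantiles of the column, which needs a pass over the data, and then shuffles every row into the partition whose range contains it. The pure-Python sketch below is a simplified illustration of those two steps on the `datetime` values from the `head()` output, not Dask's implementation; `compute_divisions` and `partition_for` are hypothetical names.

```python
import bisect

def compute_divisions(sample, npartitions):
    """Pick npartitions + 1 boundaries at evenly spaced ranks of the sorted sample."""
    ordered = sorted(sample)
    last = len(ordered) - 1
    return [ordered[round(i * last / npartitions)] for i in range(npartitions + 1)]

def partition_for(value, divisions):
    """Partition i with divisions[i] <= value < divisions[i + 1]; out-of-range values are clamped."""
    i = bisect.bisect_right(divisions, value) - 1
    # clamp so the last boundary is inclusive and values below the first go to partition 0
    return min(max(i, 0), len(divisions) - 2)

# datetime values from head(); ISO timestamps sort correctly as strings
sample = [
    "2015-07-02 13:08:13",
    "2015-07-02 07:33:58",
    "2015-07-02 16:42:38",
    "2015-07-01 11:07:08",
    "2015-07-01 17:28:14",
]
divisions = compute_divisions(sample, npartitions=2)
print(divisions)
print([partition_for(v, divisions) for v in sample])  # [1, 1, 1, 0, 0]
```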


In [24]:
%%time
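# might be faster now that the frame is indexed? No: 4 min 49 s.
# The indexed frame was not persisted, so this re-runs the S3 read and the shuffle from scratch.
df['index'].count().compute()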

CPU times: user 1min 39s, sys: 12.6 s, total: 1min 51s
Wall time: 4min 49s


24522328

In [37]:
%%time
# test reading all files into a Dask DataFrame

df = dd.read_parquet('s3://datathon-2018/parquet/taps/*.parquet')  # ~4 min


CPU times: user 2.94 s, sys: 196 ms, total: 3.14 s
Wall time: 4min 5s


In [38]:
%%time
df.head() #8s

CPU times: user 788 ms, sys: 452 ms, total: 1.24 s
Wall time: 16.7 s


Unnamed: 0,index,mode,business_date,datetime,card_id,card_type,vehicle_id,parent_route,route_id,stop_id,tap
0,0,1,2015-07-02,2015-07-02 13:08:13,1524480,9,1222,415,10883,15084,on
1,1,3,2015-07-02,2015-07-02 07:33:58,1756270,2,1091,24,15296,18566,on
2,2,1,2015-07-02,2015-07-02 16:42:38,10560630,2,1469,862,10227,19824,on
3,3,1,2015-07-01,2015-07-01 11:07:08,11812440,2,2886,458,8591,21184,on
4,4,1,2015-07-01,2015-07-01 17:28:14,12272500,2,2853,670,16447,21296,on


In [43]:
#%%timeit
#len(df)  # timed out

In [45]:
#%%time
#card_type_sizes = df.groupby('card_type').size().compute()  # new name, so df is not overwritten
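`groupby(...).size()` counts the rows in each group, so its result has one row per card type and is small enough to bring back with `.compute()`. The same aggregation on the five rows printed by `head()` above can be sketched with `collections.Counter`; these values are only those five rows, not the full dataset.

```python
from collections import Counter

# card_type values from the five rows shown by df.head() above
card_types = [9, 2, 2, 2, 2]

# Counter plays the role of groupby('card_type').size()
sizes = Counter(card_types)
print(dict(sorted(sizes.items())))  # {2: 4, 9: 1}
```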

In [None]:
# persist the dataframe in the workers' memory so later computations reuse it
df = df.persist()
progress(df)
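`persist()` runs the graph once and keeps the resulting partitions in the workers' memory, so later operations start from those partitions instead of re-reading S3. Without it, every `.compute()` re-executes the full graph, which is consistent with the count after `set_index` taking longer than the indexing itself. The persisted data has to fit in cluster memory (the Client reported 50.25 GB in total). The toy analogy below, using a hypothetical `Source` class that counts its reads, is not Dask code; it only contrasts recomputing an input with reusing it.

```python
class Source:
    """Hypothetical stand-in for an expensive input such as Parquet files on S3."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.reads = 0

    def read(self):
        self.reads += 1
        return list(self.rows)

# Without persist: each computation rebuilds its input from the source
lazy = Source(range(10))
lazy_count = len(lazy.read())
lazy_sum = sum(lazy.read())
print(lazy.reads)  # 2

# With persist: read once, keep the result in memory, reuse it
eager = Source(range(10))
persisted = eager.read()
eager_count = len(persisted)
eager_sum = sum(persisted)
print(eager.reads)  # 1
```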