<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

DataFrames on a Cluster
=======================

<img src="http://pandas.pydata.org/_static/pandas_logo.png"
     align="left"
     width="30%"
     alt="Pandas logo">



This notebook needs the [gcsfs library](https://gcsfs.readthedocs.io).

    pip install gcsfs

## Read single dataframe from the cloud with Pandas

In [1]:
from gcsfs import GCSFileSystem
gcs = GCSFileSystem(token='anon')

gcs.ls('anaconda-public-data/nyc-taxi/csv/2015/')

['anaconda-public-data/nyc-taxi/csv/2015/green_tripdata_2015-01.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/green_tripdata_2015-02.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/green_tripdata_2015-03.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/green_tripdata_2015-04.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/green_tripdata_2015-05.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/green_tripdata_2015-06.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-01.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-02.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-03.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-04.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-05.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-06.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-07.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-08.csv',
 'anaconda-p

In [2]:
import pandas as pd

with gcs.open('anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-01.csv') as f:
    df = pd.read_csv(f, nrows=5, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896,40.750111,1,N,-73.974785,40.750618,1,12.0,1.0,0.5,3.25,0,0.3,17.05
1,1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.3,-74.001648,40.724243,1,N,-73.994415,40.759109,1,14.5,0.5,0.5,2.0,0,0.3,17.8
2,1,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.8,-73.963341,40.802788,1,N,-73.95182,40.824413,2,9.5,0.5,0.5,0.0,0,0.3,10.8
3,1,2015-01-10 20:33:39,2015-01-10 20:35:31,1,0.5,-74.009087,40.713818,1,N,-74.004326,40.719986,2,3.5,0.5,0.5,0.0,0,0.3,4.8
4,1,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.0,-73.971176,40.762428,1,N,-74.004181,40.742653,2,15.0,0.5,0.5,0.0,0,0.3,16.3


## Parallelize Pandas with Dask.dataframe


In [3]:
import dask
from dask.distributed import Client, progress
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:34461  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 2.09 GB


In [4]:
import dask.dataframe as dd

df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv',
                 storage_options={'token': 'anon'}, 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

In [5]:
df = df.persist()
progress(df)

VBox()


Dask DataFrames
---------------

*  Coordinate many Pandas DataFrames across a cluster
*  Faithfully implement a subset of the Pandas API
*  Use Pandas under the hood (for speed and maturity)

In [6]:
df

Unnamed: 0_level_0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
npartitions=365,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
,int64,datetime64[ns],datetime64[ns],int64,float64,float64,float64,int64,object,float64,float64,int64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [17]:
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
pickup_longitude                float64
pickup_latitude                 float64
RateCodeID                        int64
store_and_fwd_flag               object
dropoff_longitude               float64
dropoff_latitude                float64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                      int64
improvement_surcharge           float64
total_amount                    float64
dtype: object

In [15]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896,40.750111,1,N,-73.974785,40.750618,1,12.0,1.0,0.5,3.25,0,0.3,17.05
1,1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.3,-74.001648,40.724243,1,N,-73.994415,40.759109,1,14.5,0.5,0.5,2.0,0,0.3,17.8
2,1,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.8,-73.963341,40.802788,1,N,-73.95182,40.824413,2,9.5,0.5,0.5,0.0,0,0.3,10.8
3,1,2015-01-10 20:33:39,2015-01-10 20:35:31,1,0.5,-74.009087,40.713818,1,N,-74.004326,40.719986,2,3.5,0.5,0.5,0.0,0,0.3,4.8
4,1,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.0,-73.971176,40.762428,1,N,-74.004181,40.742653,2,15.0,0.5,0.5,0.0,0,0.3,16.3


In [9]:
%time len(df)

CPU times: user 24.6 s, sys: 6.06 s, total: 30.7 s
Wall time: 4min 1s


146112989

In [10]:
%time df.passenger_count.sum().compute()

CPU times: user 22.6 s, sys: 4.75 s, total: 27.3 s
Wall time: 3min 41s


245566747

In [5]:
# Compute average trip distance grouped by passenger count
df.groupby(df.passenger_count).trip_distance.mean().compute()

passenger_count
0     2.279183
1    15.541413
2    11.815871
3     1.620052
4     7.481066
5     3.066019
6     2.977158
9     5.459763
7     3.303054
8     3.866298
Name: trip_distance, dtype: float64

### Tip Fraction, grouped by day-of-week and hour-of-day

In [6]:
df2 = df[(df.tip_amount > 0) & (df.fare_amount > 0)]
df2['tip_fraction'] = df2.tip_amount / df2.fare_amount

In [7]:
# Group df.tpep_pickup_datetime by dayofweek and hour
dayofweek = df2.groupby(df2.tpep_pickup_datetime.dt.dayofweek).tip_fraction.mean() 
hour = df2.groupby(df2.tpep_pickup_datetime.dt.hour).tip_fraction.mean()

dayofweek, hour = dask.persist(dayofweek, hour)
progress(dayofweek, hour)

VBox()

### Plot results

This requires matplotlib to be installed

In [9]:
%matplotlib inline

In [10]:
hour.compute().plot(figsize=(10, 6), title='Tip Fraction by Hour')

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-10-5082cb25dd3e>", line 1, in <module>
    hour.compute().plot(figsize=(10, 6), title='Tip Fraction by Hour')
  File "/usr/local/lib/python3.6/dist-packages/dask/base.py", line 165, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/distributed/client.py", line 2587, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/usr/local/lib/python3.6/dist-packages/distributed/client.py", line 1885, in gather
    asynchronous=asynchronous,
  File "/usr/local/lib/python3.6/dist-packages/distributed/client.py", line 767, in sync
    self.loop, func, *args, ca

KeyboardInterrupt: 