<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

DataFrames on a Cluster
=======================

<img src="http://www.numfocus.org/uploads/6/0/6/9/60696727/6893890_orig.png"
     align="left"
     width="30%"
     alt="Pandas logo">


### Read single dataframe from S3 with Pandas

In [None]:
from s3fs import S3FileSystem

# Create an S3 filesystem object using default constructor arguments.
s3 = S3FileSystem()
# List the entries under the 2015 NYC taxi prefix; as the final expression
# of the cell, the listing is what the notebook displays.
s3.ls('dask-data/nyc-taxi/2015/')

In [None]:
# Peek at the raw data: open the January 2015 CSV on S3 and print at most
# the first 1000 units returned by read() so the header and a few rows are
# visible. The context manager closes the remote file afterwards.
with s3.open('dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv') as f:
    print(f.read(1000))

In [None]:
import pandas as pd

# Load a small sample with plain pandas: only the first 5 rows of the
# January 2015 file, with the pickup and dropoff columns parsed as datetimes.
with s3.open('dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv') as f:
    df = pd.read_csv(f, nrows=5, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
# Display the sampled frame as the cell output.
df

### Parallelize Pandas with Dask.dataframe


In [None]:
from dask.distributed import Executor, progress
# Connect to the cluster at 127.0.0.1:8786 (presumably the address of a
# running dask.distributed scheduler -- confirm for your deployment).
# NOTE(review): newer dask.distributed releases expose this class as
# `Client`; verify which name the installed version provides.
e = Executor('127.0.0.1:8786')
# Display the executor object as the cell output.
e

In [None]:
import dask
# Register the executor's `get` function as dask's global default so that
# later .compute() calls in this notebook are routed through `e`.
dask.set_options(get=e.get)  # use distributed cluster by default

In [None]:
import dask.dataframe as dd


# Build a Dask dataframe over every CSV matching the 2015 glob on S3,
# parsing the pickup and dropoff columns as datetimes. This rebinds `df`,
# replacing the 5-row pandas sample created earlier.
df = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

In [None]:
# Hand the dataframe to the executor to persist and rebind `df` to the
# returned collection, so later cells operate on the persisted version.
df = e.persist(df)
# Show progress for the persisted dataframe.
progress(df)


Dask DataFrames
---------------

*  Coordinate many Pandas DataFrames across a cluster
*  Faithfully implement a subset of the Pandas API
*  Use Pandas under the hood (for speed and maturity)

In [None]:
# Display the Dask dataframe object itself.
df

In [None]:
# Show the dtype of each column.
df.dtypes

In [None]:
# Show the first few rows of the dataframe.
df.head()

In [None]:
# Time how long it takes to count the rows of the dataframe.
%time len(df)

In [None]:
# Time summing the passenger_count column; .compute() triggers the actual
# evaluation and returns the concrete result.
%time df.passenger_count.sum().compute()

In [None]:
# Compute the mean trip_distance for each distinct passenger_count value
# and display the concrete result.
df.groupby(df.passenger_count).trip_distance.mean().compute()

### Tip Fraction, grouped by day-of-week and hour-of-day

In [None]:
# Keep only rides with a positive tip and a positive fare; requiring
# fare_amount > 0 also keeps the division below away from zero denominators.
df2 = df[(df.tip_amount > 0) & (df.fare_amount > 0)]
# Add a tip_fraction column: tip as a fraction of the fare.
df2 = df2.assign(tip_fraction=df2.tip_amount / df2.fare_amount)

In [None]:
# Mean tip_fraction grouped by the pickup timestamp's day of week, and
# separately by its hour of day. Both are lazy Dask expressions here.
dayofweek = df2.groupby(df2.tpep_pickup_datetime.dt.dayofweek).tip_fraction.mean() 
hour = df2.groupby(df2.tpep_pickup_datetime.dt.hour).tip_fraction.mean()

# Submit both aggregations to the executor in a single persist call and
# rebind the names to the persisted results, then show their progress.
dayofweek, hour = e.persist([dayofweek, hour])
progress(dayofweek, hour)

In [None]:
# Fetch and display the mean tip fraction per pickup day of week.
dayofweek.compute()

In [None]:
# Fetch and display the mean tip fraction per pickup hour of day.
hour.compute()

In [None]:
%matplotlib inline
from matplotlib import pyplot as plt

plt.figure(figsize=(12, 4))
plt.plot(hour.compute().values, '.-')
plt.ylim(ymin=0);
plt.xlabel("Hour of day")
plt.ylabel("Tip Fraction")