Distributed DataFrames
======================

In this notebook we use distributed dataframes to analyze NYC Taxi data stored as large CSV files in a public Google Cloud Storage (GCS) bucket.

This notebook uses Dask.  You may want to use [Dask's diagnostic dashboard](../../../9002/status) while running this notebook for feedback from the cluster.  We recommend setting up the dashboard and your notebook side-by-side.

In [None]:
from gcsfs import GCSFileSystem
gcs = GCSFileSystem(token='cloud')

gcs.ls('anaconda-public-data/nyc-taxi/csv/2015/')

We would like to load this data with Pandas, but there is too much data here to fit in memory.

In [None]:
import pandas as pd

with gcs.open('anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-01.csv') as f:
    df = pd.read_csv(f, nrows=5)  # look at just five rows
    
df

Instead, we connect to the cluster and use dask.dataframe to load the CSV data into ~700 Pandas dataframes spread across our cluster.  We get back a Dask.dataframe to coordinate these small Pandas dataframes.

In [None]:
from dask.distributed import Client, progress

client = Client('schedulers:9000')
client

In [None]:
import dask.dataframe as dd

df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-01.csv',
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 storage_options={'token': 'cloud'})
df

In [None]:
df = df.persist()
progress(df)

### Play

Existing Pandas experience transfers over reasonably well to Dask.dataframe.  However, there are a few caveats when dealing with distributed systems:

*  Until you call `df = df.persist()` (for large results) or `x.compute()` (for small results), all computations are lazy
*  Call `progress` on a dataframe *after* you persist to track the progress of a computation.  You can continue doing work immediately.  All work happens in the background.
*  If you are computing a small result, just add `.compute()` to the end of your result, like `df.passenger_count.sum().compute()`.  This will block and return the result when finished.

### Example

In [None]:
positive_fares = df[df.fare_amount > 0]  # keep only rides with a positive fare
fares = positive_fares[['fare_amount', 'tip_amount', 'payment_type']]

fares = fares.persist()  # triggers computation
progress(fares)

In [None]:
fares.head()

In [None]:
(fares.tip_amount == 0).sum().compute()

In [None]:
fares.count().compute()

In [None]:
df.passenger_count.sum().compute()

### Exercises

Compute the following using `.compute()`:

*  The mean of the passenger count column
*  The mean trip distance grouped by passenger count

Create a new dataframe that excludes all rides longer than three miles, then compute the above quantities again.


### Exercises

Use the `.set_index` method to set the index to the `tpep_pickup_datetime` column.  This is an *expensive* operation, so call `.persist()` on the result to create a new dataframe that is persisted in distributed memory.  Use the `progress` function to track the progress.

Once this finishes, you have access to datetime functionality such as `loc` slicing and `rolling` aggregations.


### Debugging

Debugging is hard in parallel.  Errors on remote machines generate valuable exceptions and tracebacks that must be communicated back to users.  Some information is lost. 

The full dataset contains an error.  Run the following code to trigger it.  Can you find the problem?

In [None]:
import dask.dataframe as dd

df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-*.csv',
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 storage_options={'token': 'cloud'}).persist()

progress(df)

In [None]:
df.count().compute()