# Distributed DataFrames

In [None]:
%matplotlib inline

import pandas as pd
import dask.dataframe as dd

import matplotlib.pyplot as plt

## NYC Taxi data
In this notebook we use distributed dataframes to analyze NYC Taxi data (https://data.cityofnewyork.us/view/ba8s-jw6u, http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml)

This data is stored as large CSV files on S3 in a public bucket (https://s3.amazonaws.com/nyc-tlc/trip+data/). We could load them using the `s3fs` library:

    >>> from s3fs import S3FileSystem
    >>> s3 = S3FileSystem(anon=True)

    >>> s3.ls('nyc-tlc/trip data/')
    [...
     'nyc-tlc/trip data/yellow_tripdata_2009-01.csv',
     ...
     'nyc-tlc/trip data/yellow_tripdata_2015-01.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-02.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-03.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-04.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-05.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-06.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-07.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-08.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-09.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-10.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-11.csv',
     'nyc-tlc/trip data/yellow_tripdata_2015-12.csv',
     'nyc-tlc/trip data/yellow_tripdata_2016-01.csv',
     'nyc-tlc/trip data/yellow_tripdata_2016-02.csv',
     'nyc-tlc/trip data/yellow_tripdata_2016-03.csv',
     'nyc-tlc/trip data/yellow_tripdata_2016-04.csv',
     'nyc-tlc/trip data/yellow_tripdata_2016-05.csv',
     'nyc-tlc/trip data/yellow_tripdata_2016-06.csv']
    

To work with these files, we downloaded some of them and put them on a data share. You can copy them to ``$VSC_SCRATCH_NODE`` (`/local`).

We would like to load this data with Pandas, but there is too much data here to fit in memory.

In [None]:
pd.read_csv("/local/yellow_tripdata_2015-01.csv", nrows=5)

Instead, we connect to the cluster and use dask.dataframe to load the CSV data into ~700 Pandas dataframes spread across our cluster.  We get back a Dask.dataframe to coordinate these small Pandas dataframes.

## Setting up a cluster

A `dask.distributed` network consists of one `Scheduler` node and several `Worker` nodes. You connect to these with a `Client`. One can set these up in a variety of ways (https://distributed.readthedocs.io/en/latest/setup.html).

If you create a client without providing an address, it will start up a local scheduler and worker for you:

    >>> from distributed import Client
    >>> client = Client()
    >>> client
    <Client: scheduler="127.0.0.1:8786" processes=8 cores=8>
    
You can also set up a network on the HPC cluster, and connect to it using the Scheduler's address:

    >>> client = Client("10.141.18.78:8786")
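As a minimal sketch of the local setup, the scheduler and worker can also be created explicitly with `LocalCluster` and then handed to a `Client`. The settings below (one in-process worker, two threads, dashboard disabled) are assumptions chosen to keep the example lightweight, not recommended values for the taxi analysis.

```python
from distributed import Client, LocalCluster

# Start a scheduler and a single worker inside the current process.
# dashboard_address=None turns the web interface off for this small demo.
with LocalCluster(n_workers=1, threads_per_worker=2,
                  processes=False, dashboard_address=None) as cluster:
    with Client(cluster) as client:
        # Submit a small task to the worker and wait for its result.
        future = client.submit(sum, [1, 2, 3])
        result = future.result()
```

Using the context managers shuts the worker and scheduler down again when the block exits.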

In [None]:
from distributed import Client, progress

In [None]:
client = Client()

In [None]:
client

In [None]:
df = dd.read_csv("/local/yellow_tripdata_2015-01.csv",
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

In [None]:
df.head()

## Web interface

Information about the current state of the network helps to track progress, identify performance issues, and debug failures.

Dask.distributed includes a web interface to help deliver this information over a normal web page in real time. This web interface is launched by default wherever the scheduler is launched if the scheduler machine has Bokeh installed. The web interface is normally available at http://scheduler-address:8787/status/ and can be viewed in any normal web browser.

Forward port 8787 of the node you are working on over SSH, then open the web interface at http://127.0.0.1:8787/status/

In [None]:
df = client.persist(df)
progress(df)

In [None]:
len(df)

In [None]:
df.passenger_count.sum().compute()

In [None]:
df

In [None]:
df.head()

In [None]:
df.info(memory_usage=True)

### Play

Existing Pandas experience transfers over decently well to Dask.dataframe.  However, there are a few caveats when dealing with distributed systems:

*  Until you call `client.persist` (for large results) or `client.compute` (for small results), all computations are lazy.
*  Call `progress` on a dataframe *after* you persist to track the progress of a computation.  You can continue doing work immediately.  All work happens in the background.
*  If you are computing a small result, just add `.compute()` to the end of your result, like `df.passenger_count.sum().compute()`.  This will block and return the result when finished.

### Example

In [None]:
# Keep only rides with a positive fare, then select the payment-related columns.
positive_fares = df[df.fare_amount > 0]
fares = positive_fares[['fare_amount', 'tip_amount', 'payment_type']]

fares = client.persist(fares)  # triggers computation
progress(fares)

In [None]:
fares.head()

In [None]:
(fares.tip_amount == 0).sum().compute()

In [None]:
fares.count().compute()

In [None]:
df.passenger_count.sum().compute()

<div class="alert alert-success">
    <b>EXERCISE</b>: 
    
Compute the following using `.compute()`:

<ul>
<li>The mean of the passenger count column</li>
<li>The mean trip distance grouped by passenger count</li>
</ul>

<p>Create a new dataframe that filters out all the rides greater than three miles, then compute the above quantities again.  </p>
</div>

## Setting the time as index

Use the `.set_index` method to set the index to the `tpep_pickup_datetime` column.  This is an *expensive* operation, so call `client.persist` on the result to create a new dataframe that is persisted in distributed memory.  Use the `progress` function to track the progress.

Once this finishes you have access to datetime functionality like `loc`, `resample` and `rolling` aggregations.  

In [None]:
df2 = df.set_index('tpep_pickup_datetime')

In [None]:
df2 = client.persist(df2)
progress(df2)

The dask dataframe now has known divisions (the index values that separate its partitions), making certain operations much more efficient.

In [None]:
df.divisions

In [None]:
df2.divisions

<div class="alert alert-success">
    <b>EXERCISE</b>: Plot the hourly number of taxi trips for Jan 1 to Jan 5
</div>

<div class="alert alert-success">
    <b>EXERCISE</b>: Plot daily profile of number of taxi trips
</div>

<div class="alert alert-success">
    <b>EXERCISE</b>: Plot the daily tip average
</div>

<div class="alert alert-success">
    <b>EXERCISE</b>: Based on the previous result: Is there a weekly pattern within the average tip amount?
</div>

<div class="alert alert-success">
    <b>EXERCISE</b>: For those trips where a tip has been given, calculate the tip fraction (tip_amount / fare_amount) and calculate the daily profile of this fraction.
</div>

## More data

Up to now we performed the analyses on one of the CSV files: all data for January 2015. You can repeat the notebook with several of the files:

    df = dd.read_csv("/local/yellow_tripdata_2015-*.csv",
                     parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])