
Dask Dataframes on NYC Taxi Data
================================

<img src="http://pandas.pydata.org/_static/pandas_logo.png"
     align="left"
     width="30%"
     alt="Pandas logo">
     <img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

In this section we will learn how to ...

-  use Dask Dataframe to scale Pandas workloads
-  call `.compute` and `.persist` to trigger computation
-  start and scale a Dask cluster on Kubernetes
-  interpret dashboard plots


In [37]:
import warnings

warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")

## We have several CSV files in cloud storage
Widjaja note: the original notebook listed every 2015 file with `yellow_*.csv`; the glob below is narrowed to `yellow_tripdata_2015-1*.csv` because the full dataset would be too big.

In [38]:
from gcsfs import GCSFileSystem
gcs = GCSFileSystem()

#sorted(gcs.glob('anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv'))
sorted(gcs.glob('anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-1*.csv'))

['anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-10.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-11.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-12.csv']

## Read a subset with Pandas

The full dataset is too big to fit in memory on a single machine, so we read only the first million rows of one file to get a first impression.

In [39]:
import pandas as pd

with gcs.open('anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-12.csv') as f:
    df = pd.read_csv(f, nrows=1000000, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

In [40]:
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2015-12-01 00:00:00,2015-12-01 00:05:16,5,0.96,-73.979942,40.765381,1,N,-73.966309,40.763088,1,5.5,0.5,0.5,1.00,0.00,0.3,7.80
1,2,2015-12-01 00:00:00,2015-12-01 00:00:00,2,2.69,-73.972336,40.762379,1,N,-73.993629,40.745998,1,21.5,0.0,0.5,3.34,0.00,0.3,25.64
2,2,2015-12-01 00:00:00,2015-12-01 00:00:00,1,2.62,-73.968849,40.764530,1,N,-73.974548,40.791641,1,17.0,0.0,0.5,3.56,0.00,0.3,21.36
3,1,2015-12-01 00:00:01,2015-12-01 00:05:56,1,1.20,-73.993935,40.741684,1,N,-73.997665,40.747467,1,6.5,0.5,0.5,0.20,0.00,0.3,8.00
4,1,2015-12-01 00:00:01,2015-12-01 00:09:28,2,3.00,-73.988922,40.726990,1,N,-73.975594,40.696869,2,11.0,0.5,0.5,0.00,0.00,0.3,12.30
5,1,2015-12-01 00:00:02,2015-12-01 00:16:12,1,6.30,-73.974083,40.762913,1,N,-74.012802,40.702209,1,20.5,0.5,0.5,4.35,0.00,0.3,26.15
6,2,2015-12-01 00:00:02,2015-12-01 00:02:49,6,0.63,-73.968315,40.755329,1,N,-73.962082,40.758915,1,4.0,0.5,0.5,1.06,0.00,0.3,6.36
7,2,2015-12-01 00:00:02,2015-12-01 00:08:06,2,1.91,-73.994209,40.746101,1,N,-74.004250,40.721809,1,8.0,0.5,0.5,1.86,0.00,0.3,11.16
8,2,2015-12-01 00:00:02,2015-12-01 00:17:11,1,4.50,-74.006760,40.718906,1,N,-73.989693,40.772854,1,16.5,0.5,0.5,3.56,0.00,0.3,21.36
9,2,2015-12-01 00:00:04,2015-12-01 00:10:43,2,1.42,-73.999634,40.734772,1,N,-73.989067,40.723122,1,8.5,0.5,0.5,2.45,0.00,0.3,12.25


## Investigate the subset as normal

In [41]:
# How many passengers total?
df.passenger_count.sum()

1637282

In [42]:
# The average trip distance for rides with two passengers
df2 = df[df.passenger_count == 2]
df2.trip_distance.mean()

3.19591824947246

In [43]:
# The average trip distance grouped by passenger counts
df.groupby(df.passenger_count).trip_distance.mean()  

passenger_count
0    2.327609
1    5.034339
2    3.195918
3    3.017428
4    3.073429
5    3.012623
6    2.996719
7    0.000000
9    0.000000
Name: trip_distance, dtype: float64

## Start a Dask Cluster

Your notebook is conveniently attached to a Kubernetes cluster, so you can start a Dask cluster using the [dask-kubernetes](https://kubernetes.dask.org/en/latest/) project.

For more information on deploying Dask on different cluster technology, see [Dask's deployment documentation](https://docs.dask.org/en/latest/setup.html).

Widjaja note: the `KubeCluster` lines below are commented out so that the notebook runs locally on a Mac instead of on a Kubernetes cluster.

In [44]:
#from dask_kubernetes import KubeCluster
#cluster = KubeCluster(n_workers=20)
#cluster

import dask.multiprocessing
dask.config.set(scheduler='processes')

<dask.config.set at 0x7f6799618ac8>

In [45]:
from dask.distributed import Client

#client = Client(cluster)
client = Client()

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


## Create Dask dataframe around all of the data

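Before, we loaded only a subset of one CSV file.  Now let's use Dask dataframe to read the data without a row limit.  The cell below reads the whole December file; a glob such as `yellow_*.csv` would read every 2015 file.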

For more information you can read [Dask's documentation for creating dataframes](http://docs.dask.org/en/latest/dataframe-create.html)

In [46]:
import dask.dataframe as dd

df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-12.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df = df.persist()

Dask dataframes look like Pandas dataframes, and support most of the common Pandas methods.

In [47]:
df.passenger_count.sum()

dd.Scalar<series-..., dtype=int64>

## Investigate laziness and use the `.compute()` method

Note that the `df.passenger_count.sum()` computation did not yet execute.  Dask dataframes are *lazy* by default, so they only evaluate when we tell them to.

There are two ways to trigger computation:

-  `result = result.compute()`: triggers computation and stores the result into local memory as a Pandas object.  

    You should use this with *small* results that will fit into memory.
-  `result = result.persist()`: triggers computation and stores the result into distributed memory, returning another Dask dataframe object.  

    You should use this with *large* results that you want to stage in distributed memory for repeated computation.
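
The difference between describing a computation and running it can be pictured without Dask at all. The sketch below is a plain-Python analogy, not Dask's implementation: the hypothetical `LazySum` class only records what to compute, and the work happens when `.compute()` is called.

```python
class LazySum:
    """Hypothetical stand-in for a lazy Dask result; not part of Dask."""

    def __init__(self, partitions):
        self.partitions = partitions  # list of lists, one per partition
        self.evaluations = 0          # how many times real work was done

    def compute(self):
        # Work happens only here: sum each partition, then combine the sums.
        self.evaluations += 1
        return sum(sum(part) for part in self.partitions)


passenger_counts = [[1, 2, 1], [5, 1], [2, 2, 6]]  # toy data, three partitions

lazy_total = LazySum(passenger_counts)  # nothing has been computed yet
evaluations_before = lazy_total.evaluations

total = lazy_total.compute()            # triggers the computation
print(total)
```

In the same spirit, `df.passenger_count.sum()` in Dask returns a description of the work, and `.compute()` turns it into a concrete number.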

#### *Exercise*: Run the Pandas computations above with Dask dataframe

In [12]:
# How many passengers total?
df.passenger_count.sum().compute()

245566747

In [13]:
# The average trip distance over all rides (no passenger filter is applied here)
df.trip_distance.mean().compute()

13.137274740509211

In [48]:
# The average trip distance grouped by passenger counts
df.groupby(df.passenger_count).trip_distance.mean().compute()

passenger_count
0    2.281714
1    6.200290
2    3.314197
3    3.082567
4    3.192805
5    3.059846
6    3.002550
9    5.909286
7    4.385000
8    1.577692
Name: trip_distance, dtype: float64

#### *Question*: When is it safe to call compute?

Recall that calling `.compute()` on a Dask DataFrame returns a Pandas result in your local memory.  This can be dangerous if the size of the result is large.  In which of the following situations is calling `.compute()` ok?

-  `df.sum()`
-  `df[df.passenger_count == 1]`
-  `df[df.passenger_count == 10]`
-  `df.groupby(df.passenger_count).trip_distance.mean()`
-  `df.groupby(df.pickup_latitude).trip_distance.mean()`
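
One way to reason about these cases is to estimate how many rows each result would have. The following plain-Python sketch uses made-up toy columns (the values and ranges are assumptions, not the taxi data) to show that a groupby-aggregation returns one row per distinct key, while a filter returns one row per match.

```python
import random

random.seed(0)
n_rows = 10_000

# Toy columns: a low-cardinality key and a high-cardinality key.
passenger_count = [random.randint(1, 6) for _ in range(n_rows)]
pickup_latitude = [random.uniform(40.5, 41.0) for _ in range(n_rows)]

# A groupby-aggregation returns one row per distinct key.
rows_by_passenger = len(set(passenger_count))
rows_by_latitude = len(set(pickup_latitude))

# A filter returns as many rows as match the condition.
rows_single_passenger = sum(1 for c in passenger_count if c == 1)
rows_ten_passengers = sum(1 for c in passenger_count if c == 10)

print(rows_by_passenger, rows_by_latitude)
print(rows_single_passenger, rows_ten_passengers)
```

A result with a handful of rows is cheap to bring back with `.compute()`; a result whose size grows with the dataset is a better fit for `.persist()`.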

## Persist data in memory

When we started this notebook we ran the following lines to create our dataframe.

```python
df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df = df.persist()
```

In particular, we called `df = df.persist()` to load all of the CSV data into distributed memory.  Having this data in memory made our subsequent computations fast.  

In this section we're going to reset our cluster and run the same computations, but without persisting our data in memory.  What happens to our computation times?  Why?

In [None]:
client.restart()

In [None]:
df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df

In [None]:
# How many passengers total?
df.passenger_count.sum().compute()  

In [None]:
# The average trip distance for rides with a single passenger
df2 = df[df.passenger_count == 1]  
df2.trip_distance.mean().compute()

In [None]:
# The average trip distance grouped by passenger counts
df.groupby(df.passenger_count).trip_distance.mean().compute()

#### *Exercise*: What did our workers spend their time doing?

To answer this question look at the Task Stream dashboard plot.  It will tell you the activity on each core of your cluster (y-axis) over time (x-axis).  You can hover over each rectangle of this plot to determine what kind of task it was.  What kinds of tasks are most common and take up the most time?

*Extra*: if you're ahead of the group you might also want to look at the Profile dashboard plot.  You can access this by selecting the orange Dask icon on the left side of your JupyterLab page.  The profile plot is an interactive [flame graph](http://www.brendangregg.com/FlameGraphs/cpuflamegraphs.html).

In [None]:
df = df.persist()  # we persist our data again, just to make future sections faster
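
One way to picture why the unpersisted runs differ from the persisted ones is to count how often the source data is read. This is a plain-Python analogy with a hypothetical `load_partition` function standing in for "download and parse one CSV block"; it is not how Dask is implemented.

```python
load_calls = 0


def load_partition(i):
    """Pretend to read and parse one CSV partition (the expensive step)."""
    global load_calls
    load_calls += 1
    return [i, i + 1, i + 2]  # toy rows


def total_without_persist(n_partitions):
    # Every computation starts from the source, so every partition is re-read.
    return sum(sum(load_partition(i)) for i in range(n_partitions))


# Two computations without persisting: the data is loaded twice.
total_without_persist(4)
total_without_persist(4)
loads_unpersisted = load_calls

# "Persisting": load once, keep the parsed partitions in memory, reuse them.
load_calls = 0
cached = [load_partition(i) for i in range(4)]
first = sum(sum(part) for part in cached)
second = max(max(part) for part in cached)
loads_persisted = load_calls

print(loads_unpersisted, loads_persisted)
```

The unpersisted version pays the loading cost on every computation, which matches the read and parse tasks that tend to dominate the Task Stream plot.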

## Dask DataFrame Design

We briefly discuss the design of Dask dataframes.  Then we follow this section with exercises that dive into this design.

<img src="http://docs.dask.org/en/latest/_images/dask-dataframe.svg"
     width="50%">
     
Dask dataframes are composed of many *partitions*, split along the index.  Each partition is a Pandas dataframe or Series.  You can see the number of partitions in the rendering of a Dask Dataframe.

In [None]:
df

You can also see the type of each partition using the `map_partitions` method.

In [None]:
type(df)

In [49]:
df.map_partitions(type).compute()

0     <class 'pandas.core.frame.DataFrame'>
1     <class 'pandas.core.frame.DataFrame'>
2     <class 'pandas.core.frame.DataFrame'>
3     <class 'pandas.core.frame.DataFrame'>
4     <class 'pandas.core.frame.DataFrame'>
5     <class 'pandas.core.frame.DataFrame'>
6     <class 'pandas.core.frame.DataFrame'>
7     <class 'pandas.core.frame.DataFrame'>
8     <class 'pandas.core.frame.DataFrame'>
9     <class 'pandas.core.frame.DataFrame'>
10    <class 'pandas.core.frame.DataFrame'>
11    <class 'pandas.core.frame.DataFrame'>
12    <class 'pandas.core.frame.DataFrame'>
13    <class 'pandas.core.frame.DataFrame'>
14    <class 'pandas.core.frame.DataFrame'>
15    <class 'pandas.core.frame.DataFrame'>
16    <class 'pandas.core.frame.DataFrame'>
17    <class 'pandas.core.frame.DataFrame'>
18    <class 'pandas.core.frame.DataFrame'>
19    <class 'pandas.core.frame.DataFrame'>
20    <class 'pandas.core.frame.DataFrame'>
21    <class 'pandas.core.frame.DataFrame'>
...
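
Because each partition is an ordinary Pandas object, many computations can be expressed as "summarize each partition, then combine the summaries". The sketch below illustrates that pattern for a mean in plain Python, using a few trip distances from the rows shown earlier as toy data; it is an illustration of the idea, not Dask's actual code.

```python
partitions = [
    [0.96, 2.69, 2.62],        # trip distances in partition 0 (toy values)
    [1.20, 3.00],              # partition 1
    [6.30, 0.63, 1.91, 4.50],  # partition 2
]

# Step 1: each partition reports a small summary (sum, count).
summaries = [(sum(part), len(part)) for part in partitions]

# Step 2: the summaries are combined into the final answer.
total = sum(s for s, _ in summaries)
count = sum(n for _, n in summaries)
mean = total / count

# Averaging the per-partition means gives a different (wrong) answer
# whenever the partitions have different sizes.
naive = sum(s / n for s, n in summaries) / len(summaries)

print(mean, naive)
```

Only the small `(sum, count)` pairs need to move between workers, which is why reductions like `mean` scale well.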

### Divisions and the Index

Just like Pandas, Dask Dataframe has an *index*, a special column that indexes the rows of our dataframe.  In Dask this index has an additional purpose: it serves as a sorted partitioning of our data.  This makes some algorithms more efficient.  In this section, we'll sort our data by time and dive into the index a bit more deeply.

First, notice that our index is not particularly informative.  This is common when you load a dataset from CSV data, which generally doesn't store index or sorting information.

Let's set a new index to be the pickup time.  Sorting in parallel is hard, so this is an expensive operation.

In [15]:
df2 = df.set_index('tpep_pickup_datetime').persist()

In [16]:
df2

npartitions=365
tpep_pickup_datetime,VendorID,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2015-01-01 00:00:00.000000000,int64,datetime64[ns],int64,float64,float64,float64,int64,object,float64,float64,int64,float64,float64,float64,float64,float64,float64,float64
2015-01-02 02:01:13.000000000,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2015-12-30 20:23:31.450503680,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2015-12-31 23:59:59.000000000,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [None]:
df2.head()

In [None]:
df2.tail()

Our dataframe is split into roughly as many partitions as before, but now we know the time range of each partition.  Internally, the boundaries between partitions are stored in the `divisions` attribute.

In [None]:
df2.divisions
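
To see why known divisions help, note that a sorted list of boundaries lets us find the partition for a key with a binary search instead of scanning all the data. The sketch below assumes the usual convention that partition `i` covers `divisions[i] <= key < divisions[i + 1]`, with the last partition also including the final boundary. The dates and the `partition_for` helper are hypothetical and are not part of Dask's API.

```python
from bisect import bisect_right

# Hypothetical divisions for 3 partitions (4 boundaries).
divisions = ["2015-01-01", "2015-05-01", "2015-09-01", "2015-12-31"]


def partition_for(key, divisions):
    """Return the index of the partition that may contain `key`, else None."""
    if key < divisions[0] or key > divisions[-1]:
        return None
    n_partitions = len(divisions) - 1
    # bisect_right finds the first boundary strictly greater than `key`;
    # the min() keeps the final boundary inside the last partition.
    return min(bisect_right(divisions, key) - 1, n_partitions - 1)


print(partition_for("2015-05-05", divisions))
print(partition_for("2016-01-01", divisions))
```

A lookup such as `df2.loc['2015-05-05']` can use the same idea to touch only the partitions whose range overlaps that day.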

### Question: What took up the most time in the operation above?

What colors are most prominent in the task stream plot?

When you hover over some of these bars, what do they say?

### Fast operations along the index

Having a sorted dataframe allows for fast operations, like random access lookup and timeseries operations.

In [None]:
df2.loc['2015-05-05'].compute()  # pick out one day of data

In [None]:
df2.passenger_count.resample('1h').mean().compute().plot()
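
As a rough picture of what `resample('1h').mean()` computes, the plain-Python sketch below floors each timestamp to the hour and averages the values in each bucket. The rows are made-up toy data, and unlike Pandas this sketch does not emit entries for hours that contain no rows.

```python
from collections import defaultdict
from datetime import datetime

# Toy (pickup time, passenger_count) rows, already sorted by time.
rows = [
    (datetime(2015, 12, 1, 0, 0, 0), 5),
    (datetime(2015, 12, 1, 0, 0, 2), 1),
    (datetime(2015, 12, 1, 0, 59, 59), 3),
    (datetime(2015, 12, 1, 1, 15, 0), 2),
    (datetime(2015, 12, 1, 1, 45, 0), 4),
]

buckets = defaultdict(list)
for ts, passengers in rows:
    hour = ts.replace(minute=0, second=0, microsecond=0)  # floor to the hour
    buckets[hour].append(passengers)

hourly_mean = {hour: sum(vals) / len(vals) for hour, vals in buckets.items()}

for hour, value in sorted(hourly_mean.items()):
    print(hour, value)
```

Because the data is sorted by time, each hour's rows sit next to each other, so the bucketing can proceed partition by partition.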

### Exercises if you are done early

Explore timeseries operations like `resample` and `rolling`.

You may want to look at the [DataFrame API](http://docs.dask.org/en/latest/dataframe-api.html).

## Close things when you're done

Before you move on to the next notebook, please close down your current cluster.

Alternatively, you can restart this notebook by pressing the `"0"` key twice.

In [None]:
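# `cluster` only exists if the KubeCluster lines above were uncommented;
# with the local `Client()` used here, closing the client is enough.
client.close()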