<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo\">


# Distributed DataFrames and Efficiency

In the previous notebooks we discussed `dask.dataframe` and `dask.distributed`. Here we combine theme on a larger dataset, and discuss efficiency and performance tips.

We will cover the following topics:

1. Persist common intermediate results in memory with `persist`
2. Partitions and partition size
3. Using indices to improve efficiency

## Distributed Cluster

We have a distributed cluster already setup on your machines. You can connect to it by creating a new client with the scheduler address:

In [None]:
from dask.distributed import Client

client = Client('schedulers:9000')
client

## The full airline dataset

We have the full airline dataset stored on `s3`. This is the same as the one you've been working with, but includes all originating airports and a few extra columns. We change the `read_csv` call slightly to avoid the extra columns, and to only use a few of the years to reduce total memory usage to fit comfortably within our small cluster.

Dask dataframe has support for reading directly from `s3`, so we can use our `read_csv` call from before.

In [None]:
import dask.dataframe as dd

columns = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime',
           'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum',
           'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay',
           'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut',
           'Cancelled']

df = dd.read_csv('s3://dask-data/airline-data/199[0,1,2,3,4]*.csv',
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Distance': float,
                        'Cancelled': bool},
                 usecols=columns,
                 storage_options=dict(anon=True))

In [None]:
df.head()

### Persist data in distributed memory

Every time we run an operation like `df[~df.Cancelled].DepDelay.max().compute()` we read through our dataset from disk.  This can be slow, especially because we're reading data from CSV.  We usually have two options to make this faster:

1.  Persist relevant data in memory, either on our computer or on a cluster
2.  Use a faster on-disk format, like HDF5 or Parquet

In this section we persist our data in memory.  On a single machine this is often done by doing a bit of pre-processing and data reduction with dask dataframe and then `compute`-ing to a Pandas dataframe and using Pandas in the future.  

```python
df = dd.read_csv(...)
df = df[df.Origin == 'LGA']  # filter down to smaller dataset
pdf = df.compute()  # convert to pandas
pdf ... # continue with familiar Pandas workflows
```

However on a distributed cluster when even our cleaned data is too large we still can't use Pandas.  In this case we ask Dask to persist data in memory with the `dask.persist` function.  This is what we'll do today.  This will help us to understand when data is lazy and when it is computing.

You can trigger computations using the persist method:

    x = x.persist()

or the dask.persist function for multiple inputs:

    x, y = dask.persist(x, y)

### Exercise

Persist the dataframe into memory.

-  After it has persisted how long does it take to compute `df[~df.Cancelled].DepDelay.count().compute()`?
-  Looking at the plots in the [diagnostic web page](http://localhost:8787/status), what is taking up most of the time? (You can over over rectangles to see what function they represent)

In [None]:
%time _ = df.Cancelled[~df.Cancelled].count().compute()

In [None]:
df = # TODO: persist dataframe in memory

In [None]:
df = df.persist()

In [None]:
df

In [None]:
%time _ = df.Cancelled[~df.Cancelled].count().compute()

### Exercise

Repeat the groupby computation from the previous notebook. What is taking all of the time?

In [None]:
# What was the average departure delay from each airport?
df[~df.Cancelled].groupby('Origin').DepDelay.mean().nlargest(10).compute()

## Partitions

One `dask.dataframe` is composed of several Pandas dataframes.  The organization of these dataframes can significantly impact performance.  In this section we discuss two common factors that commonly impact performance:

1. The number of Pandas dataframes can affect overhead.  If the dataframes are too small then Dask might spend more time deciding what to do than Pandas spends actually doing it.  Ideally computations should take 100's of milliseconds.

2. If we know how the dataframes are sorted then certain operations become much faster

### Number of partitions and partition size

When we read in our data from CSV files we got one Pandas dataframe for each day.  Look at the metadata below to determine how many partitions we have.  Each "partition" is a Pandas dataframe.

In [None]:
df.npartitions

In [None]:
df

### Exercise: How large is each partition?

Use the [.map_partitions()](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) method along with the `pandas.DataFrame.memory_usage().sum()` function to determine how many bytes each partition consumes.

In [None]:
# Your code here...


### Sorted Index column

*This section doesn't have any exercises.  Just follow along.*

Many dataframe operations like loc-indexing, groupby-apply, and joins are *much* faster on a sorted index.  For example, if we want to get data for a particular day of data it *really* helps to know where that day is, otherwise we need to search over all of our data.

The Pandas model gives us a sorted index column.  Dask.dataframe copies this model, and it remembers the min and max values of every partition's index.

By default, our data doesn't have an index.

In [None]:
df.head()

So if we search for a particular day it takes a while because it has to pass through all of the data.

In [None]:
%time df[df.Date == '1992-05-05'].compute()

However if we set the `Date` column as the index then this operation can be much much faster.

In [None]:
%%time
df = df.set_index('Date').persist()

In [None]:
df

In [None]:
%time df.loc['1992-05-05'].compute()

If you look at the resulting graph, you can see that dask was able to optimize the computation to only look at a single partition:

In [None]:
df.loc['1992-05-05'].visualize(optimize_graph=True)

Additionally this lets us do traditional Pandas timeseries functionality.

In [None]:
%matplotlib inline

In [None]:
%%time 
(df.DepDelay
   .resample('1M')
   .mean()
   .fillna(method='ffill')
   .compute()
   .plot(figsize=(10, 5)))