<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo\">


# Dask DataFrame - parallelized pandas

Looks and feels like the pandas API, but for parallel and distributed workflows. 

At its core, the `dask.dataframe` module implements a "blocked parallel" `DataFrame` object that looks and feels like the pandas API, but for parallel and distributed workflows. One Dask `DataFrame` is comprised of many in-memory pandas `DataFrame`s separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.


<img src="https://docs.dask.org/en/stable/_images/dask-dataframe.svg"
     align="right"
     width="30%"
     alt="Dask DataFrame is composed of pandas DataFrames"/>

**Related Documentation**

* [DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)
* [DataFrame screencast](https://youtu.be/AT2XtFehFSQ)
* [DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)
* [DataFrame examples](https://examples.dask.org/dataframe.html)
* [pandas documentation](https://pandas.pydata.org/pandas-docs/stable/)

## When to use `dask.dataframe`

pandas is great for tabular datasets that fit in memory. A general rule of thumb for pandas is:

> "Have 5 to 10 times as much RAM as the size of your dataset"
>
> ~ Wes McKinney (2017) in [10 things I hate about pandas](https://wesmckinney.com/blog/apache-arrow-pandas-internals/)

Here "size of dataset" means dataset size on _the disk_.

Dask becomes useful when the datasets exceed the above rule.

In this notebook, you will be working with the New York City Airline data. This dataset is only ~200MB, so that you can download it in a reasonable time, but `dask.dataframe` will scale to  datasets **much** larger than memory.



## Create a distributed Cluster in the cloud

Create the datasets you will be using in this notebook:

In [2]:
import coiled

cluster = coiled.Cluster(
    account="dask-tutorials",
    n_workers=20,
    shutdown_on_close=False,
    region="us-east-2",
)

client = cluster.get_client()

Output()

Output()

### Dask Diagnostic Dashboard

Dask Distributed provides a useful Dashboard to visualize the state of your cluster and computations.

If you're on **JupyterLab**, you can use the [Dask JupyterLab extension](https://github.com/dask/dask-labextension) (which should be already installed in your environment) to open the dashboard plots:
* Click on the Dask logo in the left sidebar
* Click on the magnifying glass icon, which will automatically connect to the active dashboard (if that doesn't work, you can type/paste the dashboard link http://127.0.0.1:8787 in the field)
* Click on **"Task Stream"**, **"Progress Bar"**, and **"Worker Memory"**, which will open these plots in new tabs
* Re-organize the tabs to suit your workflow!

Alternatively, click on the dashboard link displayed in the Client details above: http://127.0.0.1:8787/status. It will open a new browser tab with the Dashboard.

## NYC Uber/Lyft rides

The NYC Taxi dataset is a timeless classic.  

Interestingly there is a new variant.  The NYC Taxi and Livery Commission requires data from all ride-share services in the city of New York.  This includes private limosine services, van services, and a new category "High Volume For Hire Vehicle" services, those that dispatch 10,000 rides per day or more.  This is a special category defined for Uber and Lyft.  


In [4]:
import os
import dask

By convention, we import the module `dask.dataframe` as `dd`, and call the corresponding `DataFrame` object `ddf`.

**Note**: The term "Dask DataFrame" is slightly overloaded. Depending on the context, it can refer to the module or the DataFrame object. To avoid confusion, throughout this notebook:
- `dask.dataframe` (note the all lowercase) refers to the API, and
- `DataFrame` (note the CamelCase) refers to the object.

The following dataset takes up around 100GB of memory.

In [7]:
import dask.dataframe as dd

ddf = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
)
ddf

Unnamed: 0_level_0,hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
npartitions=720,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1
,string,string,string,datetime64[us],datetime64[us],datetime64[us],datetime64[us],int32,int32,float32,int32,float32,float32,float32,float32,float32,float32,float32,float32,category[unknown],category[unknown],category[unknown],category[unknown],category[unknown]
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Dask has not loaded the data yet, it has:
- investigated the input path and found that there are 720 files
- intelligently created a set of jobs for each chunk -- one per original parquet file in this case

Notice that the representation of the `DataFrame` object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and dtypes.

### Lazy Evaluation

Most Dask Collections, including Dask `DataFrame` are evaluated lazily, which means Dask constructs the logic (called task graph) of your computation immediately but "evaluates" them  only when necessary. You can view this task graph using `.visualize()`.

We need to call `.compute()` to trigger actual computations.

Some functions like `len` and `head` also trigger a computation. Specifically, calling `len` will:
- load actual data, (that is, load each file into a pandas DataFrame)
- then apply the corresponding functions to each pandas DataFrame (also known as a partition)
- combine the subtotals to give you the final grand total

In [10]:
# load and count number of rows
len(ddf)

783431901

You can view the start and end of the data as you would in pandas:

In [11]:
ddf.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,...,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
0,HV0003,B02867,B02867,2019-02-01 00:01:26,2019-02-01 00:02:55,2019-02-01 00:05:18,2019-02-01 00:14:57,245,251,2.45,...,0.83,0.0,,0.0,7.48,Y,N,N,N,
1,HV0003,B02879,B02879,2019-02-01 00:26:08,2019-02-01 00:41:29,2019-02-01 00:41:29,2019-02-01 00:49:39,216,197,1.71,...,0.7,0.0,,2.0,7.93,N,N,N,N,
2,HV0005,B02510,,2019-02-01 00:48:58,NaT,2019-02-01 00:51:34,2019-02-01 01:28:29,261,234,5.01,...,3.99,0.0,,0.0,35.970001,N,Y,N,N,
3,HV0005,B02510,,2019-02-01 00:02:15,NaT,2019-02-01 00:03:51,2019-02-01 00:07:16,87,87,0.34,...,0.64,0.0,,3.0,5.39,N,Y,N,N,
4,HV0005,B02510,,2019-02-01 00:06:17,NaT,2019-02-01 00:09:44,2019-02-01 00:39:56,87,198,6.84,...,2.16,0.0,,4.0,17.07,N,Y,N,N,


## Computations with `dask.dataframe`

Let's compute the maximum of the flight delay.

With just pandas, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums.

```python
import pandas as pd

files = os.listdir(os.path.join('data', 'nycflights'))

maxes = []

for file in files:
    df = pd.read_csv(os.path.join('data', 'nycflights', file))
    maxes.append(df.DepDelay.max())
    
final_max = max(maxes)
```

`dask.dataframe` lets us write pandas-like code, that operates on larger-than-memory datasets in parallel.

In [12]:
%%time
result = ddf.driver_pay.max()
result.compute()

CPU times: user 13.2 ms, sys: 3.78 ms, total: 17 ms
Wall time: 4.11 s


4894.62

This creates the lazy computation for us and then runs it.  

**Note:** Dask will delete intermediate results (like the full pandas DataFrame for each file) as soon as possible. This means you can handle datasets that are larger than memory but, repeated computations will have to load all of the data in each time. (Run the code above again, is it faster or slower than you would expect?)

You can view the underlying task graph using `.visualize()`:

## Sharing Intermediate Results

When computing all of the above, we sometimes did the same operation more than once. For most operations, `dask.dataframe` stores the arguments, allowing duplicate computations to be shared and only computed once.

For example, let's compute the mean and standard deviation for departure delay of all non-canceled flights. Since Dask operations are lazy, those values aren't the final results yet. They're just the steps required to get the result.



### `.persist()`

While using a distributed scheduler, you can keep some _data that you want to use often_ in the _distributed memory_. 

`persist` generates "Futures" and stores them in the same structure as your output. You can use `persist` with any data or computation that fits in memory.

In [18]:
ddf.columns

Index(['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num',
       'request_datetime', 'on_scene_datetime', 'pickup_datetime',
       'dropoff_datetime', 'PULocationID', 'DOLocationID', 'trip_miles',
       'trip_time', 'base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
       'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay',
       'shared_request_flag', 'shared_match_flag', 'access_a_ride_flag',
       'wav_request_flag', 'wav_match_flag'],
      dtype='object')

## Exercises

In this section you will do a few `dask.dataframe` computations. If you are comfortable with pandas then these should be familiar. You will have to think about when to call `.compute()`.

Hint: User persist to read the data only once

### 1. How many rows are in our dataset?

_Hint_: how would you check how many items are in a list?

In [16]:
ddf = ddf.persist()

In [17]:
len(ddf)

783431901

### 2. In total, how many rides were tipped?

_Hint_: use [boolean indexing](https://pandas.pydata.org/pandas-docs/stable/indexing.html#boolean-indexing).

In [20]:
len(ddf[ddf.tips > 0])

124533228

### 3. In total, how many rides were operated per operator?

*Hint*: use [groupby](https://pandas.pydata.org/pandas-docs/stable/groupby.html). "hvfhs_license_num" is the operator

In [21]:
ddf.groupby("hvfhs_license_num").size().compute()

hvfhs_license_num
HV0002      6388934
HV0003    561586224
HV0004     13884957
HV0005    201571786
dtype: int64

### 4. What was the average driver pay?

In [23]:
ddf.driver_pay.mean().compute()

16.33131671006591

### 5. How big is the payout percentage to the drivers?

In [28]:
(ddf.driver_pay.sum() / ddf.base_passenger_fare.sum()).compute()

0.8088178

### 6. Let's calculate the percentage of tipped rides per operator

In [29]:
ddf["tipped"] = ddf.tips != 0
ddf.groupby("hvfhs_license_num").tipped.mean().compute()

hvfhs_license_num
HV0002    0.084400
HV0003    0.149856
HV0004    0.092949
HV0005    0.191230
Name: tipped, dtype: float64

We can read the data once and store our dataset in memory, this makes our computations very fast

## Custom code with Dask DataFrame

`dask.dataframe` only covers a small but well-used portion of the pandas API.

This limitation is for two reasons:

1.  The Pandas API is *huge*
2.  Some operations are genuinely hard to do in parallel, e.g, sorting.

Additionally, some important operations like `set_index` work, but are slower than in pandas because they include substantial shuffling of data, and may write out to disk.

**What if you want to use some custom functions that aren't (or can't be) implemented for Dask DataFrame yet?**

You can open an issue on the [Dask issue tracker](https://github.com/dask/dask/issues) to check how feasible the function could be to implement, and you can consider contributing this function to Dask.

In case it's a custom function or tricky to implement, `dask.dataframe` provides a few methods to make applying custom functions to Dask DataFrames easier:

- [`map_partitions`](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.map_partitions.html): to run a function on each partition (each pandas DataFrame) of the Dask DataFrame
- [`map_overlap`](https://docs.dask.org/en/latest/generated/dask.dataframe.rolling.map_overlap.html): to run a function on each partition (each pandas DataFrame) of the Dask DataFrame, with some rows shared between neighboring partitions
- [`reduction`](https://docs.dask.org/en/latest/generated/dask.dataframe.Series.reduction.html): for custom row-wise reduction operations.

Let's take a quick look at the `map_partitions()` function:

In [30]:
help(ddf.map_partitions)

Help on method map_partitions in module dask.dataframe.core:

map_partitions(func, *args, **kwargs) method of dask.dataframe.core.DataFrame instance
    Apply Python function on each DataFrame partition.
    
    Note that the index and divisions are assumed to remain unchanged.
    
    Parameters
    ----------
    func : function
        The function applied to each partition. If this function accepts
        the special ``partition_info`` keyword argument, it will receive
        information on the partition's relative location within the
        dataframe.
    args, kwargs :
        Positional and keyword arguments to pass to the function.
        Positional arguments are computed on a per-partition basis, while
        keyword arguments are shared across all partitions. The partition
        itself will be the first positional argument, with all other
        arguments passed *after*. Arguments can be ``Scalar``, ``Delayed``,
        or regular Python objects. DataFrame-like args

The "Distance" column in `ddf` is currently in miles. Let's say we want to convert the units to kilometers and we have a general helper function as shown below. In this case, we can use `map_partitions` to apply this function across each of the internal pandas `DataFrame`s in parallel. 

In [33]:
ddf.driver_pay.sum().compute()

7676685300.0

In [35]:
def my_custom_converter(df, multiplier=1):
    df["driver_pay"] = df["driver_pay"] * multiplier
    return df


ddf = ddf.map_partitions(
    my_custom_converter, multiplier=0.6,
)

In [36]:
ddf.driver_pay.sum().compute()

4606011000.0

### What is `meta`?

Since Dask operates lazily, it doesn't always have enough information to infer the output structure (which includes datatypes) of certain operations.

`meta` is a _suggestion_ to Dask about the output of your computation. Importantly, `meta` _never infers with the output structure_. Dask uses this `meta` until it can determine the actual output structure.

Even though there are many ways to define `meta`, we suggest using a small pandas Series or DataFrame that matches the structure of your final output.

## Close you Dask Cluster

It's good practice to always close any Dask cluster you create:

In [None]:
client.shutdown()