<img src="https://raw.githubusercontent.com/dask/dask/main/docs/source/images/dask_horizontal.svg"
     width="60%"
     alt="Dask logo" />

# Material from Coiled

* See the [repository of Notebooks](https://github.com/coiled/dask-elearning/)
* See the [webinar on YouTube](https://www.youtube.com/playlist?list=PLeDTMczuyDQ8S73cdc0PrnTO80kfzpgz2)

# Process Tabular Data with Dask DataFrame
In this notebook we will learn about the [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html), a tabular DataFrame interface based on pandas that will automatically build parallel computations.

## When to use Dask DataFrames

Pandas is great for tabular datasets that fit in memory. If your data fits in memory then you should use Pandas. **Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM** where you would normally run into `MemoryError`s.

```python
    MemoryError:  ...
```

This also means:

## Don't use Dask DataFrames if you don't need to!
Distributed computing brings a lot of additional complexity into the mix and will **incur overhead**. If your dataset and computations fit comfortably within your local resources, **this overhead may be larger than the performance gain** you'll get by using Dask. In that case, stick with non-distributed libraries like pandas, numpy and scikit-learn.

## About this notebook
During this tutorial, we will work with a dataset containing NYC flight data. This dataset is only about 200MB on disk, so you can download it in a reasonable time and the exercises finish quickly, but Dask DataFrames will scale to datasets much larger than the memory on your local machine.

## Getting started with Dask DataFrames

Let's use Dask DataFrames to explore the NYC flight dataset. Dask's `read_csv` function supports wildcard characters like `"*"`, which can be used to load an entire directory of CSV files.

In [None]:
%run prep_data.py -d flights

In [None]:
import os

files = os.path.join('data', 'nycflights', '*.csv')
files

In [None]:
import dask.dataframe as dd

In [None]:
ddf = dd.read_csv(files,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={"TailNum": str,
                        "CRSElapsedTime": float,
                        "Cancelled": bool})
ddf

Notice that the representation of the dataframe object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and dtypes.

**Dask is lazy!**

In [None]:
ddf.columns

In [None]:
ddf.dtypes

Dask DataFrames have an `.npartitions` attribute which tells you how many partitions make up a Dask DataFrame.

Dask is able to process larger-than-memory datasets by cutting computations into smaller parts and processing those in parallel.

In [None]:
ddf.npartitions

## The pandas Look & Feel
Dask DataFrames implement a well-used portion of the Pandas API in a way that allows for parallel and out-of-core computation. This means that a lot of Dask DataFrame code will look and feel familiar to pandas users: 

```python
import pandas as pd                   import dask.dataframe as dd
df = pd.read_csv('2015-01-01.csv')    df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean()   df.groupby(df.user_id).value.mean().compute()
```

This is because, internally, **a Dask DataFrame is composed of many pandas DataFrames**: 

<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg" width="50%">

Dask DataFrames are divided into different **partitions** where each partition is a pandas DataFrame. This is why driving the Dask car *can feel* like you're still driving the pandas car: Dask is performing a bunch of regular pandas operations on regular pandas objects under the hood.

But don't forget that you've entered the world of distributed computing now -- which means you've added a lot more complexity to the mix. You now need to consider things like concurrency, state, data duplicates, data loss, etc.

Luckily, with a high-level Collection like DataFrames, Dask handles most of these complicated questions for you. 

## pandas-like Computations

Let's see this in action with a more involved example. Let's compute the largest flight departure delay.

In pandas we could do this by iterating over each file to find the individual maximums and then find the final maximum over the individual maximums.

```python
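import os

import pandas as pd

# List every file in the flights directory
files = os.listdir(os.path.join('data', 'nycflights'))

maxes = []

for file in files:
    # Load one file at a time and record its maximum departure delay
    df = pd.read_csv(os.path.join('data', 'nycflights', file))
    maxes.append(df.DepDelay.max())

# The overall maximum is the maximum of the per-file maximums
final_max = max(maxes)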
```

Thankfully, we can do this with Dask DataFrames using pandas-like code:

In [None]:
max_delay = ddf["DepDelay"].max()

In [None]:
max_delay

The above cell looks exactly like what we would do using pandas...but the result does not! 

Instead of the actual result of the computation, we only get some schematic information. This is because Dask DataFrames are lazily evaluated. This means that **no computation happens unless you explicitly tell Dask to do so** by calling `.compute()`.

Before actually performing a computation, Dask first constructs a task graph that it can use to optimize computing the result in parallel. You can think of a task graph as the recipe or route map that contains all the necessary instructions to arrive at the final result. Once you call `.compute()`, Dask will execute the instructions contained in the task graph and perform computations in parallel.

Let's look at the task graph to get a feel for how Dask's blocked algorithms work:

In [None]:
!pip install graphviz

**NOTICE**: This tutorial uses the `svg` backend because it is the only one available in the statically linked `graphviz` package we installed.
If you have the full `graphviz` package, prefer the `png` backend, which resizes the image to the width of the cell.

In [None]:
max_delay.visualize(format="svg")

Some things to note:

1.  Up until this point everything is lazy. To evaluate the result for `max_delay`, call its `compute()` method.
2.  Dask will delete intermediate results (like the full pandas DataFrame for each file) as soon as possible.
    -  This lets us handle datasets that are larger than memory
    -  This means that repeated computations will have to reload all of the data each time

In [None]:
%%time 
max_delay.compute()

## More Dask DataFrame computations

Let's look at a couple of examples of how the Dask DataFrame API mirrors the pandas API. If you are comfortable with pandas, the following operations will look very familiar, except that we need to add `compute()` to get the results.

### Example 1: Total of non-cancelled flights taken

Notice that there is a column in our DataFrame called `"Cancelled"` that is a boolean. 

In [None]:
(~ddf["Cancelled"]).sum().compute()

### Example 2: Total of non-cancelled flights taken by airport

We select the non-cancelled flights, apply `groupby` on the `"Origin"` column, and finally use `count` to get the number of flights per airport.

In [None]:
ddf[~ddf["Cancelled"]].groupby("Origin")["Origin"].count().compute()

### Exercise 1: What is the average departure delay from each airport?

Run the cell below to see the solution.

In [None]:
ddf.groupby("Origin")["DepDelay"].mean().compute()

### Exercise 2: What day of the week has the worst average departure delay?
Run the cell below to see the solution.

In [None]:
ddf.groupby("DayOfWeek")["DepDelay"].mean().idxmax().compute()

## Working with Partitions
Dask DataFrames implement a large portion of the pandas API by simply performing pandas methods on their partitions (which are pandas objects).

### Mapping Functions with `map_partitions`
However, sometimes you might want to manipulate your Dask DataFrame with a custom function. You can use the `map_partitions` method for this.

Imagine you find out that there was a 2-minute error in the `DepDelay` column.

Let's create a pandas `apply` function that will subtract 2 from every input:

In [None]:
def subtract_2(df):
    return df.apply(lambda x: x-2)

We can then map this function over all of our partitions:

In [None]:
ddf["Adjusted_DepDelay"] = ddf["DepDelay"].map_partitions(subtract_2)

In [None]:
ddf[["DepDelay", "Adjusted_DepDelay"]].head()

## Performance tip: When to call .compute()

In the examples and exercises above, we sometimes perform the same operation more than once (e.g. `read_csv`). Dask hashes the arguments of each operation, allowing duplicate computations to be shared and computed only once. You can use `dask.compute()` to merge the task graphs of multiple operations.

For example, let's compute the mean and standard deviation for departure delay of all non-cancelled flights. Since Dask operations are lazy, those values aren't the final results until we `compute` them. They're just the recipe required to get the result.

If we compute them with two calls to compute, there is no sharing of intermediate computations.

In [None]:
non_cancelled = ddf[~ddf["Cancelled"]]
mean_delay = non_cancelled["DepDelay"].mean()
std_delay = non_cancelled["DepDelay"].std()

In [None]:
%%time
mean_delay_result = mean_delay.compute()
std_delay_result = std_delay.compute()

Now, let's see how long it takes if we try computing `mean_delay` and `std_delay` with a single `compute()` call.

In [None]:
import dask

In [None]:
%%time
mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

Using `dask.compute` takes roughly half the time. This is because the task graphs for both results are merged when calling `dask.compute`, allowing shared operations (like `read_csv`) to be done only once instead of twice. In particular, using `dask.compute` only does the following once:

- The calls to `read_csv`
- The filter (`ddf[~ddf["Cancelled"]]`)
- The `"DepDelay"` column indexing
- Some of the necessary reductions (`sum`, `count`)

To see what the merged task graphs between multiple results look like (and what's shared), you can use the `dask.visualize` function:

In [None]:
dask.visualize(mean_delay, std_delay, format="svg")

# Extra resources

- Explore applying custom code to Dask DataFrames: [Dask Tutorial DataFrames](https://github.com/dask/dask-tutorial/blob/main/04_dataframe.ipynb)
- [Dask DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)
- [Dask DataFrame examples](https://examples.dask.org/dataframe.html)