<img src="https://docs.dask.org/en/stable/_images/dask_horizontal.svg" align="right" width="30%">

Dask for Parallel Python
========================

Dask has many APIs. Some are low level and some are high level:

1.  [Low level](https://docs.dask.org/en/stable/futures.html): let you parallelize almost any Python code

    General purpose, you're in control
    
2.  [High level](https://docs.dask.org/en/stable/dataframe.html): mimic common PyData libraries like NumPy/Pandas/Xarray/XGBoost/...

    Special purpose, lots of automation
    
In this notebook we're going to use *both* to work through a tabular data problem.  In particular we'll use:

1.  Dask Futures, a low-level API that can do almost anything
2.  Dask Dataframe, a high-level API that makes Pandas-at-scale workflows easy

Data and Problem
----------------

We're going to play with the NYC Flights data, which covers flights into and out of the NYC area.  This data is stored as a directory of CSV files.

In [None]:
from prep_data import flights
flights()

import os
os.listdir("nycflights")

Let's work together to better understand the performance of the airports in the NYC area (EWR, JFK, LGA).

We'll do this by asking increasingly complicated questions like the following:

1.  How many flights arrived or took off per year?
2.  What was the longest delay for any flight?
3.  Which airport has the best record for on-time departures?

We'll do this first sequentially, then in parallel with the low-level Dask Futures API, and finally in parallel with the high-level Dask Dataframe API.

How many flights took off per year?
-----------------------------------

### Sequential code

In [None]:
import os, glob

filenames = glob.glob(
    os.path.join('nycflights', "*.csv")
)

filenames

In [None]:
import pandas as pd

In [None]:
%%time

years = []
lengths = []
for filename in filenames:
    year = int(os.path.split(filename)[-1].split(".")[0])
    df = pd.read_csv(filename)
    length = len(df)
    
    years.append(year)
    lengths.append(length)

In [None]:
for year, length in zip(years, lengths):
    print("Year", year, "had",  length, "flights")
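The year in the loop above comes from the file name rather than from the file contents. Here is a small sketch of that parsing step, assuming file names of the form `<year>.csv`. The helper `year_from_filename` and the two paths are made up for illustration.

```python
import os

def year_from_filename(filename):
    """Return the year encoded in a path such as 'nycflights/1990.csv'."""
    basename = os.path.split(filename)[-1]   # e.g. '1990.csv'
    return int(basename.split(".")[0])       # e.g. 1990

# Hypothetical paths, used only to illustrate the parsing
example_paths = [
    os.path.join("nycflights", "1990.csv"),
    os.path.join("nycflights", "1991.csv"),
]
example_years = [year_from_filename(p) for p in example_paths]
print(example_years)
```

If a file name does not start with digits followed by a dot, `int` raises a `ValueError`, so this approach relies on the naming convention holding for every file.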

## First, learn about Dask Futures

### Parallel Code with low-level Futures

This is an example of an embarrassingly parallel computation: we want to run the same Python code on many independent pieces of data.  This case is simple, and it comes up all the time.

Let's learn how to do this with [Dask futures](https://docs.dask.org/en/stable/futures.html).

First, we're going to see a very simple example, then we'll try to parallelize the code above.


### Set up a Dask cluster locally

In [None]:
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1)
client

### Dask Futures introduction

In [None]:
import time

def slowinc(x, delay=1):
    time.sleep(delay)
    return x + 1

def slowdouble(x, delay=1):
    time.sleep(delay)
    return 2 * x

Dask futures let us run Python functions remotely on parallel hardware.  Rather than calling the function directly:

In [None]:
%%time

y = slowinc(10)
z = slowdouble(y)
z

We can instead ask Dask to run the function `slowinc` on the value `10` by passing both to the `client.submit` method.  The first argument is the function to call, and the remaining arguments are passed on to that function.

In [None]:
%%time

y_future = client.submit(slowinc, 10)
z_future = client.submit(slowdouble, y_future)
z_future

You'll notice that this returned immediately.  That's because all we did was submit `slowinc` and `slowdouble` to run on Dask.  Each call returned a `Future`, a pointer to where the result will eventually be.

We can retrieve the result of a future by calling `future.result()`, which waits until the result is ready.

In [None]:
z_future

In [None]:
z = z_future.result()
z
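Dask's futures interface extends Python's standard `concurrent.futures`, so the submit-then-wait behavior can be sketched with the standard library alone. This is an illustration with a thread pool, not Dask itself; `slow_inc` and the 0.5 second delay are made up for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_inc(x, delay=0.5):
    time.sleep(delay)
    return x + 1

with ThreadPoolExecutor(max_workers=1) as pool:
    start = time.perf_counter()
    future = pool.submit(slow_inc, 10)           # returns right away with a Future
    submit_seconds = time.perf_counter() - start

    value = future.result()                      # blocks until the work is done
    total_seconds = time.perf_counter() - start

print(f"submit took {submit_seconds:.3f}s, result arrived after {total_seconds:.3f}s")
```

One difference worth noting: a standard-library executor does not resolve futures passed as arguments, whereas `client.submit` does, which is what makes the `client.submit(slowdouble, y_future)` call above work.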

## Submit many tasks in a loop

We can submit lots of functions to run at once, and then gather them when we're done.  This allows us to easily parallelize simple for loops.

*This section uses the following API*:

-  [Client.submit and Future.result](https://docs.dask.org/en/stable/futures.html#submit-tasks)

### Sequential code

In [None]:
%%time 

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
results = []

for x in data:
    y = slowinc(x)
    z = slowdouble(y)
    results.append(z)
    
results

### Parallel code

In [None]:
%%time

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
futures = []

for x in data:
    y_future = client.submit(slowinc, x)
    z_future = client.submit(slowdouble, y_future)
    futures.append(z_future)
    
results = [future.result() for future in futures]
results
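The same pattern can also be written with `Client.map` and `Client.gather`, which submit one task per input and collect a list of results in one call. This is a minimal sketch, assuming `dask.distributed` is installed. To stay self-contained it starts its own small in-process client and uses hypothetical no-delay functions `inc` and `double`; in this notebook you would reuse the existing `client` instead.

```python
from dask.distributed import Client

def inc(x):
    return x + 1

def double(x):
    return 2 * x

small_data = [1, 2, 3, 4, 5]

# processes=False keeps the scheduler and worker inside this process;
# dashboard_address=None skips starting the diagnostics dashboard.
with Client(processes=False, n_workers=1, threads_per_worker=2,
            dashboard_address=None) as sketch_client:
    inc_futures = sketch_client.map(inc, small_data)          # one future per element
    double_futures = sketch_client.map(double, inc_futures)   # futures can feed other tasks
    mapped_results = sketch_client.gather(double_futures)     # list of concrete values

print(mapped_results)
```

Whether to use an explicit loop with `client.submit` or `client.map` is mostly a matter of taste; the loop is easier to extend when each iteration needs several different steps.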

Back to flights
---------------

Given the pattern above, can you parallelize the sequential work below?

Some things to think about:

1.  Which of the Python calls do you want to offload to the Dask cluster?

    (there are many right answers here)
    
2.  How much more quickly do you think it will run?

    (only if you like thinking about performance)

### Sequential code

In [None]:
%%time

years = []
lengths = []
for filename in filenames:
    year = int(os.path.split(filename)[-1].split(".")[0])
    df = pd.read_csv(filename)
    length = len(df)
    
    years.append(year)
    lengths.append(length)

### Exercise: Parallel code

For this section you will need the [Client.submit and Future.result](https://docs.dask.org/en/stable/futures.html#submit-tasks) API described above.

-  client.submit
-  Future.result

In [None]:
%%time

...

for filename in filenames:
    ...

## What is the longest delay?

### Sequential code

In [None]:
%%time

delays = []
for filename in filenames:
    df = pd.read_csv(filename)
    delay = df.ArrDelay.max()
    delays.append(delay)
    
max(delays)

### Exercise: Parallel code

This should be similar to, and maybe a bit simpler than, the exercise above.  

You should think about two things:

-  How do you send a method call, like `df.ArrDelay.max()`, to `client.submit`?
-  How should you handle `max`?  (there are a couple of good answers)


In [None]:
%%time

...

for filename in filenames:
    ...

## How many flights total?

How many flights were there in the entire dataset?

We're going to ask you to write both the sequential and the parallel code this time.

### Sequential code

In [None]:
%%time

...

for filename in filenames:
    ...

### Parallel code


In [None]:
%%time

...

for filename in filenames:
    ...

## Dask DataFrame

This is great.  We could ask increasingly complex questions and you could write down increasingly complex parallel algorithms like this.  

Fortunately, someone has already done this work for Pandas and put all of these algorithms into the [dask.dataframe library](https://docs.dask.org/en/stable/dataframe.html).

In [None]:
import dask.dataframe as dd

df = dd.read_csv(
    os.path.join("nycflights", "*.csv"),
    parse_dates={'Date': [0, 1, 2]},
    usecols=[
        "UniqueCarrier", "ActualElapsedTime", "ArrTime", "ArrDelay", "DepDelay",
        "Origin", "Dest", "Distance", "Cancelled",
    ],
)
df.head()

In [None]:
%%time

df.ArrDelay.max().compute()

Dask Dataframe looks a lot like Pandas.  The biggest differences are:

1.  It runs in parallel on top of Dask
2.  You have to call `.compute()` when you want a parallel result delivered to your computer as a normal result.

## Exercise: Average arrival delay by airport

What is the average arrival delay for flights departing from each of the three major airports: `EWR` (Newark), `JFK`, and `LGA` (LaGuardia)?

You'll want to look at the `Origin` and `ArrDelay` columns.  

In this exercise you will not use `Client.submit` or `Future.result` (those belong to the low-level API, not to Dask Dataframe).  Instead you will use normal Pandas syntax, and the `.compute()` method when you are ready for a final result.

In [None]:
df.head()

## Challenge Exercise 1: Compute Quantiles

Rather than just the average, compute the 10%, 50%, and 90% quantiles of arrival delay for each airport.

You may want to refer to the [Dask DataFrame API](https://docs.dask.org/en/stable/dataframe-api.html) to find useful methods.

## Challenge Exercise 2: Compute Average with Dask Futures

Do the same exercise as the normal (non-challenge) exercise above but manually with the low-level Dask futures API.
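One pitfall when combining per-file results by hand: the mean of per-file means equals the overall mean only when every file has the same number of rows. Carrying a sum and a count per file avoids this. Here is a sketch with made-up numbers standing in for the delay values of two files of different sizes.

```python
# Made-up delay values for two "files" of different sizes
chunks = [[10, 20, 30], [40]]

# Naive: average the per-chunk means
chunk_means = [sum(c) / len(c) for c in chunks]
mean_of_means = sum(chunk_means) / len(chunk_means)

# Safer: combine per-chunk sums and counts, then divide once
total = sum(sum(c) for c in chunks)
count = sum(len(c) for c in chunks)
overall_mean = total / count

print(mean_of_means, overall_mean)
```

With Pandas, `Series.sum()` and `Series.count()` both skip missing values by default, so they pair well as the per-file pieces.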

## Managing Memory

When we run operations like the following many times we're being inefficient:

In [None]:
%%time

print(df[df.Origin == "EWR"].ArrDelay.mean().compute())

print(df[df.Origin == "LGA"].ArrDelay.mean().compute())

print(df[df.Origin == "JFK"].ArrDelay.mean().compute())

Each call re-reads and re-parses the CSV files, and that is where most of the time goes.  

There are two ways to address this.

### 1. Ask for everything at once

In [None]:
%%time

ewr = df[df.Origin == "EWR"].ArrDelay.mean()
lga = df[df.Origin == "LGA"].ArrDelay.mean()
jfk = df[df.Origin == "JFK"].ArrDelay.mean()

import dask

dask.compute(ewr, lga, jfk)

### 2. Persist data in memory

See the API reference for the [`persist` method](https://docs.dask.org/en/stable/api.html?highlight=persist#dask.persist).

In [None]:
df = df.persist()

In [None]:
%%time

print(df[df.Origin == "EWR"].ArrDelay.mean().compute())
print(df[df.Origin == "LGA"].ArrDelay.mean().compute())
print(df[df.Origin == "JFK"].ArrDelay.mean().compute())

## Next Steps

In the next notebooks we'll expand on the lessons learned here in two ways:

1.  Use Dask Futures for more advanced applications beyond dataframes
2.  Scale up to distributed clusters