<img style="float: right" src="img/saturn.png" width="300" />

# Machine Learning on Big Data with Dask

## Processing large datasets

This notebook shows how to process large datasets with Dask in preparation for machine learning.

## Initialize Dask cluster

### Exercise

Create a `cluster` and `client` object to connect to our Dask cluster. Check out [03-dask-basics.ipynb](03-dask-basics.ipynb) if you need some help!

In [None]:
from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    <<< FILL IN >>>
)
client = <<< FILL IN >>>

In [None]:
from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=5,
    nthreads=4,
)
client = Client(cluster)

In [None]:
client.wait_for_workers(5)
client

# Load and process large dataset

## Load data

Our large dataset for this notebook will be NYC taxi data from all of 2019 (in [02-single-node.ipynb](02-single-node.ipynb) we just used one month of 2019). Rather than load the data with pandas' `pd.read_csv`, we will use Dask's [`dd.read_csv` method](https://docs.dask.org/en/latest/dataframe-create.html).

In [None]:
import s3fs
import pandas as pd
import numpy as np
import dask
import dask.dataframe as dd
from dask.distributed import wait

s3 = s3fs.S3FileSystem(anon=True)

`dd.read_csv` accepts glob syntax for loading in multiple files. This way, we don't have to write a for loop and concatenate DataFrames like we tried with pandas. 

In [None]:
files_2019 = 's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv'
s3.glob(files_2019)

<br>

We need to pass a couple of extra arguments to `dd.read_csv`:
- `storage_options=...`: this tells Dask to use anonymous S3 access (we did this with `s3.open` for pandas)
- `assume_missing=True`: this tells Dask to treat integer columns as if they may contain missing values and read them as floats. Dask sometimes needs type information up-front to be able to parallelize tasks effectively.

In [None]:
%%time

taxi = dd.read_csv(
    files_2019,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)

In [None]:
taxi

Notice that previewing the `taxi` object doesn't print out the contents of the DataFrame, like with pandas. This is because Dask has not yet loaded any data. It does tell us the number of partitions (i.e. little pandas DataFrames) the big Dask DataFrame has. 

### Exercise

Compute the number of rows in the `taxi` DataFrame (hint: think pandas API!).

In [None]:
<<< FILL IN >>>

In [None]:
len(taxi)

--- 

You may have tried `taxi.shape` and gotten some unfamiliar output. This is because of Dask's lazy evaluation - Dask doesn't perform any operations until asked to. `len` is a special case that triggers computation. If we want to get the row count out of `taxi.shape`, we need to run `.compute()` on its first element, which is a lazy object.

In [None]:
taxi.shape[0].compute()

In [None]:
taxi_bytes = taxi.memory_usage(deep=True).sum()
taxi_bytes

Notice the above cell completed immediately (but we don't have the result). A comparable pandas call would take a few seconds to compute the memory usage. You guessed it - lazy evaluation! 

### Exercise

Trigger computation on `taxi_bytes` to get the actual size of the Dask DataFrame.

In [None]:
%%time
print(f"Size (MB): {<<< FILL IN >>>}")

In [None]:
%%time
print(f"Size (MB): {taxi_bytes.compute() / 1e6}")

--- 

`.compute()` runs the computation and returns the result to the notebook. Be careful though, because if you run `taxi.compute()` Dask will give you the entire big DataFrame as a pandas object (this will certainly blow up the kernel!).

It is useful in many cases to trigger computation on objects even if you don't want to pull them down to the Jupyter Server. In this case we use `.persist()`, which triggers all computations performed on the DataFrame and holds the results in memory across the _cluster_. More discussion about `.compute()` vs. `.persist()` is in [03-dask-basics.ipynb](03-dask-basics.ipynb). Persisting becomes useful when we perform later machine learning operations, as we don't want Dask to be re-parsing CSV files in each iteration of model training.

In [None]:
taxi = taxi.persist()

Notice that the above cell completed immediately but the Dask Dashboard is still doing work. We can use the `wait()` function to block our notebook until the `taxi` DataFrame is fully done persisting.

In [None]:
%%time
_ = wait(taxi)

The cell below will run much faster than before! This is because the DataFrame is loaded up into memory across the cluster, and Dask does not need to download and parse the CSV files again.

In [None]:
%%time
taxi_bytes = taxi.memory_usage(deep=True).sum()
print(f"Size (MB): {taxi_bytes.compute() / 1e6}")

### Messy data - `dask.delayed`

> Advanced topic: This section is optional. You may continue to the **Exploratory Analysis** section if you want to skip this.

Data files aren't always provided in a clean tabular form that's readable with a `read_*` method from pandas or Dask. With [`dask.delayed` functions](https://docs.dask.org/en/latest/delayed.html), we can write a function that processes a single chunk of raw data and then tell Dask to collect these into a Dask DataFrame. We'll illustrate that now with the CSV files, but it's always better to use a `dd.read_*` method if your data supports it. `dask.delayed` is very flexible and powerful - chances are you will use it for some of your workloads.

In [None]:
@dask.delayed
def make_data(i):
    return pd.DataFrame([(i,), ], columns=['foo'])

dfs = []
for i in range(10):
    df = make_data(i)
    dfs.append(df)

In [None]:
dask_dataframe = dd.from_delayed(dfs)
dask_dataframe

Remember, we can only call `.compute()` here because we know the dataframe is small!

In [None]:
dask_dataframe.compute()

### Exercise

Define a delayed function, `load_csv`, that will return a pandas DataFrame for a given NYC taxi file path. (Hint: a similar function was created in [02-single-node.ipynb](02-single-node.ipynb)). Test your function with this path:

```https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv```

In [None]:
@dask.delayed
def load_csv(path):
    <<< FILL IN >>>
    
df = <<< FILL IN >>>

In [None]:
@dask.delayed
def load_csv(path):
    df = pd.read_csv(
        path,
        parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']
    )
    return df
    
df = load_csv('https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv')

--- 

Now we can call this for the 2019 files and create a Dask DataFrame with `dd.from_delayed()`. `from_delayed()` expects a list of delayed objects that each return a pandas dataframe. It creates a distributed dataframe object that wraps the pandas dataframes, which are loaded across the cluster when a computation needs them. If you don't pass `meta`, Dask computes the first delayed object up front to infer the column names and types.

Notice that this for loop looks very similar to what blew up our kernel in [02-single-node.ipynb](02-single-node.ipynb). Because of Dask's lazy evaluation, none of these functions pull the full dataset until we perform operations with the resulting DataFrame.

In [None]:
dfs = []
for f in s3.glob(files_2019):
    df = load_csv(f's3://{f}')
    dfs.append(df)
taxi_delayed = dd.from_delayed(dfs)

In [None]:
taxi_delayed

In [None]:
taxi_delayed.head()

This is just an example of how you would use `dask.delayed` to pull data from anywhere into a distributed dataframe. Because our NYC taxi data is already in CSV format, we can use the native `dask.dataframe.read_csv` method, which is also more efficient than a custom delayed workflow.

# Exploratory analysis

We'll go back to using the `taxi` Dask DataFrame we loaded with `dd.read_csv`.

In [None]:
%%time
taxi_describe = taxi.describe().compute().T
np.round(taxi_describe, 3)

### Exercise

Do some of your own exploratory analysis with the `taxi` dataframe. Try things that you would normally do with pandas and see how the Dask dataframe reacts!

In [None]:
<<< FILL IN >>>

In [None]:
# There's no right answer here! Hopefully you learned something about Dask :) 

## Feature engineering

In [None]:
# specify feature and label column names
raw_features = [
    'tpep_pickup_datetime', 
    'passenger_count', 
    'tip_amount', 
    'fare_amount',
]
features = [
    'pickup_weekday', 
    'pickup_weekofyear', 
    'pickup_hour', 
    'pickup_week_hour', 
    'pickup_minute', 
    'passenger_count',
]
label = 'tip_fraction'

### Exercise

Copy over the `prep_df()` function from [02-single-node.ipynb](02-single-node.ipynb). Then:
- Apply the function to `taxi` and save that in a new dataframe called `taxi_feat`. 
- Preview the results

Are any changes needed to `prep_df()`?

In [None]:
def prep_df(taxi_df):
    <<< FILL IN >>>

In [None]:
taxi_feat = <<< FILL IN >>>

In [None]:
def prep_df(taxi_df):
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount
     
    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.weekday
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.isocalendar().week
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)
    
    return df

In [None]:
taxi_feat = prep_df(taxi)
taxi_feat.head()

Notice that this feature engineering code is _exactly_ the same as what we did in [02-single-node.ipynb](02-single-node.ipynb). Dask's DataFrame API matches pandas' API in many places. Check out the [Dask DataFrame docs](https://docs.dask.org/en/latest/dataframe.html#dask-dataframe-copies-the-pandas-api) for more information on what is and is not supported from the pandas API.