# Data Aggregation with Dask

This notebook contains the data aggregation code to prepare data files for [the dashboard](./dashboard.ipynb). You can run this notebook to see how Dask is used with a Saturn cluster for data processing, but the files generated here will not be used by any of the examples. The dashboard uses pre-aggregated files from Saturn's public S3 bucket.

In [None]:
import os

DATA_PATH = "data"
if not os.path.exists(DATA_PATH):
    os.makedirs(DATA_PATH)

In [None]:
import s3fs
import dask.dataframe as dd
import numpy as np

import hvplot.dask  # noqa
import hvplot.pandas  # noqa

fs = s3fs.S3FileSystem(anon=True)

<hr>

## Initialize A Dask Cluster

This tutorial uses multiple machines to show how to apply more computing resources to data processing. This is done with Dask. Saturn Cloud offers managed Dask clusters, which can be provisioned and modified programmatically.

The code below creates a Dask cluster using [`dask-saturn`](https://github.com/saturncloud/dask-saturn), the official Dask client for Saturn Cloud. It creates a cluster with the following specs:

* `n_workers=3` --> 3 machines in the cluster
* `scheduler_size='medium'` --> the Dask scheduler will have 4GB of RAM and 2 CPU cores
* `worker_size='large'` --> each worker machine will have 2 CPU cores and 16GB of RAM

To see a list of possible sizes, run the code below.

In [None]:
import dask_saturn

dask_saturn.describe_sizes()

The `dask-saturn` code below creates two important objects: a cluster and a client.

* `cluster`: knows about and manages the scheduler and workers
    - can be used to create, resize, reconfigure, or destroy those resources
    - knows how to communicate with the scheduler, and where to find logs and diagnostic dashboards
* `client`: tells the cluster to do things
    - can send work to the cluster
    - can restart all the worker processes
    - can send data to the cluster or pull data back from the cluster

In [None]:
from dask.distributed import Client, wait
from dask_saturn import SaturnCluster

n_workers = 3
cluster = SaturnCluster(n_workers=n_workers, scheduler_size="medium", worker_size="large")
client = Client(cluster)
cluster

If you created your cluster in this notebook, it might take a few minutes for all of your nodes to become available. You can run the cell below to block until all nodes are ready.

>**Pro tip**: Create and/or start your cluster in the Saturn UI if you want to get a head start!

In [None]:
client.wait_for_workers(n_workers=n_workers)

# Load data

Set up a function to load files with Dask. It cleans up some column names and parses data types correctly.

In [None]:
usecols = [
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
]


def read_taxi_csv(files):
    ddf = dd.read_csv(
        files,
        assume_missing=True,
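        # name the datetime columns explicitly instead of relying on positions
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],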
        usecols=usecols,
        storage_options={"anon": True},
    )
    # grab the columns we need and rename
    ddf = ddf[
        [
            "tpep_pickup_datetime",
            "tpep_dropoff_datetime",
            "PULocationID",
            "DOLocationID",
            "passenger_count",
            "trip_distance",
            "payment_type",
            "tip_amount",
            "fare_amount",
        ]
    ]
    ddf.columns = [
        "pickup_datetime",
        "dropoff_datetime",
        "pickup_taxizone_id",
        "dropoff_taxizone_id",
        "passenger_count",
        "trip_distance",
        "payment_type",
        "tip_amount",
        "fare_amount",
    ]
    return ddf

Get a listing of files from the public S3 bucket, keeping only the 2017, 2018, and 2019 files.

In [None]:
files = [
    f"s3://{x}"
    for x in fs.glob("s3://nyc-tlc/trip data/yellow_tripdata_201*.csv")
    if "2017" in x or "2018" in x or "2019" in x
]
len(files), files[:2]
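
The year filter can be checked without touching S3. Below is a minimal sketch, assuming a hypothetical list of keys shaped like the bucket-prefixed paths that `fs.glob` returns (without the `s3://` scheme). The names `sample_keys`, `YEARS`, and `sample_files` are made up for illustration.

```python
# Hypothetical keys, shaped like the paths fs.glob() returns (no "s3://" prefix).
sample_keys = [
    "nyc-tlc/trip data/yellow_tripdata_2016-12.csv",
    "nyc-tlc/trip data/yellow_tripdata_2017-01.csv",
    "nyc-tlc/trip data/yellow_tripdata_2018-06.csv",
    "nyc-tlc/trip data/yellow_tripdata_2019-12.csv",
]

YEARS = ("2017", "2018", "2019")

# Keep only the years of interest and restore the scheme for dd.read_csv.
sample_files = [
    f"s3://{key}" for key in sample_keys if any(year in key for year in YEARS)
]
```

The filter is a plain substring match, so it relies on the year appearing only in the date part of each file name.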

In [None]:
ddf = read_taxi_csv(files[:5])  # only load first 5 months of data

<br>

We are loading a small sample for this exercise, but if you want to use the full data and replicate the aggregated data hosted on Saturn's bucket, you will need to use a larger cluster. Here is a sample cluster configuration you can use, but you can play around with sizes and see how performance changes!

```python
cluster = SaturnCluster(
    n_workers=10, 
    scheduler_size='xlarge',
    worker_size='8xlarge', 
    nthreads=32,
)
```

You will have to run `cluster.reset(...)` if the cluster has already been configured. Run the following to see what sizes are available:

```python
from dask_saturn.core import describe_sizes
describe_sizes()
```

In [None]:
# load all 3 years of data
# ddf = read_taxi_csv(files)

# Aggregated files for Dashboard

Create several CSV files to use for visualization in the dashboard. Note that each of these steps performs some Dask dataframe operations, calls `compute()` to pull down a pandas dataframe, and then writes that dataframe to a CSV file.

## Augment data

We'll distill some features out of the datetime component of the data. This is similar to the feature engineering that is done in other places in this demo, but we'll only create the features that'll be most useful in the visuals. 

In [None]:
ddf["pickup_hour"] = ddf.pickup_datetime.dt.hour
ddf["dropoff_hour"] = ddf.dropoff_datetime.dt.hour
ddf["pickup_weekday"] = ddf.pickup_datetime.dt.weekday
ddf["dropoff_weekday"] = ddf.dropoff_datetime.dt.weekday
ddf["percent_tip"] = (ddf["tip_amount"] / ddf["fare_amount"]).replace(
    [np.inf, -np.inf], np.nan
) * 100

We'll take out the extremely high values (tips above 1000% of the fare), since they distort the mean.

In [None]:
# mask() replaces values where the condition is True with NaN
ddf["percent_tip"] = ddf["percent_tip"].mask(ddf["percent_tip"] > 1000)
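
To see what the two cleaning steps do, here is a small pandas sketch on made-up rides (the `rides` frame and its values are hypothetical). It uses `Series.mask` for the outlier step, which is one way to express the cap.

```python
import numpy as np
import pandas as pd

# Hypothetical rides: a normal tip, a zero fare, and an implausibly large tip.
rides = pd.DataFrame(
    {
        "tip_amount": [2.0, 1.0, 50.0],
        "fare_amount": [8.0, 0.0, 1.0],
    }
)

# Division by a zero fare yields inf, which is replaced with NaN.
percent_tip = (rides["tip_amount"] / rides["fare_amount"]).replace(
    [np.inf, -np.inf], np.nan
) * 100

# Values above 1000 percent are treated as outliers and set to NaN.
percent_tip = percent_tip.mask(percent_tip > 1000)
```

Only the first ride keeps a value (25 percent); the zero-fare ride and the 5000 percent tip both end up as NaN, so neither affects a later `mean()`.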

Notice that all of the above cells execute pretty much instantly. This is because of Dask's [lazy evaluation](https://tutorial.dask.org/01x_lazy.html). Calling `persist()` below tells Dask to run all the operations and keep the results in memory for faster computation. This cell takes some time to run because Dask needs to first parse all the CSV files.

In [None]:
%%time
ddf = ddf.persist()
_ = wait(ddf)

## Timeseries datasets

We'll resample to an hourly timestep so that we don't have to pass around so much data later on.

In [None]:
tip_ddf = ddf[["pickup_datetime", "percent_tip"]].set_index("pickup_datetime").dropna()
tips = tip_ddf.resample("1H").mean().compute()

tips.to_csv(f"{DATA_PATH}/pickup_average_percent_tip_timeseries.csv")

In [None]:
fare_ddf = ddf[["pickup_datetime", "fare_amount"]].set_index("pickup_datetime").dropna()
fare = fare_ddf.resample("1H").mean().compute()

fare.to_csv(f"{DATA_PATH}/pickup_average_fare_timeseries.csv")
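
As a rough illustration of the hourly resampling, here is the same operation in plain pandas on three made-up trips (the `trips` frame is hypothetical).

```python
import pandas as pd

# Hypothetical trips: two in the 08:00 hour, one in the 09:00 hour.
trips = pd.DataFrame(
    {
        "pickup_datetime": pd.to_datetime(
            ["2019-01-01 08:05", "2019-01-01 08:40", "2019-01-01 09:15"]
        ),
        "fare_amount": [10.0, 20.0, 30.0],
    }
).set_index("pickup_datetime")

# One row per hour, holding the mean fare of the trips that started in it.
# "1h" is the lowercase spelling of the hourly frequency alias.
hourly = trips.resample("1h").mean()
```

Three trip rows collapse to two hourly rows, which is why the resampled files are much smaller than the raw data.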

## Aggregate datasets

Since our data is rather large and will mostly be viewed in grouped aggregates, we can do some aggregation now and save it off for use in plots later. 

In [None]:
for value in ["pickup", "dropoff"]:
    data = (
        ddf.groupby(
            [
                f"{value}_taxizone_id",
                f"{value}_hour",
                f"{value}_weekday",
            ]
        )
        .agg(
            {
                "fare_amount": ["mean", "count", "sum"],
                "trip_distance": ["mean"],
                "percent_tip": ["mean"],
            }
        )
        .compute()
    )
    data.columns = data.columns.to_flat_index()
    data = data.rename(
        {
            ("fare_amount", "mean"): "average_fare",
            ("fare_amount", "count"): "total_rides",
            ("fare_amount", "sum"): "total_fare",
            ("trip_distance", "mean"): "average_trip_distance",
            ("percent_tip", "mean"): "average_percent_tip",
        },
        axis=1,
    ).reset_index(level=[1, 2])
    data.to_csv(f"{DATA_PATH}/{value}_grouped_by_zone_and_time.csv")

# after the loop, data holds the dropoff aggregates
grouped_zone_and_time = data
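
The column flattening and renaming step is easier to follow on a small frame. This is a sketch in plain pandas with hypothetical data, using only two of the aggregations.

```python
import pandas as pd

# Hypothetical rides in two taxi zones.
rides = pd.DataFrame(
    {
        "pickup_taxizone_id": [1, 1, 2],
        "fare_amount": [10.0, 20.0, 30.0],
    }
)

agg = rides.groupby("pickup_taxizone_id").agg({"fare_amount": ["mean", "count"]})

# agg.columns is a MultiIndex; to_flat_index() turns it into plain tuples
# such as ("fare_amount", "mean"), which rename() can map to readable names.
agg.columns = agg.columns.to_flat_index()
agg = agg.rename(
    {
        ("fare_amount", "mean"): "average_fare",
        ("fare_amount", "count"): "total_rides",
    },
    axis=1,
)
```

After the rename, the columns are ordinary strings, so the CSV gets a single header row.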

In [None]:
for value in ["pickup", "dropoff"]:
    data = (
        ddf.groupby(
            [
                f"{value}_taxizone_id",
            ]
        )
        .agg(
            {
                "fare_amount": ["mean", "count", "sum"],
                "trip_distance": ["mean"],
                "percent_tip": ["mean"],
            }
        )
        .compute()
    )
    data.columns = data.columns.to_flat_index()
    data = data.rename(
        {
            ("fare_amount", "mean"): "average_fare",
            ("fare_amount", "count"): "total_rides",
            ("fare_amount", "sum"): "total_fare",
            ("trip_distance", "mean"): "average_trip_distance",
            ("percent_tip", "mean"): "average_percent_tip",
        },
        axis=1,
    )
    data.to_csv(f"{DATA_PATH}/{value}_grouped_by_zone.csv")

# after the loop, data holds the dropoff aggregates
grouped_zone = data

In [None]:
value = "pickup"
data = (
    ddf.groupby([f"{value}_hour", f"{value}_weekday"])
    .agg(
        {
            "fare_amount": ["mean", "count", "sum"],
            "trip_distance": ["mean"],
            "percent_tip": ["mean"],
        }
    )
    .compute()
)
data.columns = data.columns.to_flat_index()
data = data.rename(
    {
        ("fare_amount", "mean"): "average_fare",
        ("fare_amount", "count"): "total_rides",
        ("fare_amount", "sum"): "total_fare",
        ("trip_distance", "mean"): "average_trip_distance",
        ("percent_tip", "mean"): "average_percent_tip",
    },
    axis=1,
)

data.to_csv(f"{DATA_PATH}/{value}_grouped_by_time.csv")
grouped_time = data

## Get shape files for dashboard

The shape files are stored in a zip archive on the public S3 bucket. Here we open the archive directly from S3 and extract its contents into the local `DATA_PATH` directory.

In [None]:
import zipfile

with fs.open("s3://nyc-tlc/misc/taxi_zones.zip") as f:
    with zipfile.ZipFile(f) as zip_ref:
        zip_ref.extractall(f"{DATA_PATH}/taxi_zones")
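
The cell above works because `zipfile.ZipFile` accepts any seekable binary file object, not only a path on disk. Here is a self-contained sketch of the same pattern, assuming an in-memory archive in place of the S3 file (the archive contents and names are hypothetical).

```python
import io
import tempfile
import zipfile
from pathlib import Path

# Build a small zip archive in memory; the file name is hypothetical.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("taxi_zones.txt", "placeholder contents")
buffer.seek(0)

# Open the archive from the file object and extract it to a temp directory.
with tempfile.TemporaryDirectory() as tmp_dir:
    with zipfile.ZipFile(buffer) as zip_ref:
        zip_ref.extractall(tmp_dir)
    extracted = sorted(p.name for p in Path(tmp_dir).iterdir())
```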

## Examples

To make use of the new datasets, we can visualize all the data at once using a grouped heatmap.

In [None]:
grouped_zone_and_time.hvplot.heatmap(
    x="dropoff_weekday",
    y="dropoff_hour",
    C="average_percent_tip",
    groupby="dropoff_taxizone_id",
    responsive=True,
    min_height=600,
    cmap="viridis",
    clim=(0, 20),
    colorbar=False,
)

The dataset that is grouped only by zone can be paired with other information, such as geography.

In [None]:
import geopandas as gpd

zones = gpd.read_file(f"{DATA_PATH}/taxi_zones/taxi_zones.shp").to_crs("epsg:4326")
joined = zones.join(grouped_zone, on="LocationID")

joined.hvplot(
    x="longitude",
    y="latitude",
    c="average_fare",
    geo=True,
    tiles="CartoLight",
    cmap="fire",
    alpha=0.5,
    hover_cols=["zone", "borough"],
    title="Average fare by dropoff location",
    height=600,
    width=800,
    clim=(0, 100),
)
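
The `join(..., on="LocationID")` call matches a column on the left against the index on the right. Below is a minimal sketch of that behavior with plain pandas frames standing in for the shapefile and the aggregates (both frames and their values are hypothetical).

```python
import pandas as pd

# Hypothetical zone table, standing in for the shapefile attributes.
zones = pd.DataFrame({"LocationID": [1, 2, 3], "zone": ["A", "B", "C"]})

# Hypothetical aggregates indexed by taxi zone ID, like grouped_zone.
stats = pd.DataFrame(
    {"average_fare": [12.5, 30.0]},
    index=pd.Index([1, 2], name="dropoff_taxizone_id"),
)

# join(on=...) matches the left column against the right index (left join),
# so zones with no rides keep their row and get NaN for the aggregates.
joined = zones.join(stats, on="LocationID")
```

Because this is a left join, every zone stays in the result even when no rides were recorded for it.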

<hr>

## Next Steps

In this tutorial, you learned how to use Dask to extract, transform, and load data. You also learned how to use [`hvplot`](https://hvplot.holoviz.org/index.html) to create data visualizations on top of that data.

Next, try [this dask-ml notebook](./hyperparameter-dask.ipynb) to see how to use this Dask DataFrame to accelerate common machine learning tasks like feature engineering and hyperparameter tuning.

<hr>