# LightGBM + Dask

<table>
    <tr>
        <td>
            <img src="./img/lightgbm.svg" width="300">
        </td>
        <td>
            <img src="./img/dask-horizontal.svg" width="300">
        </td>
    </tr>
</table>

This notebook describes a machine learning training workflow using the famous [NYC Taxi Dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page). That dataset contains information on taxi trips in New York City.

In this exercise, you'll load data into a [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html) and use [LightGBM](https://lightgbm.readthedocs.io/en/latest/) to answer this question:

> Based on characteristics that can be known at the beginning of a trip, what tip will this trip earn (as a % of the total fare)?

This notebook gives an introductory tutorial on how to use Dask to scale training of LightGBM models. For more detailed information, see ["LightGBM Training with Dask"](https://www.saturncloud.io/docs/tutorials/lightgbm/) in Saturn Cloud's documentation.

<hr>

## Initialize A Dask Cluster

This tutorial uses multiple machines to show how to apply more computing resources to machine learning training. This is done with Dask. Saturn Cloud offers managed Dask clusters, which can be provisioned and modified programmatically.

The code below creates a Dask cluster using [`dask-saturn`](https://github.com/saturncloud/dask-saturn), the official Dask client for Saturn Cloud. It creates a cluster with the following specs:

* `n_workers=3` --> 3 machines in the cluster
* `scheduler_size='medium'` --> the Dask scheduler will have 4GB of RAM and 2 CPU cores
* `worker_size='large'` --> each worker machine will have 2 CPU cores and 16GB of RAM

To see a list of possible sizes, run the code below.

In [2]:
import dask_saturn

dask_saturn.describe_sizes()

{'medium': 'Medium - 2 cores - 4 GB RAM',
 'large': 'Large - 2 cores - 16 GB RAM',
 'xlarge': 'XLarge - 4 cores - 32 GB RAM',
 '2xlarge': '2XLarge - 8 cores - 64 GB RAM',
 '4xlarge': '4XLarge - 16 cores - 128 GB RAM',
 '8xlarge': '8XLarge - 32 cores - 256 GB RAM',
 '12xlarge': '12XLarge - 48 cores - 384 GB RAM',
 '16xlarge': '16XLarge - 64 cores - 512 GB RAM',
 'g4dnxlarge': 'T4-XLarge - 4 cores - 16 GB RAM - 1 GPU',
 'g4dn4xlarge': 'T4-4XLarge - 16 cores - 64 GB RAM - 1 GPU',
 'g4dn8xlarge': 'T4-8XLarge - 32 cores - 128 GB RAM - 1 GPU',
 'p32xlarge': 'V100-2XLarge - 8 cores - 61 GB RAM - 1 GPU',
 'p38xlarge': 'V100-8XLarge - 32 cores - 244 GB RAM - 4 GPU',
 'p316xlarge': 'V100-16XLarge - 64 cores - 488 GB RAM - 8 GPU'}
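With the specs above, the three `large` workers give the cluster 6 cores and 48 GB of RAM in total. The scheduler's 2 cores and 4 GB are separate, because the scheduler coordinates work but doesn't run training tasks. The sketch below does that arithmetic from the size strings returned by `describe_sizes()`. `parse_size` and `cluster_capacity` are hypothetical helpers, not part of `dask-saturn`, and they assume the descriptions keep the `N cores - M GB RAM` format shown above.

```python
import re
from typing import Tuple

# A few entries copied from the describe_sizes() output above.
SIZES = {
    "medium": "Medium - 2 cores - 4 GB RAM",
    "large": "Large - 2 cores - 16 GB RAM",
    "xlarge": "XLarge - 4 cores - 32 GB RAM",
}


def parse_size(description: str) -> Tuple[int, int]:
    """Return (cores, ram_gb) from a string like 'Large - 2 cores - 16 GB RAM'."""
    cores = re.search(r"(\d+) cores", description)
    ram = re.search(r"(\d+) GB RAM", description)
    if cores is None or ram is None:
        raise ValueError(f"unrecognised size description: {description!r}")
    return int(cores.group(1)), int(ram.group(1))


def cluster_capacity(n_workers: int, worker_size: str) -> Tuple[int, int]:
    """Total (cores, ram_gb) across n_workers workers of the same size."""
    cores, ram_gb = parse_size(SIZES[worker_size])
    return n_workers * cores, n_workers * ram_gb


total_cores, total_ram = cluster_capacity(n_workers=3, worker_size="large")
print(f"{total_cores} worker cores, {total_ram} GB worker RAM")  # 6 worker cores, 48 GB worker RAM
```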

The `dask-saturn` code below creates two important objects: a cluster and a client.

* `cluster`: knows about and manages the scheduler and workers
    - can be used to create, resize, reconfigure, or destroy those resources
    - knows how to communicate with the scheduler, and where to find logs and diagnostic dashboards
* `client`: tells the cluster to do things
    - can send work to the cluster
    - can restart all the worker processes
    - can send data to the cluster or pull data back from the cluster

In [3]:
from dask.distributed import Client, wait
from dask_saturn import SaturnCluster

n_workers = 3
cluster = SaturnCluster(n_workers=n_workers, scheduler_size="medium", worker_size="large")
client = Client(cluster)

INFO:dask-saturn:Cluster is ready
INFO:dask-saturn:Registering default plugins

+-------------+-----------+-----------+---------+
| Package     | client    | scheduler | workers |
+-------------+-----------+-----------+---------+
| dask        | 2021.01.1 | 2.30.0    | 2.30.0  |
| distributed | 2021.01.1 | 2.30.0    | 2.30.0  |
+-------------+-----------+-----------+---------+
INFO:dask-saturn:{'tcp://10.0.16.110:33651': {'status': 'repeat'}, 'tcp://10.0.2.105:43439': {'status': 'repeat'}, 'tcp://10.0.3.85:41047': {'status': 'repeat'}}


If you created your cluster here in this notebook, it might take a few minutes for all your nodes to become available. You can run the cell below to block until all nodes are ready.

>**Pro tip**: Create and/or start your cluster in the Saturn UI if you want to get a head start!

In [4]:
client.wait_for_workers(n_workers=n_workers)

### Monitor Resource Usage

This tutorial aims to teach you how to take advantage of multiple machines for data science workflows. To prove to yourself that Dask is taking advantage of the resources in the cluster, it's important to understand how to monitor that utilization while your code is running.

Printing the `cluster` object in a notebook renders a widget that shows the number of workers, the available CPU and memory, and a link to the Dask dashboard.

In [5]:
cluster


<hr>

## Load data

This example is designed to run quickly with small, relatively inexpensive resources, so it loads just a single month of taxi data (January 2019) and keeps a random 30% sample of it for training.

In [6]:
import dask.dataframe as dd

taxi = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
    assume_missing=True,
).sample(frac=0.3, replace=False)
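Dask's `DataFrame.sample()` doesn't draw one global sample: it samples each partition independently with the same `frac`. The pandas sketch below imitates that with four made-up in-memory chunks standing in for Dask partitions; the column and its values are invented for illustration.

```python
import numpy as np
import pandas as pd

frac = 0.3
rng = np.random.default_rng(0)

# Four in-memory chunks standing in for the partitions of a Dask DataFrame.
partitions = [
    pd.DataFrame({"fare_amount": rng.uniform(3, 60, size=1_000)})
    for _ in range(4)
]

# Sample each partition on its own, then stitch the pieces back together.
sampled = pd.concat(
    [part.sample(frac=frac, replace=False, random_state=i) for i, part in enumerate(partitions)],
    ignore_index=True,
)

total_rows = sum(len(part) for part in partitions)
print(f"kept {len(sampled)} of {total_rows} rows")  # kept 1200 of 4000 rows
```

Because the draw happens per partition, the number of rows kept depends on the partition sizes, which is one reason to check `len(taxi)` rather than assume it.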

The code below computes the size of this dataset in memory.

In [7]:
print(f"Num rows: {len(taxi)}, Size: {taxi.memory_usage(deep=True).sum().compute() / 1e6} MB")



You can examine the structure of the data with Dask DataFrame commands:

`.head()` = view the first few rows

In [8]:
taxi.head()



`.dtypes` = list all the columns and the type of data in them

In [9]:
taxi.dtypes

VendorID                        float64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                    float64
DOLocationID                    float64
payment_type                    float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object
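Integer-valued columns such as `VendorID`, `passenger_count`, and `PULocationID` show up as `float64` because of `assume_missing=True` in the `read_csv()` call: it tells Dask to assume that integer columns without an explicit dtype may contain missing values and to read them as floats. The pandas sketch below, with two made-up CSV snippets, shows why that matters: a single blank value is enough to turn an integer column into `float64`.

```python
import io

import pandas as pd

# Two tiny CSVs with the same columns; the second has a blank passenger_count.
complete = "VendorID,passenger_count\n1,1\n2,3\n"
with_gap = "VendorID,passenger_count\n1,1\n2,\n"

df_complete = pd.read_csv(io.StringIO(complete))
df_gap = pd.read_csv(io.StringIO(with_gap))

print(df_complete.dtypes["passenger_count"])  # int64
print(df_gap.dtypes["passenger_count"])  # float64 (the blank becomes NaN)
```

Dask infers dtypes from a sample at the start of the file, so a column that looks like integers there can still hit blanks later on. Reading such columns as floats up front avoids that mismatch.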

<hr>

## Prep for Training

Before training a model, we need to transform this dataset into a format that's better-suited to the research question. The function below does that with Dask DataFrame operations.

<details><summary>(click here to learn why data scientists do this)</summary>

**Compute the Target**

The raw data don't contain a column that directly describes the tip relative to the fare, so we need to add one: the tip divided by the metered fare (`tip_amount / fare_amount`).

**Add Features**

Giving a machine learning model a richer description of each training observation improves its ability to describe the relationship between those observations' characteristics and the target. These characteristics are called "features".

For example, instead of giving a model a raw timestamp, it can be valuable to provide multiple derived characteristics like hour of the day and day of the week. It's plausible that weekend rides have a different distribution of tips because they tend to be for leisure, whereas weekday rides might be mostly people travelling for work.

**Remove Unused Features**

If the goal is to produce a model that could predict the tip for a ride, then characteristics that can only be known AFTER the ride has started have to be excluded. For example, you can't know the dropoff time or the type of payment until a ride has concluded.

Such features should be dropped before training.
    
</details>
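The target computation and its divide-by-zero guard can be checked on a few hand-made rows before running them on the full dataset. This pandas sketch mirrors the target step of `prep_df()`, defined in the next cell; the rows are invented for illustration.

```python
import pandas as pd

# Hypothetical trips: the last one has a zero fare (e.g. a cancelled ride).
trips = pd.DataFrame(
    {
        "fare_amount": [10.0, 25.0, 0.0],
        "tip_amount": [2.0, 0.0, 1.0],
    }
)

# Drop zero-fare rows first, so the division below never divides by zero.
trips = trips[trips.fare_amount > 0].copy()
trips["tip_fraction"] = trips.tip_amount / trips.fare_amount

print(trips["tip_fraction"].tolist())  # [0.2, 0.0]
```

A `tip_fraction` of 0.2 corresponds to a tip equal to 20% of the metered fare.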

In [None]:
def prep_df(df: dd.DataFrame, target_col: str) -> dd.DataFrame:
    """
    Prepare a raw taxi dataframe for training.
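        * computes the target (tip / fare, stored in `target_col`)
        * adds features
        * removes unused features
    """
    # Features that can be known when a trip starts
    numeric_feat = [
        "pickup_weekday",
        "pickup_weekofyear",
        "pickup_hour",
        "pickup_week_hour",
        "pickup_minute",
        "passenger_count",
    ]
    # Location IDs; they are cast to float below, so LightGBM treats them as numeric
    categorical_feat = [
        "PULocationID",
        "DOLocationID",
    ]
    features = numeric_feat + categorical_feat

    # Target: tip as a fraction of the metered fare
    df = df[df.fare_amount > 0]  # avoid divide-by-zero
    df[target_col] = df.tip_amount / df.fare_amount

    # Time features derived from the pickup timestamp
    df["pickup_weekday"] = df.tpep_pickup_datetime.dt.weekday  # Monday=0 ... Sunday=6
    df["pickup_weekofyear"] = df.tpep_pickup_datetime.dt.isocalendar().week  # ISO week number
    df["pickup_hour"] = df.tpep_pickup_datetime.dt.hour
    df["pickup_week_hour"] = (df.pickup_weekday * 24) + df.pickup_hour  # hour of the week, 0-167
    df["pickup_minute"] = df.tpep_pickup_datetime.dt.minute

    # Keep only the features and target, cast everything to float, and fill gaps with -1
    df = df[features + [target_col]].astype(float).fillna(-1)
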
    return df
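To make the time features concrete, here is what they evaluate to for a single pickup, using plain pandas and the same formulas as `prep_df()` above; the timestamp is made up. Because pandas numbers `weekday` from Monday = 0, `pickup_week_hour` runs from 0 (Monday, midnight hour) to 167 (Sunday, 23:00 hour).

```python
import pandas as pd

# One hypothetical pickup: Saturday, 5 January 2019, 14:30.
pickup = pd.Series(pd.to_datetime(["2019-01-05 14:30:00"]))

weekday = pickup.dt.weekday.iloc[0]  # Monday=0 ... Sunday=6
week_of_year = pickup.dt.isocalendar().week.iloc[0]  # ISO week number
hour = pickup.dt.hour.iloc[0]
minute = pickup.dt.minute.iloc[0]
week_hour = weekday * 24 + hour  # same formula as pickup_week_hour in prep_df()

print(weekday, week_of_year, hour, minute, week_hour)  # 5 1 14 30 134
```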

Run the code below to get a new data frame, `taxi_train`, that can be used directly for model training.

In [None]:
target_col = "tip_fraction"
taxi_train = prep_df(taxi, target_col)

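Dask performs computations in a [lazy manner](https://tutorial.dask.org/01x_lazy.html): so far, nothing has actually been loaded or transformed. Calling `.persist()` starts the data loading and feature processing on the cluster and keeps the resulting partitions in the workers' memory, so later steps reuse them instead of recomputing them. `wait()` blocks until that work has finished.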

In [None]:
%%time
taxi_train = taxi_train.persist()
_ = wait(taxi_train)

taxi_train.head()

Before going further, check the first few rows of the dataset to make sure that the features look reasonable.

Now that the dataframe has been processed, check its size in memory again.

In [None]:
print(
    f"Num rows: {len(taxi_train)}, Size: {taxi_train.memory_usage(deep=True).sum().compute() / 1e9} GB"
)
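Both size checks in this notebook pass `deep=True` to `memory_usage()`. For an object column such as the raw data's `store_and_fwd_flag`, the default (shallow) count only covers the 8-byte references to each Python string, while `deep=True` also counts the string objects themselves. After `prep_df()` every column is `float64`, so the two counts agree for `taxi_train`. The pandas comparison below uses invented values to show the difference.

```python
import pandas as pd

# A float column plus an object (string) column, like the raw taxi data.
# dtype=object is set explicitly so newer pandas versions don't pick a dedicated string dtype.
n = 1_000
df = pd.DataFrame(
    {
        "fare_amount": pd.Series([10.0, 25.0, 7.5] * n, dtype="float64"),
        "store_and_fwd_flag": pd.Series(["N", "N", "Y"] * n, dtype=object),
    }
)

shallow = df.memory_usage(deep=False)
deep = df.memory_usage(deep=True)

# Float columns report the same size either way ...
print(shallow["fare_amount"] == deep["fare_amount"])  # True
# ... but deep=True also counts the Python string objects behind the object column.
print(deep["store_and_fwd_flag"] > shallow["store_and_fwd_flag"])  # True
```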

<hr>

## Train a model

This example uses the native Dask integration built into LightGBM, which first shipped in `lightgbm` 3.2.0. If your environment has an older release, you can build `lightgbm` from source by adding code like this to the `postBuild` script of a Saturn custom image:

```shell
sudo apt update
sudo apt install build-essential cmake --yes

git clone \
    --recursive https://github.com/microsoft/LightGBM \
    /tmp/LightGBM

cd /tmp/LightGBM/python-package

python setup.py install
```
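To check whether the installed `lightgbm` is already recent enough to include the Dask interface, you can compare its version with 3.2.0, the release named above. The helpers below (`parse_version` and `has_dask_interface` are hypothetical names, not part of LightGBM) use only the standard library, assume plain `X.Y.Z` version strings, and report anything else, such as a version with a development-build suffix, as unknown (`None`).

```python
from importlib import metadata
from typing import Optional, Tuple

# Release named in the text above for LightGBM's Dask integration.
MIN_DASK_VERSION = (3, 2, 0)


def parse_version(version: str) -> Optional[Tuple[int, ...]]:
    """Parse a plain 'X.Y.Z' string into integers; return None for anything else."""
    parts = version.split(".")
    if len(parts) != 3 or not all(part.isdigit() for part in parts):
        return None
    return tuple(int(part) for part in parts)


def has_dask_interface(version: str) -> Optional[bool]:
    """True/False for plain release versions, None when the version can't be judged."""
    parsed = parse_version(version)
    if parsed is None:
        return None
    return parsed >= MIN_DASK_VERSION


try:
    installed = metadata.version("lightgbm")
    print(f"lightgbm {installed}: Dask interface available? {has_dask_interface(installed)}")
except metadata.PackageNotFoundError:
    print("lightgbm is not installed")
```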

In [None]:
import lightgbm as lgb

LightGBM's Dask interface accepts a Dask DataFrame, Dask Series, or Dask Array as input.

In [None]:
features = [c for c in taxi_train.columns if c != target_col]

data = taxi_train[features]
label = taxi_train[target_col]

The model objects from `lightgbm.dask` have the same API as their scikit-learn equivalents. So, for example, `lightgbm.dask.DaskLGBMRegressor` can be used the same way that `lightgbm.sklearn.LGBMRegressor` can.

In [None]:
%%time

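dask_reg = lgb.dask.DaskLGBMRegressor(
    client=client,  # in lightgbm 3.2.0+, the Dask client goes to the constructor, not to fit()
    silent=False,
    max_depth=5,
    random_state=708,
    objective="regression_l2",
    learning_rate=0.1,
    tree_learner="data",  # data-parallel training across the workers
    n_estimators=100,
    min_child_samples=1,
    n_jobs=-1,
)

dask_reg.fit(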
    X=data,
    y=label,
)
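`tree_learner="data"` selects LightGBM's data-parallel tree learner: each worker holds a slice of the rows, builds per-feature histograms of gradient statistics on its own slice, and those histograms are merged across workers before split points are chosen. Because histogram bins are additive, merging the local histograms gives exactly the histogram of the full data. The NumPy sketch below checks that property for one feature. It is a simplification (LightGBM also tracks hessians and merges histograms with a reduce-scatter), and the feature values and gradients are made up.

```python
import numpy as np

rng = np.random.default_rng(708)
n_rows, n_workers = 3_000, 3

# One feature (e.g. pickup_hour) and one gradient value per row.
feature = rng.integers(0, 24, size=n_rows)
gradients = rng.normal(size=n_rows)


def gradient_histogram(values, grads, n_bins=24):
    """Sum of gradients falling into each bin (here: one bin per hour)."""
    return np.bincount(values, weights=grads, minlength=n_bins)


# Each "worker" builds a histogram from its own horizontal slice of rows ...
row_slices = np.array_split(np.arange(n_rows), n_workers)
local_histograms = [gradient_histogram(feature[idx], gradients[idx]) for idx in row_slices]

# ... and summing the local histograms reproduces the full-data histogram.
merged = np.sum(local_histograms, axis=0)
full = gradient_histogram(feature, gradients)
print(np.allclose(merged, full))  # True
```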

<hr>

## Save model

The model object created above is an instance of `DaskLGBMRegressor`. If you don't want to have Dask as a dependency when you deploy this model, you can get a regular `lightgbm.sklearn.LGBMRegressor` from it with `.to_local()`.

In [None]:
local_reg = dask_reg.to_local()
print(type(local_reg))

If you'd prefer an even lower-level model object, you can also extract a LightGBM Booster from the fitted model object.

In [None]:
booster = dask_reg.booster_
print(type(booster))

To save this model, you have a few options:
    
* store the `DaskLGBMRegressor` with `cloudpickle`
* get a local `LGBMRegressor` with `dask_reg.to_local()`, then write that to bytes with `cloudpickle`
* save the model to a text file with `dask_reg.booster_.save_model()`, which takes the output filename as its argument

The code below shows how to store the entire `DaskLGBMRegressor` using `cloudpickle`.

In [None]:
import cloudpickle
with open("model.pkl", "wb") as f:
    cloudpickle.dump(dask_reg, f)

<hr>

## Calculate metrics on test set

Machine learning training tries to create a model which can produce useful results on new data that it didn't see during training. To test how well we've accomplished that in this example, read in another month of taxi data.

In [None]:
taxi_test = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-02.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
    assume_missing=True,
).sample(frac=0.01, replace=False)

Before creating predictions on this new dataset, it has to be transformed in exactly the way that the original training data were prepared. Thankfully you've already wrapped that transformation logic in a function!

In [None]:
taxi_test = prep_df(taxi_test, target_col=target_col)

taxi_test = taxi_test.persist()
_ = wait(taxi_test)

The Dask model object's `.predict()` method allows you to create predictions based on Dask Array or Dask DataFrame input.

In [None]:
import numpy as np

# predict() only needs the features; labels aren't used to make predictions
preds = dask_reg.predict(
    X=taxi_test[features],
    dtype=np.float32,
)
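The cells in this section produce predictions but don't yet score them. A natural score for a model trained with the `regression_l2` objective is root-mean-squared error (RMSE) between `preds` and `taxi_test[target_col]`, often reported alongside mean absolute error (MAE). On chunked data such as Dask arrays, both reduce cleanly: each chunk contributes a sum of errors and a row count, and the partial results are added up at the end. The NumPy sketch below demonstrates that combination on made-up chunks and checks it against a single pass over the full arrays; it does not use the notebook's real predictions.

```python
import numpy as np

rng = np.random.default_rng(0)

# Made-up labels and predictions, split into 4 chunks the way a Dask array might be.
y_true = rng.uniform(0.0, 0.3, size=1_000)
y_pred = y_true + rng.normal(scale=0.02, size=1_000)
true_chunks = np.array_split(y_true, 4)
pred_chunks = np.array_split(y_pred, 4)

# Each chunk contributes partial sums and its row count.
partials = [
    (np.sum((p - t) ** 2), np.sum(np.abs(p - t)), len(t))
    for t, p in zip(true_chunks, pred_chunks)
]

# Combine the partial results into global metrics.
sse = sum(sq for sq, _, _ in partials)
sae = sum(ab for _, ab, _ in partials)
n_rows = sum(count for _, _, count in partials)
rmse = np.sqrt(sse / n_rows)
mae = sae / n_rows

# Same answers as a single pass over the full arrays.
print(np.isclose(rmse, np.sqrt(np.mean((y_pred - y_true) ** 2))))  # True
print(np.isclose(mae, np.mean(np.abs(y_pred - y_true))))  # True
```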

In [None]:
help(dask_reg.predict)

As in the "Save model" section above, `.to_local()` converts the `DaskLGBMRegressor` into a regular, non-Dask `lightgbm.sklearn.LGBMRegressor`, which is convenient for inspecting the model.

In [None]:
local_model = dask_reg.to_local()
type(local_model)

You can inspect this model's trees as a pandas DataFrame, with one row per node, using `trees_to_dataframe()`.

In [None]:
local_model.booster_.trees_to_dataframe()

In [None]:
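# Dask DataFrames don't support positional row selection, so data[:100] won't pick the first 100 rows;
# head(..., compute=False) keeps the first 100 rows as a lazy Dask DataFrame instead
preds = dask_reg.predict(data.head(100, compute=False))
type(preds)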

In [None]:
preds.compute()

In [None]:
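# pred_leaf=True returns, for each row, the index of the leaf it falls into in every tree
pred_leaf = dask_reg.predict(data.head(100, compute=False), pred_leaf=True).compute()
pred_leaf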