# What is Dask and why we need tools like it

Processing datasets of up to several GB poses no problem, be it a computational task or training a machine learning model. A single, reasonably powerful machine handles such a volume easily.

Things get more involved when you need to process tens of GB or more, or to **speed up training** of complex models. Since vertical scaling is always limited by how large a machine you can get, there is usually no other way but to go for horizontal scaling and some type of parallelism.

**Dask** offers tools for this exact case. For example,

- you may want to **leverage all the cores** of your current machine to speed up the computations, but do not want to go for `multiprocessing`,
- alternatively, you may need to **process data too large** for the machine's memory, which is called **out-of-core processing**,
- or you may need a **unified setup** for both local parallelism (for prototyping) and distributed cloud-based computation.

Many interesting problems in machine learning are simply not solvable on a single machine, and Dask offers a great and simple way to introduce parallelism into your problem.

Another benefit is that Dask is written in Python, so there's no need to set up Scala-based Spark, which can be tricky.

In this tutorial we will use a **local** setup, i.e. the Dask cluster will run on a single machine. The main benefit is full utilization of all the machine's cores.

# Dask cluster

First, we need to create a Dask cluster and a Dask client:

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('bmh')

import pathlib
import numpy as np
import pandas as pd

from distributed import Client, LocalCluster

In [None]:
# you may want to change `n_workers` according to your hardware setup
cluster = LocalCluster(n_workers=12)
client = Client(cluster)

In [None]:
cluster

We created a **cluster** of 12 workers and connected to it as a client. Note that you can control the cluster size directly from the notebook.

Under the hood, a Dask cluster contains a **scheduler**, which is responsible for handling computations and distributing them among the workers. The scheduler can also be launched from the command line (see the [Command Line](https://docs.dask.org/en/latest/setup/cli.html) section of the documentation).

Dask also provides a nice real-time **dashboard** for monitoring tasks and workers (see the link in the cell output above).

We can now submit tasks to the Dask cluster:
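
As a rough starting point for `n_workers`, the worker count can be derived from the number of CPU cores. The sketch below is only an illustration: the helper names `suggested_worker_count` and `start_local_cluster`, and the choice to leave one core free, are assumptions and not part of Dask.

```python
import os


def suggested_worker_count(reserve: int = 1) -> int:
    """Return a worker count that leaves `reserve` cores free (never below 1)."""
    cores = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(1, cores - reserve)


def start_local_cluster(n_workers: int):
    """Start a single-machine cluster with one thread per worker process."""
    # Imported here so the helper above works even without Dask installed.
    from distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1)
    return cluster, Client(cluster)


n_workers = suggested_worker_count()
print(f"suggested n_workers: {n_workers}")
```

Calling `start_local_cluster(n_workers)` would then take the place of the hard-coded `12`; an existing cluster can also be resized later with `cluster.scale(...)`.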

In [None]:
result_future = client.submit(np.sin, np.random.randn(100))

Note that `client.submit` creates what is called a **future**, i.e. a handle to the task result, which becomes available as soon as the computation completes.

You can check the task status

In [None]:
result_future.status

In [None]:
result_future.done()

or result:

In [None]:
result_future.result()
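
Dask's futures interface is modeled on `concurrent.futures` from the standard library, so the same submit/done/result pattern can be tried without a cluster. The sketch below uses a thread pool purely as a stand-in for the Dask client; `.status` is specific to Dask futures and has no counterpart here.

```python
import math
from concurrent.futures import ThreadPoolExecutor

# A thread pool plays the role of the Dask client in this stand-in example.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(math.sin, 0.0)  # returns immediately with a handle
    value = future.result()              # blocks until the task has finished
    finished = future.done()             # True once a result is available

print(value, finished)
```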

You can also submit **multiple tasks** at once (we recommend opening the Dask dashboard alongside to watch the tasks start and proceed):

In [None]:
futures = [client.submit(np.sin, x) for x in np.random.randn(100)]

To get the results, we need to **gather** them:

In [None]:
results = client.gather(futures)
results

Dask also allows for straightforward **chaining** of tasks (note that `s`, `s_sq` and `s_full` are all futures, not NumPy arrays):

In [None]:
x = np.random.randn(10000)
s = client.submit(np.sin, x)
s_sq = client.submit(np.square, s)  # square the sine result, so this task depends on `s`
s_full = client.submit(np.add, s, s_sq)

In [None]:
s_full.result()

# Dask and ML

As a specific and relevant example of parallelization for machine learning, we will consider parallel grid search. Imagine that you need to fit a parametrized machine learning model (almost all ML models have some hyperparameters).

To find a good set of hyperparameters, you need to fit the model for each candidate set of parameters and compare the results. The main Python package for classical machine learning, `scikit-learn` (or `sklearn` for short), allows you to do that easily. We will use the Titanic dataset and create a simple classification model for it.

In [None]:
# you may need to change the location according to your local setup
DATA_DIR = pathlib.Path("")

In [None]:
train = pd.read_csv(DATA_DIR.joinpath("train.csv"), index_col="PassengerId")
test = pd.read_csv(DATA_DIR.joinpath("test.csv"), index_col="PassengerId")

We will preprocess the dataset first:

In [None]:
train.info()

In [None]:
age_imputation = train.groupby(["Pclass", "Sex"])["Age"].mean()

train = train.join(age_imputation,
                   on=("Pclass", "Sex"),
                   rsuffix="_imp")

train.loc[train.Age.isnull(), "Age"] = train.loc[train.Age.isnull(), "Age_imp"]
train.drop("Age_imp", axis=1, inplace=True)

test = test.join(age_imputation,
                 on=("Pclass", "Sex"),
                 rsuffix="_imp")

test.loc[test.Age.isnull(), "Age"] = test.loc[test.Age.isnull(), "Age_imp"]
test.drop("Age_imp", axis=1, inplace=True)

most_frequent_port = train.Embarked.value_counts().idxmax()
average_fare = train.Fare.mean()

train.fillna({"Embarked": most_frequent_port}, inplace=True)
test.fillna({"Embarked": most_frequent_port, "Fare": average_fare}, inplace=True)

train.drop(["Name", "Ticket", "Cabin"], axis=1, inplace=True)
test.drop(["Name", "Ticket", "Cabin"], axis=1, inplace=True)

In [None]:
train = pd.get_dummies(train, columns=["Pclass", "Sex", "Embarked"])
test = pd.get_dummies(test, columns=["Pclass", "Sex", "Embarked"])

TARGET = "Survived"
# every column except the target is a feature
FEATURES_COLS = train.columns.drop(TARGET)

In [None]:
train.info()

In [None]:
test.info()

OK, our dataset now contains no missing values and is fully numeric, so we can start modeling. We will use a random forest model. You do not need to understand it in full right now; the main idea is to combine many weak estimators (decision trees) to get a better overall result.

We will also use cross-validation, since it's a crucial part of hyperparameter search. The main idea is, again, simple: you train your models on one part of the dataset and choose the model parameters based on performance on a different part, which the model has not seen during training (this reduces the risk of overfitting; in other words, you cross-validate your model). Then you assess the final model, fitted with the best parameters, on a test set.

`sklearn` provides convenient classes for the entire grid search process. We will use 4-fold cross-validation: for each set of parameters, the training dataset is split into 4 roughly equal parts, and four models are fitted with that set of parameters, so that each model is validated on one of the four folds while the remaining three are used for training.

`joblib` is the job manager that dispatches calculations under the hood; it can use different backends to run them in parallel.
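
The fold mechanics can be sketched with plain NumPy. This is a simplification under stated assumptions: the sample count of 12 is made up, and the split is contiguous and unshuffled. For classifiers, `GridSearchCV` with an integer `cv` uses stratified folds, which additionally keep class proportions balanced.

```python
import numpy as np

n_samples = 12  # hypothetical dataset size, chosen for readability
n_folds = 4

indices = np.arange(n_samples)
folds = np.array_split(indices, n_folds)  # four (nearly) equal parts

splits = []
for k, validation_idx in enumerate(folds):
    # Train on the three folds that are not used for validation.
    train_idx = np.concatenate([f for i, f in enumerate(folds) if i != k])
    splits.append((train_idx, validation_idx))
    print(f"fold {k}: validate on {validation_idx.tolist()}, "
          f"train on {len(train_idx)} samples")
```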
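
To illustrate the idea of swappable backends without a cluster, the sketch below runs a few trivial jobs through `joblib` with its built-in `threading` backend. The workload (`math.sqrt` over five numbers) is a made-up placeholder.

```python
import math

import joblib
from joblib import Parallel, delayed

# The context manager selects the backend for the Parallel calls inside it.
with joblib.parallel_backend("threading", n_jobs=2):
    roots = Parallel()(delayed(math.sqrt)(i) for i in range(5))

print(roots)
```

Swapping the backend name is the only change needed to dispatch the same jobs elsewhere.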

In [None]:
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import f1_score, classification_report

First, we need to specify the parameter grid. During grid search, all combinations of the listed values will be used for fitting.

In [None]:
params = {
    "max_depth": [2, 4, 6],
    "n_estimators": [100, 200, 500],
    "class_weight": [None, "balanced"]
}
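
To get a feeling for the size of the search, the combinations in such a grid can be enumerated with the standard library. The sketch repeats the grid above and assumes the 4-fold cross-validation used in this tutorial.

```python
from itertools import product

params = {
    "max_depth": [2, 4, 6],
    "n_estimators": [100, 200, 500],
    "class_weight": [None, "balanced"],
}

# Every combination of one value per parameter is a candidate.
candidates = [dict(zip(params, values)) for values in product(*params.values())]

n_folds = 4
n_fits = len(candidates) * n_folds  # one fit per candidate and fold

print(len(candidates), "candidates,", n_fits, "cross-validation fits")
```

With the default `refit=True`, one more fit on the whole training set follows the search.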

Now we create a model instance, so that `sklearn` knows which model we want. In the `sklearn` API, model parameters can be set directly on an existing instance, so we create an "empty" model with default parameters, which will serve as a blueprint.

Also note that we do not provide a scoring criterion. By default, `GridSearchCV` uses the estimator's own `score` method. For `RandomForestClassifier` that is accuracy, which is exactly the metric Kaggle uses for this dataset.

We can now launch the grid search itself:
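
The default can be checked on synthetic data: leaving `scoring` unset and passing `scoring="accuracy"` should yield the same score for a classifier. The sketch below is an illustration only; the generated dataset, the small grid and the fixed `random_state` values are assumptions made to keep it fast and reproducible.

```python
import math

from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# Synthetic stand-in data, not the Titanic dataset.
X, y = make_classification(n_samples=200, n_features=8, random_state=0)

small_grid = {"max_depth": [2, 4]}
model = RandomForestClassifier(n_estimators=20, random_state=0)

# Same search twice: once with the default scoring, once with it spelled out.
default_cv = GridSearchCV(model, small_grid, cv=4).fit(X, y)
explicit_cv = GridSearchCV(model, small_grid, cv=4, scoring="accuracy").fit(X, y)

print(default_cv.best_score_, explicit_cv.best_score_)
```

Another metric, such as `scoring="f1"`, can be passed the same way.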

In [None]:
model = RandomForestClassifier()
grid_cv = GridSearchCV(model, params, cv=4, verbose=1)

with joblib.parallel_backend('dask'):
    grid_cv.fit(train[FEATURES_COLS], train[TARGET])

`joblib` will use the local cluster we created to distribute the training jobs and run them in parallel. The best parameters for the features we have are:

In [None]:
grid_cv.best_params_

And the corresponding best score:

In [None]:
grid_cv.best_score_

When grid search finds the best parameters, by default it refits the model on the entire training set, so that we do not need to do that manually. Effectively, we now have a random forest model, trained on the entire training set with the best model parameters. Let's use it for inference:

In [None]:
submission = pd.DataFrame(grid_cv.best_estimator_.predict(test[FEATURES_COLS]),
                          index=test.index, columns=["Survived"])

In [None]:
submission.head()

In [None]:
submission.to_csv(DATA_DIR.joinpath("dask_submission.csv"))

These predictions score about `0.775` when submitted to Kaggle.


# Final remarks

You may consider this overkill for this specific model. That is true, and `joblib` alone can handle local parallelism well enough anyway. However, imagine that you're searching over a **huge grid** and have a **standby Dask cluster in the cloud**: in that case this setup will serve its purpose really well.

We haven't covered many technical details of Dask (resource quotas, deployment and others) or out-of-core processing, but hopefully you got a feel for it and will dig further as soon as you encounter a long-running grid or random search, or something similar.