In [1]:
import numpy as np
from dask import delayed
from dask.distributed import Client, progress
import dask.array as da
import dask.dataframe as dd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from dask_ml.preprocessing import StandardScaler
import joblib
from dask_ml.model_selection import train_test_split
import pandas as pd

import warnings
# Silence every warning for the rest of the session. The filter is global, so
# it also hides any warnings raised by the estimators fitted in later cells.
warnings.filterwarnings("ignore")

In [2]:
# Spin up a local Dask cluster and connect to it: 4 worker processes with
# 2 threads each. Per Dask's LocalCluster docs `memory_limit` applies per worker.
client = Client(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='4GB',
)

# Bare expression so the notebook renders the scheduler/dashboard summary.
client

0,1
Client  Scheduler: tcp://127.0.0.1:36773  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 8.00 GB


In [3]:
# Lazily load the CSV as a Dask DataFrame. `assume_missing=True` makes Dask
# treat integer-looking columns as floats so missing values cannot break
# dtype inference partway through the file.
df = dd.read_csv(
    'creditcard.csv',
    assume_missing=True,
)

In [5]:
# Feature matrix: every column except the label.
X = df.drop('Class', axis=1)

# Target variable: the `Class` column.
Y = df["Class"]

# Seed the split so that re-running the notebook yields the same train/test
# partition (and therefore comparable scores between runs).
X_train, X_test, y_train, y_test = train_test_split(
    X, Y, test_size=0.2, random_state=42
)

# The data fits in memory, so pin it in the cluster's RAM.
# `persist()` does NOT work in place: it returns a new collection backed by
# the in-memory results. The return value must be re-bound, otherwise the
# persisted data is dropped immediately and every later `.compute()` re-reads
# and re-splits the CSV.
X_train = X_train.persist()
X_test = X_test.persist()
y_train = y_train.persist()
y_test = y_test.persist()

# Display the (persisted) test target, as the cell did before.
y_test


Dask Series Structure:
npartitions=3
    float64
        ...
        ...
        ...
Name: Class, dtype: float64
Dask Name: split, 3 tasks

### Use several scikit-learn models with Dask as the backend

#### Random Forest

In [6]:
# Random forests are stochastic (bootstrap samples + random feature subsets);
# fix the seed so the cross-validation scores are reproducible across runs.
rf_model = RandomForestClassifier(random_state=42)

# Route joblib's parallelism (CV folds and the forest's trees) to the Dask
# cluster. scikit-learn needs in-memory data, hence the `.compute()` calls.
with joblib.parallel_backend('dask'):
    scores = cross_validate(rf_model, X_train.compute(), y_train.compute(), cv=4)

# NOTE(review): the recorded run shows a first-fold test_score of ~0.02 next
# to ~0.999 for the other folds -- worth investigating before trusting it.
scores

{'fit_time': array([162.50637126, 178.11106372, 174.97203493, 176.2040484 ]),
 'score_time': array([0.44314957, 0.33315873, 0.52245164, 0.51473951]),
 'test_score': array([0.02093782, 0.99945639, 0.99945639, 0.99945639])}

#### Logistic Regression

In [7]:
# Logistic regression with scikit-learn's default hyperparameters.
# NOTE(review): the features are fed to this model unscaled and all warnings
# are silenced in the imports cell, so a ConvergenceWarning (if any) would be
# hidden -- confirm the solver actually converges.
lrm = LogisticRegression()

In [8]:
# 4-fold cross-validation of the logistic regression model, with joblib's
# work dispatched to the Dask cluster. The Dask collections are materialised
# with `.compute()` because scikit-learn operates on in-memory data.
with joblib.parallel_backend('dask'):
    scores = cross_validate(
        estimator=lrm,
        X=X_train.compute(),
        y=y_train.compute(),
        cv=4,
    )

# Per-fold fit time, score time and test score.
scores

{'fit_time': array([9.09335399, 8.13004661, 8.9636631 , 8.93264675]),
 'score_time': array([0.03493667, 0.0493331 , 0.02998352, 0.02430892]),
 'test_score': array([0.98628696, 0.99922842, 0.99917581, 0.99849192])}

#### SVM

In [10]:
# Split FIRST, then scale. Fitting the scaler on the full matrix before
# splitting would leak the test rows' mean/variance into the training data.
# The seed makes the partition reproducible.
# NOTE: this re-binds y_train / y_test; they correspond row-for-row to
# X_std_train / X_std_test produced in this cell.
X_raw_train, X_raw_test, y_train, y_test = train_test_split(
    X, Y, test_size=0.2, random_state=42
)

# Initialize the scaler
scaler = StandardScaler()

# Learn the scaling parameters from the training rows only, then apply the
# same transformation to the held-out rows.
X_std_train = scaler.fit_transform(X_raw_train)
X_std_test = scaler.transform(X_raw_test)

# `persist()` returns a new, in-memory-backed collection rather than working
# in place, so the result has to be re-bound to take effect.
X_std_train = X_std_train.persist()
X_std_test = X_std_test.persist()
y_train = y_train.persist()
y_test = y_test.persist()

# Display the (persisted) test target, as the cell did before.
y_test

Dask Series Structure:
npartitions=3
    float64
        ...
        ...
        ...
Name: Class, dtype: float64
Dask Name: split, 3 tasks

In [11]:
# NOTE(review): this cell repeats the scale-and-split step of the previous
# cell (without persisting) and re-binds the same names.

# Split FIRST, then scale, so the scaler never sees the held-out rows
# (fitting on the full matrix leaks test-set statistics into training).
# The fixed seed keeps the partition identical every time this cell runs.
X_raw_train, X_raw_test, y_train, y_test = train_test_split(
    X, Y, test_size=0.2, random_state=42
)

# Initialize the scaler
scaler = StandardScaler()

# Fit on the training rows only; reuse those parameters for the test rows.
X_std_train = scaler.fit_transform(X_raw_train)
X_std_test = scaler.transform(X_raw_test)

In [12]:
# Support-vector classifier with a linear kernel (other settings default).
# NOTE(review): SVC exposes no `n_jobs`, so under a joblib backend presumably
# only the cross-validation folds run in parallel, not an individual fit --
# confirm. The recorded fit times for this model are by far the longest here.
svc_clf = SVC(kernel='linear')

In [14]:
# 4-fold cross-validation of the linear SVC on the standardised features,
# with joblib's work dispatched to the Dask cluster. `.compute()` turns the
# Dask collections into in-memory objects that scikit-learn can consume.
with joblib.parallel_backend('dask'):
    scores = cross_validate(
        estimator=svc_clf,
        X=X_std_train.compute(),
        y=y_train.compute(),
        cv=4,
    )

# Per-fold fit time, score time and test score.
scores

{'fit_time': array([992.67481208, 153.02454853, 578.41277385, 288.77097225]),
 'score_time': array([0.66120648, 0.59257197, 0.47862506, 0.49963737]),
 'test_score': array([0.99917493, 0.99940314, 0.99936802, 0.9994558 ])}

### Discussion

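All of these models were fit using Dask as the joblib backend, which should make them faster than fitting without Dask (note that no non-Dask baseline was timed in this notebook, so the speed-up is not measured here). SVC still took much longer than the rest, but it was reassuring to be able to monitor its progress in the client dashboard rather than worrying that it had hung.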