In [None]:
! pip install dask-ml

In [79]:
# DASK imports
from dask.distributed import Client
import dask.array as da
from dask_saturn import SaturnCluster
from dask_ml.wrappers import ParallelPostFit

# Scikit imports
from sklearn.linear_model import LogisticRegressionCV
from sklearn.model_selection import train_test_split

# misc imports
import time
import numpy as np
import pandas as pd


# EDA 

In [6]:
cancer = pd.read_csv('./data/breast_cancer.csv')
cancer.info()
cancer.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 569 entries, 0 to 568
Data columns (total 33 columns):
id                         569 non-null int64
diagnosis                  569 non-null object
radius_mean                569 non-null float64
texture_mean               569 non-null float64
perimeter_mean             569 non-null float64
area_mean                  569 non-null float64
smoothness_mean            569 non-null float64
compactness_mean           569 non-null float64
concavity_mean             569 non-null float64
concave points_mean        569 non-null float64
symmetry_mean              569 non-null float64
fractal_dimension_mean     569 non-null float64
radius_se                  569 non-null float64
texture_se                 569 non-null float64
perimeter_se               569 non-null float64
area_se                    569 non-null float64
smoothness_se              569 non-null float64
compactness_se             569 non-null float64
concavity_se               569 non

Unnamed: 0,id,diagnosis,radius_mean,texture_mean,perimeter_mean,area_mean,smoothness_mean,compactness_mean,concavity_mean,concave points_mean,...,texture_worst,perimeter_worst,area_worst,smoothness_worst,compactness_worst,concavity_worst,concave points_worst,symmetry_worst,fractal_dimension_worst,Unnamed: 32
0,842302,M,17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,...,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189,
1,842517,M,20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,...,23.41,158.8,1956.0,0.1238,0.1866,0.2416,0.186,0.275,0.08902,
2,84300903,M,19.69,21.25,130.0,1203.0,0.1096,0.1599,0.1974,0.1279,...,25.53,152.5,1709.0,0.1444,0.4245,0.4504,0.243,0.3613,0.08758,
3,84348301,M,11.42,20.38,77.58,386.1,0.1425,0.2839,0.2414,0.1052,...,26.5,98.87,567.7,0.2098,0.8663,0.6869,0.2575,0.6638,0.173,
4,84358402,M,20.29,14.34,135.1,1297.0,0.1003,0.1328,0.198,0.1043,...,16.67,152.2,1575.0,0.1374,0.205,0.4,0.1625,0.2364,0.07678,


In [7]:
# drop NaN column
cancer = cancer.drop(['Unnamed: 32'], axis=1)

In [8]:
cancer.tail()

Unnamed: 0,id,diagnosis,radius_mean,texture_mean,perimeter_mean,area_mean,smoothness_mean,compactness_mean,concavity_mean,concave points_mean,...,radius_worst,texture_worst,perimeter_worst,area_worst,smoothness_worst,compactness_worst,concavity_worst,concave points_worst,symmetry_worst,fractal_dimension_worst
564,926424,M,21.56,22.39,142.0,1479.0,0.111,0.1159,0.2439,0.1389,...,25.45,26.4,166.1,2027.0,0.141,0.2113,0.4107,0.2216,0.206,0.07115
565,926682,M,20.13,28.25,131.2,1261.0,0.0978,0.1034,0.144,0.09791,...,23.69,38.25,155.0,1731.0,0.1166,0.1922,0.3215,0.1628,0.2572,0.06637
566,926954,M,16.6,28.08,108.3,858.1,0.08455,0.1023,0.09251,0.05302,...,18.98,34.12,126.7,1124.0,0.1139,0.3094,0.3403,0.1418,0.2218,0.0782
567,927241,M,20.6,29.33,140.1,1265.0,0.1178,0.277,0.3514,0.152,...,25.74,39.42,184.6,1821.0,0.165,0.8681,0.9387,0.265,0.4087,0.124
568,92751,B,7.76,24.54,47.92,181.0,0.05263,0.04362,0.0,0.0,...,9.456,30.37,59.16,268.6,0.08996,0.06444,0.0,0.0,0.2871,0.07039


### Train test split

In [9]:
# separate target from rest of dataset 
X = cancer.drop(['diagnosis'], axis=1)
y = cancer['diagnosis']

# split the data to have a true hold out dataset 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state=4444)

Our train dataset is not too big to fit into memory; however, say (hypothetically in this case) we need our predictions to happen on a much larger dataset -- one that does not fit into memory. 

# Dask 

spin up Saturn cluster

In [None]:
cluster = SaturnCluster()
cluster

simluate larger test dataset 

In [48]:
# N is the number of times we are replicating our data
N = 75
X_large = da.concatenate([da.from_array(X_train.values, chunks=X_train.shape)
                          for _ in range(N)])
y_large = da.concatenate([da.from_array(y_train.values, chunks=y_train.shape)
                          for _ in range(N)])
X_large

Unnamed: 0,Array,Chunk
Bytes,7.40 MB,98.70 kB
Shape,"(29850, 31)","(398, 31)"
Count,76 Tasks,75 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.40 MB 98.70 kB Shape (29850, 31) (398, 31) Count 76 Tasks 75 Chunks Type float64 numpy.ndarray",31  29850,

Unnamed: 0,Array,Chunk
Bytes,7.40 MB,98.70 kB
Shape,"(29850, 31)","(398, 31)"
Count,76 Tasks,75 Chunks
Type,float64,numpy.ndarray


**ParallelPostFit** is a Dask wrapper that we're using here to parallelize and distribute our models prediction 

In [64]:
clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring="r2")


In [65]:
# note dask ML is not operating here, so we can only use the dataset that fits into memory 
clf.fit(X_train.values,y_train.values)


ParallelPostFit(estimator=LogisticRegressionCV(Cs=10, class_weight=None, cv=3,
                                               dual=False, fit_intercept=True,
                                               intercept_scaling=1.0,
                                               l1_ratios=None, max_iter=100,
                                               multi_class='auto', n_jobs=None,
                                               penalty='l2', random_state=None,
                                               refit=True, scoring=None,
                                               solver='lbfgs', tol=0.0001,
                                               verbose=0),
                scoring='r2')

# Predictions via Dask 

now we can leverage Dask for predicting on the larger dataset that does not fit into memory 


In [66]:
y_pred = clf.predict(X_large)
y_pred

Unnamed: 0,Array,Chunk
Bytes,238.80 kB,3.18 kB
Shape,"(29850,)","(398,)"
Count,151 Tasks,75 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 238.80 kB 3.18 kB Shape (29850,) (398,) Count 151 Tasks 75 Chunks Type int64 numpy.ndarray",29850  1,

Unnamed: 0,Array,Chunk
Bytes,238.80 kB,3.18 kB
Shape,"(29850,)","(398,)"
Count,151 Tasks,75 Chunks
Type,int64,numpy.ndarray


And we can still use all the familiar Sklearn API methods, like pulling out our soft probabilities from the model, all we do is append .compute() and Dask executes

In [99]:
%%time

soft_probs = clf.predict_proba(X_large).compute()

CPU times: user 170 ms, sys: 25.4 ms, total: 195 ms
Wall time: 192 ms
