## Section 3.1

### This notebook contains code related to Dask

#### 1. Client

In [1]:
from dask.distributed import Client
client = Client(processes=False)

In [2]:
client

0,1
Client  Scheduler: inproc://192.168.0.105/16199/1  Dashboard: http://192.168.0.105:8787/status,Cluster  Workers: 1  Cores: 4  Memory: 16.73 GB


#### 2. Parallel Collections - Handling a huge datasets

In [3]:
from dask_ml import datasets
from dask_ml.model_selection import train_test_split
import dask.array as da
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier

In [4]:
X, y = datasets.make_classification(n_samples=100000000,
                                         n_features=7,
                                         random_state=0,
                                         chunks=100000)

In [5]:
X

Unnamed: 0,Array,Chunk
Bytes,5.60 GB,5.60 MB
Shape,"(100000000, 7)","(100000, 7)"
Count,1000 Tasks,1000 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 5.60 GB 5.60 MB Shape (100000000, 7) (100000, 7) Count 1000 Tasks 1000 Chunks Type float64 numpy.ndarray",7  100000000,

Unnamed: 0,Array,Chunk
Bytes,5.60 GB,5.60 MB
Shape,"(100000000, 7)","(100000, 7)"
Count,1000 Tasks,1000 Chunks
Type,float64,numpy.ndarray


In [6]:
y

Unnamed: 0,Array,Chunk
Bytes,800.00 MB,800.00 kB
Shape,"(100000000,)","(100000,)"
Count,12001 Tasks,1000 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 800.00 MB 800.00 kB Shape (100000000,) (100000,) Count 12001 Tasks 1000 Chunks Type int64 numpy.ndarray",100000000  1,

Unnamed: 0,Array,Chunk
Bytes,800.00 MB,800.00 kB
Shape,"(100000000,)","(100000,)"
Count,12001 Tasks,1000 Chunks
Type,int64,numpy.ndarray


In [7]:
classes = da.unique(y).compute()

In [8]:
X_train, X_test, y_train, y_test = train_test_split(X, y)

In [9]:
clf = SGDClassifier(loss='log', penalty='l2', tol=0.01)
# wrapping in Incremental
clf = Incremental(clf, scoring='accuracy')
clf.fit(X_train, y_train, classes=classes)
# while training check Client Dashboard

Incremental(estimator=SGDClassifier(loss='log', tol=0.01), scoring='accuracy')

#### 3. Distributed Optimization - Parallelizing trials over clusters

In [10]:
from sklearn.datasets import load_digits
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.svm import SVC
import joblib
import time

In [11]:
# define a simple classifier with GridSearch

X, y = load_digits().data, load_digits().target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, shuffle=True)

c = 0.001
gamma = 1e-10
param_grid = {
              "C": [c*(10**i) for i in range(1,14)],
              "gamma": [gamma*(10**i) for i in range(1,14)]
             }

clf = SVC(kernel='rbf')
search = GridSearchCV(clf, param_grid, cv=3)

In [12]:
since = time.time()
with joblib.parallel_backend('dask', scatter=[X_train, y_train]):
    model = search.fit(X_train, y_train)
print(time.time()-since)

38.08261847496033


#### 4. Distributing trials over cluster for a Huge Dataset

In [None]:
from dask_ml import datasets
from dask_ml.wrappers import Incremental
from dask_ml.model_selection import train_test_split, GridSearchCV
from dask_ml.metrics import accuracy_score

from sklearn.metrics import make_scorer
from sklearn.linear_model import SGDClassifier

import joblib

import dask.array as da
from dask.distributed import Client
client = Client(processes=False)
print(client.dashboard_link)

param_grid = {
              "penalty": ['l1', 'l2'],
              "tol": [1e-2, 1e-3, 1e-4]
             }

X, y = datasets.make_classification(n_samples=100000000,
                                    n_features=7,
                                    random_state=0,
                                    chunks=100000)

# providing an accuracy metrics from 'dask_ml'
scorer = make_scorer(accuracy_score)

X_train, X_test, y_train, y_test = train_test_split(X, y)

clf = SGDClassifier(loss='log')
clf_wrap = Incremental(clf, scoring=scorer)
searh_clf = GridSearchCV(clf_wrap, param_grid, cv=3)

with joblib.parallel_backend('dask'):
    model = searh_clf.fit(X_train, y_train)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37201 instead
  http_address["port"], self.http_server.port


http://192.168.0.105:37201/status
