## Dask Arrays

In [83]:
import dask.array as da

In [84]:
# Build a dask array holding the integers 0..10, stored in chunks of at most 5 elements.
X = da.arange(11, chunks=5)
# compute() evaluates the lazy array so the values can be displayed.
X.compute()

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10])

In [85]:
# Chunk sizes along each axis: 11 elements created with chunks=5 give (5, 5, 1).
X.chunks

((5, 5, 1),)

In [86]:
# Wrap an existing NumPy array in a dask array.
import numpy as np

x = np.arange(10)
# 10 elements with chunks=5 are stored as two blocks of 5.
y = da.from_array(x, chunks=5)
y.compute()

array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

In [87]:
# calculating the mean of the first 1000 non-negative integers (0..999)
x=np.arange(1000)
y=da.from_array(x, chunks=100) # converting numpy array to dask, 100 elements per chunk

In [88]:
# mean() only builds the lazy reduction; compute() evaluates it (499.5 for 0..999).
y.mean().compute()

499.5

## Dask Dataframe

In [89]:
# using some data from csv file
import pandas as pd
%time
temp = pd.read_csv('train.csv')

Wall time: 0 ns


In [90]:
import dask.dataframe as dd
%time
df=dd.read_csv('train.csv')

Wall time: 0 ns


In [91]:
# Preview the first rows of the dask dataframe.
df.head()

Unnamed: 0,User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
0,1000001,P00069042,F,0-17,10,A,2,0,3,,,8370
1,1000001,P00248942,F,0-17,10,A,2,0,1,6.0,14.0,15200
2,1000001,P00087842,F,0-17,10,A,2,0,12,,,1422
3,1000001,P00085442,F,0-17,10,A,2,0,12,14.0,,1057
4,1000002,P00285442,M,55+,16,C,4+,0,8,,,7969


In [92]:
# Count how many rows carry each Gender value.
df['Gender'].value_counts().compute()

M    414259
F    135809
Name: Gender, dtype: int64

In [93]:
# Largest Purchase value within each Gender group.
df.groupby('Gender')['Purchase'].max().compute()

Gender
F    23959
M    23961
Name: Purchase, dtype: int64

## Dask ML

In [94]:
# Load the train and test CSV files as dask dataframes.
import dask.dataframe as dd

df = dd.read_csv('train.csv')
test = dd.read_csv('test.csv')

# Preview the first rows of the training data.
df.head()

Unnamed: 0,User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
0,1000001,P00069042,F,0-17,10,A,2,0,3,,,8370
1,1000001,P00248942,F,0-17,10,A,2,0,1,6.0,14.0,15200
2,1000001,P00087842,F,0-17,10,A,2,0,12,,,1422
3,1000001,P00085442,F,0-17,10,A,2,0,12,14.0,,1057
4,1000002,P00285442,M,55+,16,C,4+,0,8,,,7969


In [95]:
# Number of missing values in each column of the training data.
df.isnull().sum().compute()

User_ID                            0
Product_ID                         0
Gender                             0
Age                                0
Occupation                         0
City_Category                      0
Stay_In_Current_City_Years         0
Marital_Status                     0
Product_Category_1                 0
Product_Category_2            173638
Product_Category_3            383247
Purchase                           0
dtype: int64

In [96]:
# Separate the model inputs from the regression target (Purchase).
feature_columns = [
    'Gender',
    'Age',
    'Occupation',
    'City_Category',
    'Stay_In_Current_City_Years',
    'Marital_Status',
]
categorical_variables = df[feature_columns]
target = df['Purchase']

In [97]:
# One-hot encode the selected columns: categorize() first, then get_dummies,
# and compute() to evaluate the result.
categorized = categorical_variables.categorize()
data = dd.get_dummies(categorized).compute()

In [98]:
#converting dataframe to array
%time
datanew=data.values

#fit the model
from dask_ml.linear_model import LinearRegression
lr = LinearRegression()
lr.fit(datanew, target)

Wall time: 0 ns


LinearRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
         intercept_scaling=1.0, max_iter=100, multi_class='ovr', n_jobs=1,
         penalty='l2', random_state=None, solver='admm',
         solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)

In [99]:
# preparing the test data
test_categorical = test[['Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status']]
test_dummy = dd.get_dummies(test_categorical.categorize()).compute()

# The test dummies are derived from the categories present in test.csv only,
# so their columns may differ (in set or order) from the training matrix.
# Align them to the training columns: levels unseen in test become all-zero
# columns, and levels unseen in train are dropped.
test_dummy = test_dummy.reindex(columns=data.columns, fill_value=0)
testnew = test_dummy.values

# predict on the test data
pred = lr.predict(testnew)

In [100]:
from dask.distributed import Client
client = Client() # start a local Dask client

# No joblib `parallel_backend('dask')` context is used here: the statements
# below only define objects, so a backend context around them has no effect,
# and `sklearn.externals.joblib` is no longer shipped by recent scikit-learn.

# Create the parameter grid based on the results of random search
param_grid = {
    'bootstrap': [True],
    'max_depth': [8, 9],
    'max_features': [2, 3],
    'min_samples_leaf': [4, 5],
    'min_samples_split': [8, 10],
    'n_estimators': [100, 200]
}

# Create a base model
from sklearn.ensemble import RandomForestRegressor
rf = RandomForestRegressor()

In [101]:
# Set up a 3-fold grid search over param_grid for the random forest and fit it.
# NOTE(review): dask-searchcv appears to have been folded into
# dask_ml.model_selection — confirm before upgrading the environment.
import dask_searchcv as dcv

grid_search = dcv.GridSearchCV(estimator=rf, param_grid=param_grid, cv=3)
grid_search.fit(data, target)

# Show the best parameter combination found.
grid_search.best_params_

tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:54207, threads: 1>>
Traceback (most recent call last):
  File "C:\Users\vsharm29\AppData\Local\Continuum\anaconda3\lib\site-packages\psutil\_pswindows.py", line 636, in wrapper
    return fun(self, *args, **kwargs)
  File "C:\Users\vsharm29\AppData\Local\Continuum\anaconda3\lib\site-packages\psutil\_pswindows.py", line 752, in memory_info
    t = self._get_raw_meminfo()
  File "C:\Users\vsharm29\AppData\Local\Continuum\anaconda3\lib\site-packages\psutil\_pswindows.py", line 727, in _get_raw_meminfo
    return cext.proc_memory_info(self.pid)
ProcessLookupError: [Errno 3] No such process

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\vsharm29\AppData\Local\Continuum\anaconda3\lib\site-packages\tornado\ioloop.py", line 1229, in _run
    return self.callback()
  File "C:\Users\vsharm29\AppData\Local\Cont

{'bootstrap': True,
 'max_depth': 9,
 'max_features': 3,
 'min_samples_leaf': 4,
 'min_samples_split': 8,
 'n_estimators': 200}

In [102]:
# Simple marker printed once execution reaches this cell.
print('done')

done


In [103]:
# Print the best parameter combination found by the grid search.
print(grid_search.best_params_)

{'bootstrap': True, 'max_depth': 9, 'max_features': 3, 'min_samples_leaf': 4, 'min_samples_split': 8, 'n_estimators': 200}
