<center><img src="https://github.com/insaid2018/Term-1/blob/master/Images/INSAID_Full%20Logo.png?raw=true" width="240" height="100" /></center>

# Distributed ML using Dask

<center><img src="https://camo.githubusercontent.com/196c935c419e8868c0b66ea714f325676fc7de70/687474703a2f2f6461736b2e72656164746865646f63732e696f2f656e2f6c61746573742f5f696d616765732f6461736b5f686f72697a6f6e74616c2e737667" width="800" height="400" /></center>

## Table of Contents

1. [Introduction](#section1)<br>
2. [Problem Statement](#section2)
3. [Installing and importing libraries](#section3)
    - 3.1. [Installing Libraries](#section301)<br/>
    - 3.2. [Importing Libraries](#section302)<br/>
4. [Dask Arrays](#section4)
    - 4.1. [Creating a random dask array](#section401)<br/>
5. [Dask DataFrames](#section5)
    - 5.1. [Loading the dataset](#section501)<br/>
    - 5.2. [Data Pre-processing](#section502)<br/>
    - 5.3. [Dask Model implementation](#section503)<br/>
6. [Using Blackfriday sale dataset](#section6)
    - 6.1.  [Problem Statement](#section601)<br/>
    - 6.2.  [Data Description](#section602)<br/>
    - 6.3.  [Importing the dataset](#section603)<br/>
    - 6.4.  [Data Pre-processing](#section604)<br/>
    - 6.5.  [Dask model implementation](#section605)<br/>
7.  [Conclusions](#section7)<br>

### 1. Introduction

Dask is an Open Source project that gives you abstractions over NumPy Arrays, Pandas Dataframes and regular lists, allowing you to run operations on them in parallel, using multicore processing.

Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don’t fit into main memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets.

Dask enables efficient parallel computation on a single machine by leveraging its multi-core CPU and streaming data efficiently from disk. It can also run on a distributed cluster.

A few other things Dask offers:
- The ability to work in parallel with NumPy array and Pandas DataFrame objects
- Distributed computing
- Fast operation thanks to low overhead and minimal serialization
- Resilient execution on clusters with thousands of cores
- Real-time feedback and diagnostics

### 2. Problem statement

Taxi fares in a city depend on several attributes. The task is to predict taxi fares in New York City from a realistic dataset in which each fare depends on attributes such as the pickup location, drop-off location, pickup timestamp, and the number of passengers travelling.

<center><img src="https://raw.githubusercontent.com/insaid2018/Term-2/master/images/1_0Ov3bD5xNxszuJTBYJjX4w.jpeg" width="800" height="400" /></center>


### 3. Installing and importing libraries

#### 3.1 Installing Libraries

Dask is installed in Anaconda by default. To install the specific version used in this notebook, run the following command:

In [None]:
!pip install dask==2.22.0

If `pip install` fails inside the notebook, try one of the commands below from a terminal instead:

In [None]:
# python -m pip install "dask[dataframe]" --upgrade
# conda install dask

#### 3.2 Importing Libraries

In [1]:
import pandas as pd
import numpy as np
from datetime import datetime

In [2]:
import dask
import dask.dataframe as dd
import dask.array as da

In [3]:
from dask.distributed import Client, progress
import joblib

In [4]:
client = Client()
client

| Client | Cluster |
| :-- | :-- |
| Scheduler: tcp://127.0.0.1:52853<br>Dashboard: http://127.0.0.1:8787/status | Workers: 4<br>Cores: 12<br>Memory: 17.05 GB |


### 4. Dask Arrays

A Dask array is formed by dividing a large numpy array into smaller arrays (chunks). In simple words, Dask arrays are distributed numpy arrays! Every operation on a Dask array triggers operations on the smaller numpy arrays, each using a core on the machine. All available cores are therefore used simultaneously, and because only a few chunks need to be in memory at a time, computations on arrays larger than memory become possible.

In summary, here are a few important features of Dask arrays:

- Parallel: Dask arrays use all the cores of the system
- Larger-than-memory: Enables working on datasets that are larger than the memory available on the system (happens too often for me!). This is done by breaking the array into many small arrays and then performing the required operation on each of them
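
To make the chunking idea concrete, here is a minimal sketch. The array shape and chunk sizes are arbitrary choices for illustration and do not come from the notebook's data.

```python
import dask.array as da

# A 6x4 array of ones, split into 3x2 blocks (illustrative sizes).
x = da.ones((6, 4), chunks=(3, 2))

# Block sizes along each axis, and the number of blocks per axis.
print(x.chunks)     # ((3, 3), (2, 2))
print(x.numblocks)  # (2, 2)

# Each block is an ordinary NumPy array once it is computed.
first_block = x.blocks[0, 0].compute()
print(type(first_block), first_block.shape)

# A reduction runs on every block, then combines the partial results.
total = x.sum().compute()
print(total)  # 24.0
```

Choosing the chunk size is a trade-off: many tiny chunks add scheduling overhead, while very large chunks may not fit comfortably in a worker's memory.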


#### 4.1 Creating a random dask array

In [5]:
array = da.arange(21,chunks=7)

In [6]:
array

| | Array | Chunk |
| :-- | :-- | :-- |
| Bytes | 84 B | 28 B |
| Shape | (21,) | (7,) |
| Count | 3 Tasks | 3 Chunks |
| Type | int32 | numpy.ndarray |


In [7]:
array.compute()

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20])

In [8]:
#Finding the mean of the sample
array.mean()


| | Array | Chunk |
| :-- | :-- | :-- |
| Bytes | 8 B | 8 B |
| Shape | () | () |
| Count | 7 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


We expected the mean of the sample, but got a description of a lazy Dask array instead.

This is because calling `array.mean()` only builds a graph of tasks to be executed. To get the final result, we call `.compute()`, which triggers the actual computation.

In [9]:
array.mean().compute()

10.0
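
The same lazy behaviour applies to any chain of operations. As a small sketch (the variable names are illustrative), several lazy results can be built first and then evaluated together with a single `dask.compute` call, so that tasks shared between the graphs are run only once:

```python
import dask
import dask.array as da

arr = da.arange(21, chunks=7)

# Nothing is computed yet: these are lazy Dask arrays holding task graphs.
lazy_mean = arr.mean()
lazy_max = arr.max()
print(isinstance(lazy_mean, da.Array))  # True

# One call evaluates both graphs; shared tasks are executed only once.
mean_value, max_value = dask.compute(lazy_mean, lazy_max)
print(mean_value, max_value)  # 10.0 20
```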

### 5. Dask Dataframes

Similar to a Dask array, a Dask dataframe consists of multiple smaller pandas dataframes. A large pandas dataframe is split row-wise into multiple smaller dataframes (partitions). These partitions may live on the disk of a single machine or across multiple machines, which makes it possible to store datasets larger than memory. Each computation on a Dask dataframe runs in parallel on the underlying pandas dataframes.

The APIs offered by the Dask dataframe are very similar to those of the pandas dataframe.

#### 5.1 Loading the Dataset

| Records | Features | Dataset Size |
| :-- | :-- | :-- |
| ≈ 55.4 million | 08 | 5.69 GB |

| Id | Features | Description |
| :--| :--| :--|
|01|**Key**|This is a unique identifier|
|02|**pickup_datetime**|Datetime when trip started|
|03|**pickup_longitude**|longitude coordinate of where trip started|
|04|**pickup_latitude**|latitude coordinate of where trip started|
|05|**dropoff_longitude**|longitude coordinate of where trip ended|
|06|**dropoff_latitude**|latitude coordinate of where trip ended|
|07|**passenger_count**|No. of passengers|
|08|**fare_amount**|Total fare of the ride|


In [10]:
#Getting the size of the CSV file on disk
import os
file_size = os.path.getsize("C:/Users/lenovo/Documents/train.csv")
print('File size in bytes is {}'.format(file_size))

File size in bytes is 5697178298


In [11]:
#Importing the dataframe using pandas
start_time = datetime.now()
df = pd.read_csv('C:/Users/lenovo/Documents/train.csv')
time_elapsed = datetime.now() - start_time
print('Time elapsed (hh:mm:ss.ms) {}'.format(time_elapsed))

Time elapsed (hh:mm:ss.ms) 0:07:23.899442


In [12]:
#Importing the dataframe using Dask
start_time = datetime.now() 
df1 = dd.read_csv('C:/Users/lenovo/Documents/train.csv')
time_elapsed = datetime.now() - start_time 
print('Time elapsed (hh:mm:ss.ms) {}'.format(time_elapsed))

Time elapsed (hh:mm:ss.ms) 0:00:00.025512

Note that `dd.read_csv` is lazy: at this point Dask has only sampled the start of the file to infer column names and dtypes. The data itself is read later, when a result is computed, so the two timings are not a like-for-like comparison.


In [13]:
df1

| npartitions=90 | key | fare_amount | pickup_datetime | pickup_longitude | pickup_latitude | dropoff_longitude | dropoff_latitude | passenger_count |
| :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |
| | object | float64 | object | float64 | float64 | float64 | float64 | int64 |
| | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... |


#### 5.2 Data Pre-processing

In [14]:
df1.columns

Index(['key', 'fare_amount', 'pickup_datetime', 'pickup_longitude',
       'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
       'passenger_count'],
      dtype='object')

In [15]:
description=df1.describe()
description

| npartitions=1 | fare_amount | pickup_longitude | pickup_latitude | dropoff_longitude | dropoff_latitude | passenger_count |
| :-- | :-- | :-- | :-- | :-- | :-- | :-- |
| | float64 | float64 | float64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... |


In [16]:
description=df1.describe().compute()
description

| | fare_amount | pickup_longitude | pickup_latitude | dropoff_longitude | dropoff_latitude | passenger_count |
| :-- | :-- | :-- | :-- | :-- | :-- | :-- |
| count | 55423860.0 | 55423860.0 | 55423860.0 | 55423480.0 | 55423480.0 | 55423860.0 |
| mean | 11.34505 | -72.50968 | 39.91979 | -72.51121 | 39.92068 | 1.68538 |
| std | 20.71083 | 12.84888 | 9.642353 | 12.7822 | 9.633346 | 1.327664 |
| min | -300.0 | -3442.06 | -3492.264 | -3442.025 | -3547.887 | 0.0 |
| 25% | 6.0 | -73.99201 | 40.7351 | -73.99122 | 40.73441 | 1.0 |
| 50% | 8.5 | -73.9815 | 40.75299 | -73.97995 | 40.7533 | 1.0 |
| 75% | 12.9 | -73.96649 | 40.76725 | -73.96357 | 40.76818 | 2.0 |
| max | 93963.36 | 3457.626 | 3408.79 | 3457.622 | 3537.133 | 208.0 |


In [17]:
missing=df1.isnull().sum().compute()
missing

key                    0
fare_amount            0
pickup_datetime        0
pickup_longitude       0
pickup_latitude        0
dropoff_longitude    376
dropoff_latitude     376
passenger_count        0
dtype: int64

In [18]:
#Dropping rows having null values
df1=df1.dropna(how = 'any')

In [19]:
type(df1)

dask.dataframe.core.DataFrame

In [20]:
#Removing the data having fare amount less than 0
df1 = df1[df1.fare_amount>=0]

In [21]:
#Creating new features Longitude and latitude change
df1['lon_change'] = abs(df1.dropoff_longitude - df1.pickup_longitude)
df1['lat_change'] = abs(df1.dropoff_latitude - df1.pickup_latitude)

In [22]:
#Splitting the dependent and independent variables
Y = df1['fare_amount']
X = df1.drop('fare_amount',axis=1)

In [23]:
type(Y)

dask.dataframe.core.Series

In [24]:
type(X)

dask.dataframe.core.DataFrame

In [25]:
# Transforming the dataframe into an array
df2=X.values

In [26]:
df3=Y.values

In [27]:
df2.compute_chunk_sizes()

| | Array | Chunk |
| :-- | :-- | :-- |
| Bytes | 3.99 GB | 44.84 MB |
| Shape | (55421026, 9) | (622777, 9) |
| Count | 1530 Tasks | 90 Chunks |
| Type | object | numpy.ndarray |


In [28]:
df3.compute_chunk_sizes()

| | Array | Chunk |
| :-- | :-- | :-- |
| Bytes | 443.37 MB | 4.98 MB |
| Shape | (55421026,) | (622777,) |
| Count | 1530 Tasks | 90 Chunks |
| Type | float64 | numpy.ndarray |


#### 5.3 Model Implementation

In [29]:
import joblib
from sklearn.linear_model import LinearRegression

# Run scikit-learn's joblib-parallel work on the Dask cluster
with joblib.parallel_backend('dask'):
    model = LinearRegression()
    model.fit(df2, df3)

distributed.utils - ERROR - 
Traceback (most recent call last):
  File "C:\Users\lenovo\Anaconda3\lib\site-packages\distributed\utils.py", line 656, in log_errors
    yield
  File "C:\Users\lenovo\Anaconda3\lib\site-packages\distributed\protocol\numpy.py", line 103, in deserialize_numpy_ndarray
    return pickle.loads(frames[0], buffers=frames[1:])
  File "C:\Users\lenovo\Anaconda3\lib\site-packages\distributed\protocol\pickle.py", line 75, in loads
    return pickle.loads(x)
MemoryError
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "C:\Users\lenovo\Anaconda3\lib\site-packages\distributed\protocol\core.py", line 131, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "C:\Users\lenovo\Anaconda3\lib\site-packages\distributed\protocol\serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "C:\Users\lenovo\Anaconda3\lib\site-packages\distributed\protocol\serialize.py", line 51, in

MemoryError: 

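The dataset is too large for this machine, so fitting the model raises a memory error: scikit-learn's `LinearRegression` needs the whole feature array in memory at once, and `df2` alone is about 3.99 GB. Note also that `df2` still contains the string columns `key` and `pickup_datetime` (its dtype is `object`), which would have to be dropped or encoded before a linear model could be fitted. To improve the scalability of the model, you need to form a cluster, and this is where Dask becomes useful. You can [click here to know how to set up a cluster](https://towardsdatascience.com/set-up-a-dask-cluster-for-distributed-machine-learning-31f587b1b553)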
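
A cluster is not the only way to avoid materializing the full array. As an alternative sketch, and not what this notebook does, ordinary least squares can be solved through the normal equations, where each chunk contributes only a small partial product. The data below is synthetic and noise-free, and all names and sizes are assumptions made for illustration.

```python
import numpy as np
import dask
import dask.array as da

# Synthetic, noise-free stand-in for numeric features (hypothetical data).
rng = np.random.default_rng(0)
X_np = rng.random((10_000, 3))
true_coef = np.array([2.0, -1.0, 0.5])
true_intercept = 4.0
y_np = X_np @ true_coef + true_intercept

# Chunk along rows only, similar to an array taken from a Dask dataframe.
X = da.from_array(X_np, chunks=(2_500, 3))
y = da.from_array(y_np, chunks=2_500)

# Add a column of ones so that the last coefficient is the intercept.
ones = da.ones((X.shape[0], 1), chunks=(2_500, 1))
A = da.concatenate([X, ones], axis=1)

# Each row chunk contributes a small partial product; the final results
# are only a 4x4 matrix and a length-4 vector.
AtA, Aty = dask.compute(A.T @ A, A.T @ y)

# Solve the small linear system with NumPy.
beta = np.linalg.solve(AtA, Aty)
coef, intercept = beta[:-1], beta[-1]
print(coef, intercept)
```

The normal equations can be numerically fragile when features are strongly correlated, so this is a sketch of the idea rather than a drop-in replacement for a library estimator.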

### 6. Using Blackfriday sale dataset

#### 6.1. Problem Statement

We are given a retail store dataset containing a sample of the transactions made in the store. The objective is to understand customer purchases across the products available in the store. The target variable is the purchase amount, which depends on various attributes.

#### 6.2. Description of the dataset

| Id | Features | Description |
| :--| :--| :--|
|01|**User_ID**|This is a unique identifier|
|02|**Product_ID**|Unique id of the product|
|03|**Gender**|Gender of the individual|
|04|**Age**|Age of the individual|
|05|**Occupation**|Occupation of the individual|
|06|**City_Category**|Category of city|
|07|**Stay_In_Current_City_Years**|Duration of stay in the current city|
|08|**Marital_Status**|Marital Status of the individual|
|09|**Product_Category_1**|Product of category 1|
|10|**Product_Category_2**|Product of category 2|
|11|**Product_Category_3**|Product of category 3|
|12|**Purchase**|Purchase amount|


#### 6.3. Importing the dataset

In [54]:
df_black = dd.read_csv("C:/Users/lenovo/Downloads/10479_14692_compressed_train.csv/Blackfriday.csv")

#### 6.4. Data Pre-processing

In [55]:
df_black.head()

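| | User_ID | Product_ID | Gender | Age | Occupation | City_Category | Stay_In_Current_City_Years | Marital_Status | Product_Category_1 | Product_Category_2 | Product_Category_3 | Purchase |
| :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |
| 0 | 1000001 | P00069042 | F | 0-17 | 10 | A | 2 | 0 | 3 | NaN | NaN | 8370 |
| 1 | 1000001 | P00248942 | F | 0-17 | 10 | A | 2 | 0 | 1 | 6.0 | 14.0 | 15200 |
| 2 | 1000001 | P00087842 | F | 0-17 | 10 | A | 2 | 0 | 12 | NaN | NaN | 1422 |
| 3 | 1000001 | P00085442 | F | 0-17 | 10 | A | 2 | 0 | 12 | 14.0 | NaN | 1057 |
| 4 | 1000002 | P00285442 | M | 55+ | 16 | C | 4+ | 0 | 8 | NaN | NaN | 7969 |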


In [56]:
#checking the count of values
df_black.Gender.value_counts().compute()

M    414259
F    135809
Name: Gender, dtype: int64

In [57]:
#Grouping Gender on the basis of maximum purchase made
df_black.groupby(df_black.Gender).Purchase.max().compute()

Gender
F    23959
M    23961
Name: Purchase, dtype: int64

In [58]:
#Checking null values
df_black.isnull().sum().compute()

User_ID                            0
Product_ID                         0
Gender                             0
Age                                0
Occupation                         0
City_Category                      0
Stay_In_Current_City_Years         0
Marital_Status                     0
Product_Category_1                 0
Product_Category_2            173638
Product_Category_3            383247
Purchase                           0
dtype: int64

In [64]:
#Dropping all the null values
df_black=df_black.dropna()

In [65]:
#Splitting independent and dependent features
categorical_variables = df_black[['Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status']]
target = df_black['Purchase']

In [66]:
#Dummifying the categorical variables
data = dd.get_dummies(categorical_variables.categorize()).compute()

In [67]:
#Transforming it into an array
datanew=data.values

#### 6.5. Model implementation

In [68]:
from sklearn.linear_model import LinearRegression

with joblib.parallel_backend('dask'):
    lr = LinearRegression()
    # datanew is already a NumPy array, so materialize the target as well
    lr.fit(datanew, target.compute())

### 7. Conclusions

1. Dask is useful whenever the dataset is larger than the system's memory.
2. Scalability increases when the operations are run on a cluster.
3. Dask APIs are similar to the pandas and NumPy APIs.
4. Unlike pandas, a Dask dataframe does not load the whole dataset into memory when it is imported.