### Dask - Why every Data Scientist should use Dask?
---
![Dask](https://camo.githubusercontent.com/6bf060c071924b0d6cf6158048f12fde592f7939/68747470733a2f2f692e696d6775722e636f6d2f4d3342527268312e706e67)

---
#### [Why talk about Dask?](#01)
#### [What is the magic of Dask?](#02)
#### [Dask Introduction](#03)

### Index <a id="99"></a>

1. [**Dask Installation**](#1)
1. [**Dask Data Science Interface**](#2)
    1. [**Dataframe : mimics Pandas**](#21)
        1. [**Talking Data Fraud Detection**](#12)
        2. [**Microsoft Malware Prediction**](#13)
    1. [**Array: mimics Numpy**](#22)
    1. [**Dask ML**](#23)
        1. [**Example of Dask XGBOOST**](#31)
        1. [**Training on Large Dataset**](#32)
        1. [**Voting Classifier**](#33)

---

#### Why talk about Dask? <a id="01"></a>
* Dask is one of the most capable data processing tools I have used. If you love Pandas and NumPy but sometimes struggle with data that does not fit into RAM, Dask is exactly what you need.
* Dask supports the Pandas DataFrame and NumPy array data structures. It can run on your local computer or scale up to run on a cluster. You write the code once and then choose to run it locally or deploy it to a multi-node cluster, using ordinary Pythonic syntax. This is a great feature in itself, but it is not the reason I wrote this notebook and argue that every data scientist (at least every one using Python) should use Dask.

---

#### What is the magic of Dask? <a id="02"></a>
* Dask is popularly known as a 'parallel computing' Python library that has been designed to run across multiple systems. Your next question would understandably be: what is parallel computing?
* In the ball-sorting example below, 10 people doing the job simultaneously is analogous to parallel computation. In technical terms, parallel computation means performing multiple tasks (or computations) simultaneously, using more than one resource.

---

### Dask Introduction <a id="03"></a>
Reference : [**From official Dask site**](http://docs.dask.org/en)

***Dask is composed of two parts:***

1. **Dynamic task scheduling** optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
2. **“Big Data” collections** like ***parallel arrays, dataframes, and lists*** that extend common interfaces like **NumPy, Pandas, or Python iterators to larger-than-memory** or distributed environments. These parallel collections run on top of dynamic task schedulers.


### Dask emphasizes the following virtues:

*   **Familiar**: Provides parallelized NumPy array and Pandas DataFrame objects
*   **Flexible**: Provides a task scheduling interface for more custom workloads and integration with other projects.
*   **Native**: Enables distributed computing in pure Python with access to the PyData stack.
*   **Fast**: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
*   **Scales up**: Runs resiliently on clusters with 1000s of cores
*   **Scales down**: Trivial to set up and run on a laptop in a single process
*   **Responsive**: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans

![](http://docs.dask.org/en/latest/_images/collections-schedulers.png)

---

### An example to understand how Dask works
Reference : [Example](https://www.analyticsvidhya.com/blog/2018/08/dask-big-datasets-machine_learning-python/)
* Suppose you have 4 balls of different colors and you are asked to separate them into buckets by color within an hour. You can do it immediately. But what if you had thousands of balls of different colors to separate?

![](https://s3-ap-south-1.amazonaws.com/av-blog-media/wp-content/uploads/2018/07/image-21-300x169.jpg)

Question: ***What if you are given a thousand balls and have to separate them in an hour's time?***  
Answer: **The best bet would be to ask a few other people for help.** You can call 9 other friends, give each of them 100 balls and ask them to separate these by color. In this case, 10 people are working on the assigned task simultaneously and together can complete it faster than a single person could (here you had a large amount of data, which you distributed among a group of people).
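The same idea can be sketched with `dask.delayed`: each friend becomes a task that counts the colors in one share of 100 balls, and a final task merges the counts. This is a minimal illustration under made-up assumptions (the four colors and the helpers `sort_balls` and `merge_counts` are hypothetical, not part of Dask); only `dask.delayed` and `.compute()` are the real Dask API.

```python
import random
from collections import Counter

import dask

COLORS = ["red", "green", "blue", "yellow"]  # hypothetical colors


def sort_balls(balls):
    """One 'friend': count the balls of each color in their share (hypothetical helper)."""
    return Counter(balls)


def merge_counts(counts):
    """Combine every friend's counts into a single total (hypothetical helper)."""
    total = Counter()
    for c in counts:
        total.update(c)
    return total


random.seed(0)
balls = [random.choice(COLORS) for _ in range(1000)]

# Split 1,000 balls into 10 shares of 100, one per friend
shares = [balls[i:i + 100] for i in range(0, len(balls), 100)]

# dask.delayed records each call as a task in a graph instead of running it immediately
partial_counts = [dask.delayed(sort_balls)(share) for share in shares]
total = dask.delayed(merge_counts)(partial_counts)

# compute() runs the 10 counting tasks in parallel (threads by default), then merges them
result = total.compute()
print(result)
```

Nothing runs until `compute()` is called; before that, `total` is only a description of the work, which is what lets Dask schedule the 10 independent tasks at the same time.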

---

### Notable talks on Dask

*   **SciPy 2018, July 2018**
    *   [Scalable Machine Learning with Dask (30 minutes)](https://www.youtube.com/watch?v=ccfsbuqsjgI)
*   **PyCon 2018, May 2018**
    *   [Democratizing Distributed Computing with Dask and JupyterHub (32 minutes)](https://www.youtube.com/watch?v=Iq72dt1gO9c)
*   **AMS & ESIP, January 2018**
    *   [Pangeo quick demo: Dask, XArray, Zarr on the cloud with JupyterHub (3 minutes)](https://www.youtube.com/watch?v=rSOJKbfNBNk)
    *   [Pangeo talk: An open-source big data science platform with Dask, XArray, Zarr on the cloud with JupyterHub (43 minutes)](https://www.youtube.com/watch?v=mDrjGxaXQT4)
*   **PYCON.DE 2017, November 2017**
    *   [Dask: Parallelism in Python (1 hour, 2 minutes)](https://www.youtube.com/watch?v=rZlshXJydgQ)
*   **PYCON 2017, May 2017**
    *   [Dask: A Pythonic Distributed Data Science Framework (46 minutes)](https://www.youtube.com/watch?v=RA_2qdipVng)
*   **PLOTCON 2016, December 2016**
    *   [Visualizing Distributed Computations with Dask and Bokeh (33 minutes)](https://www.youtube.com/watch?v=FTJwDeXkggU)
*   **PyData DC, October 2016**
    *   [Using Dask for Parallel Computing in Python (44 minutes)](https://www.youtube.com/watch?v=s4ChP7tc3tA)
*   **SciPy 2016, July 2016**
    *   [Dask Parallel and Distributed Computing (28 minutes)](https://www.youtube.com/watch?v=PAGjm4BMKlk)
*   **PyData NYC, December 2015**
    *   [Dask Parallelizing NumPy and Pandas through Task Scheduling (33 minutes)](https://www.youtube.com/watch?v=mHd8AI8GQhQ)
*   **PyData Seattle, August 2015**
    *   [Dask: out of core arrays with task scheduling (1 hour, 50 minutes)](https://www.youtube.com/watch?v=ieW3G7ZzRZ0)
*   **SciPy 2015, July 2015**
    *   [Dask Out of core NumPy:Pandas through Task Scheduling (16 minutes)](https://www.youtube.com/watch?v=1kkFZ4P-XHg)
---
### 1. Dask Installation <a id="1"></a>
[**Go to top**](#99)

1. **Conda installation:** `conda install dask`
2. **pip installation:** `pip install "dask[complete]"`
3. **From source:**

        git clone https://github.com/dask/dask.git
        cd dask
        python -m pip install .
![](https://www.kaggle.com/ashishpatel26/dask-u/downloads/installation.JPG)

---

### 2. Dask Data Science Interface <a id="2"></a>

---
[**Go to top**](#99)
### 2.1. **DataFrame: mimics Pandas** <a id="21"></a>
* A Dask DataFrame is a large parallel DataFrame composed of many smaller pandas DataFrames, split along the index. These pandas DataFrames may live on disk, for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent pandas DataFrames.

![image.png](https://s3-ap-south-1.amazonaws.com/av-blog-media/wp-content/uploads/2018/08/dask-df-768x569.png)

![](https://www.kaggle.com/ashishpatel26/dask-u/downloads/dataframe.JPG)

### Useful Basic operations

*   **Trivially parallelizable operations (fast):**
    
    *   Element-wise operations: **`df.x + df.y`, `df * df`**
    *   Row-wise selections: **`df[df.x > 0]`**
    *   Loc: **`df.loc[4.0:10.5]`**
    *   Common aggregations: **`df.x.max()`, `df.max()`**
    *   Is in: **`df[df.x.isin([1, 2, 3])]`**
    *   Date time/string accessors: **`df.timestamp.dt.month`**
    
*   **Cleverly parallelizable operations (fast):**
    
    *   groupby-aggregate (with common aggregations): **`df.groupby(df.x).y.max()`, `df.groupby('x').max()`**
    *   groupby-apply on index: **`df.groupby(['idx', 'x']).apply(myfunc)`, where `idx` is the index level name**
    *   value\_counts: **`df.x.value_counts()`**
    *   Drop duplicates: **`df.x.drop_duplicates()`**
    *   Join on index: **`dd.merge(df1, df2, left_index=True, right_index=True)`** or **`dd.merge(df1, df2, on=['idx', 'x'])`** where **`idx`** is the index name for both **`df1` and `df2`**
    *   Join with Pandas DataFrames: **`dd.merge(df1, df2, on='id')`**
    *   Element-wise operations with different partitions / divisions: **`df1.x + df2.y`**
    *   Date time resampling: **`df.resample(...)`**
    *   Rolling averages: **`df.rolling(...)`**
    *   Pearson’s correlation: **`df[['col1', 'col2']].corr()`**
    
*   **Operations requiring a shuffle (slow-ish, unless on index)**
    
    *   Set index: **`df.set_index(df.x)`**
    *   groupby-apply not on index (with anything): **`df.groupby(df.x).apply(myfunc)`**
    *   Join not on the index: **`dd.merge(df1, df2, on='name')`**

## Two Examples
[**Go to top**](#99)

1. [**Talking Data Fraud Detection**](#12)
2. [**Microsoft Malware Prediction**](#13)

In [None]:
from IPython.display import HTML

In [None]:
HTML("<iframe width='100%' height='800' src='https://docs.dask.org/en/latest/_downloads/daskcheatsheet.pdf'></iframe>")

In [None]:
# Libraries preinstalled in the Kaggle Python Docker image
# (https://github.com/kaggle/docker-python)
import os

import numpy as np   # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
import dask
import dask.dataframe as dd

import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
plt.style.use('fivethirtyeight')

# Input data files are available in the "../input/" directory
print(os.listdir("../input"))

## Talking Data Fraud Detection <a id="12"></a>
[**Go to top**](#99)

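Time to read the DataFrame:

* **Dask: 179 ms** (`dd.read_csv` is lazy, so this only covers sampling the file and building the task graph; the full file is read when a result is computed)
* **Pandas: the kernel crashed on Kaggle**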

#### 1. Read data

In [None]:
%%time
# Loading in the train data
dtypes = {'ip':'uint32',
          'app': 'uint16',
          'device': 'uint16',
          'os': 'uint16',
          'channel': 'uint16',
          'is_attributed': 'uint8'}

train = dd.read_csv("../input/talkingdata-adtracking-fraud-detection/train.csv", dtype=dtypes, parse_dates=['click_time', 'attributed_time'])

#### 2. Display data

In [None]:
train.head()

#### 3. Information about the DataFrame

In [None]:
train.info()

#### 4. Length of the DataFrame

In [None]:
len(train)

In [None]:
train.describe()

#### 5. Check data types

In [None]:
train.dtypes

In [None]:
train.dtypes.value_counts().plot(kind="pie", figsize = (10,10))

## Microsoft Malware Prediction <a id="13"></a>
[**Go to top**](#99)

Time to read dataframe: 

* **Dask: 419 ms**
* **Pandas: the kernel crashed on Kaggle**

#### 1. Read data

In [None]:
dtypes1 = {
        'MachineIdentifier':                                    'category',
        'ProductName':                                          'category',
        'EngineVersion':                                        'category',
        'AppVersion':                                           'category',
        'AvSigVersion':                                         'category',
        'IsBeta':                                               'int8',
        'RtpStateBitfield':                                     'float16',
        'IsSxsPassiveMode':                                     'int8',
        'DefaultBrowsersIdentifier':                            'float16',
        'AVProductStatesIdentifier':                            'float32',
        'AVProductsInstalled':                                  'float16',
        'AVProductsEnabled':                                    'float16',
        'HasTpm':                                               'int8',
        'CountryIdentifier':                                    'int16',
        'CityIdentifier':                                       'float32',
        'OrganizationIdentifier':                               'float16',
        'GeoNameIdentifier':                                    'float16',
        'LocaleEnglishNameIdentifier':                          'int8',
        'Platform':                                             'category',
        'Processor':                                            'category',
        'OsVer':                                                'category',
        'OsBuild':                                              'int16',
        'OsSuite':                                              'int16',
        'OsPlatformSubRelease':                                 'category',
        'OsBuildLab':                                           'category',
        'SkuEdition':                                           'category',
        'IsProtected':                                          'float16',
        'AutoSampleOptIn':                                      'int8',
        'PuaMode':                                              'category',
        'SMode':                                                'float16',
        'IeVerIdentifier':                                      'float16',
        'SmartScreen':                                          'category',
        'Firewall':                                             'float16',
        'UacLuaenable':                                         'float32',
        'Census_MDC2FormFactor':                                'category',
        'Census_DeviceFamily':                                  'category',
        'Census_OEMNameIdentifier':                             'float16',
        'Census_OEMModelIdentifier':                            'float32',
        'Census_ProcessorCoreCount':                            'float16',
        'Census_ProcessorManufacturerIdentifier':               'float16',
        'Census_ProcessorModelIdentifier':                      'float16',
        'Census_ProcessorClass':                                'category',
        'Census_PrimaryDiskTotalCapacity':                      'float32',
        'Census_PrimaryDiskTypeName':                           'category',
        'Census_SystemVolumeTotalCapacity':                     'float32',
        'Census_HasOpticalDiskDrive':                           'int8',
        'Census_TotalPhysicalRAM':                              'float32',
        'Census_ChassisTypeName':                               'category',
        'Census_InternalPrimaryDiagonalDisplaySizeInInches':    'float16',
        'Census_InternalPrimaryDisplayResolutionHorizontal':    'float16',
        'Census_InternalPrimaryDisplayResolutionVertical':      'float16',
        'Census_PowerPlatformRoleName':                         'category',
        'Census_InternalBatteryType':                           'category',
        'Census_InternalBatteryNumberOfCharges':                'float32',
        'Census_OSVersion':                                     'category',
        'Census_OSArchitecture':                                'category',
        'Census_OSBranch':                                      'category',
        'Census_OSBuildNumber':                                 'int16',
        'Census_OSBuildRevision':                               'int32',
        'Census_OSEdition':                                     'category',
        'Census_OSSkuName':                                     'category',
        'Census_OSInstallTypeName':                             'category',
        'Census_OSInstallLanguageIdentifier':                   'float16',
        'Census_OSUILocaleIdentifier':                          'int16',
        'Census_OSWUAutoUpdateOptionsName':                     'category',
        'Census_IsPortableOperatingSystem':                     'int8',
        'Census_GenuineStateName':                              'category',
        'Census_ActivationChannel':                             'category',
        'Census_IsFlightingInternal':                           'float16',
        'Census_IsFlightsDisabled':                             'float16',
        'Census_FlightRing':                                    'category',
        'Census_ThresholdOptIn':                                'float16',
        'Census_FirmwareManufacturerIdentifier':                'float16',
        'Census_FirmwareVersionIdentifier':                     'float32',
        'Census_IsSecureBootEnabled':                           'int8',
        'Census_IsWIMBootEnabled':                              'float16',
        'Census_IsVirtualDevice':                               'float16',
        'Census_IsTouchEnabled':                                'int8',
        'Census_IsPenCapable':                                  'int8',
        'Census_IsAlwaysOnAlwaysConnectedCapable':              'float16',
        'Wdft_IsGamer':                                         'float16',
        'Wdft_RegionIdentifier':                                'float16',
        'HasDetections':                                        'int8'
        }

In [None]:
%%time
train_df = dd.read_csv("../input/microsoft-malware-prediction/train.csv", dtype = dtypes1)

#### 2. Display the first rows

In [None]:
train_df.head()

#### 3. Information about the DataFrame

In [None]:
train_df.info()

In [None]:
train_df.describe()

#### 4. Aggregation operation

In [None]:
train_df.groupby(train_df.Census_ChassisTypeName).Census_InternalPrimaryDiagonalDisplaySizeInInches.mean().compute()

In [None]:
len(train_df)

#### 5. Data type count plot

In [None]:
train_df.dtypes.value_counts().plot(kind="pie", figsize = (10,10))

#### 6. Reset index

In [None]:
train_df=train_df.reset_index()

#### 7. Visualize Data

Note: ***Double-click the image to see a zoomed-in version.*** (`visualize()` requires the `graphviz` library.)

In [None]:
train_df.Census_SystemVolumeTotalCapacity.max().visualize()

### 2.2. **Array: mimics NumPy** <a id="22"></a>
---
[**Go to top**](#99)
* Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs.


![](https://s3-ap-south-1.amazonaws.com/av-blog-media/wp-content/uploads/2018/07/image-4-e1532888794915.png)

* Dask arrays coordinate many NumPy arrays arranged into a grid. These NumPy arrays may live on disk or on other machines.

![](https://github.com/dask/dask-tutorial/raw/33efceb9ba76b16ce3bf51d6210546e257f0874b/images/array.png)

Dask array provides a parallel, larger-than-memory, n-dimensional array using blocked algorithms. Simply put: distributed NumPy.

*   **Parallel**: Uses all of the cores on your computer
*   **Larger-than-memory**: Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
*   **Blocked Algorithms**: Perform large computations by performing many smaller computations
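A blocked algorithm can be imitated by hand: reduce each block independently, then combine the partial results. This sketch (toy data, with four blocks chosen arbitrarily) does that with `Array.blocks` and checks the result against Dask's own `sum()`, which also reduces each chunk first and then aggregates the partial sums.

```python
import numpy as np
import dask
import dask.array as da

data = np.arange(1_000_000, dtype=np.int64)
x = da.from_array(data, chunks=250_000)  # 4 blocks of 250,000 elements
print(x.numblocks)  # (4,)

# Step 1: reduce each block on its own (these tasks can run in parallel)
partials = [x.blocks[i].sum() for i in range(x.numblocks[0])]
partial_sums = dask.compute(*partials)  # one NumPy scalar per block

# Step 2: combine the small partial results
blocked_total = sum(partial_sums)

print(blocked_total, x.sum().compute(), data.sum())
```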

**Related Documentation**

*   [Documentation](http://dask.readthedocs.io/en/latest/array.html)
*   [API reference](http://dask.readthedocs.io/en/latest/array-api.html)

![](https://www.kaggle.com/ashishpatel26/dask-u/downloads/array.JPG)

* Dask Array is used in fields like atmospheric and oceanographic science, large scale imaging, genomics, numerical algorithms for optimization or statistics, and more.


Basic operations of Dask Array
---------------------------------------------

Dask arrays support most of the NumPy interface like the following:

*   Arithmetic and scalar mathematics: **`+, *, exp, log, ...`**
*   Reductions along axes: **`sum(), mean(), std(), sum(axis=0), ...`**
*   Tensor contractions / dot products / matrix multiply: **`tensordot`**
*   Axis reordering / transpose: **`transpose`**
*   Slicing: **`x[:100, 500:100:-2]`**
*   Fancy indexing along single axes with lists or NumPy arrays: **`x[:, [10, 1, 5]]`**
*   Array protocols like **`__array__` and `__array_ufunc__`**
*   Some linear algebra: **`svd, qr, solve, solve_triangular, lstsq`**
*   …
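A quick way to check that these operations behave like NumPy is to run them on a small array split into (2, 2) chunks and compare against NumPy directly. The array and the selection of operations below are only an illustration.

```python
import numpy as np
import dask.array as da

a = np.arange(12, dtype=float).reshape(3, 4)
x = da.from_array(a, chunks=(2, 2))

# Each entry pairs a lazy Dask expression with the equivalent NumPy result
ops = {
    "arithmetic": (da.exp(x) * 2 + 1, np.exp(a) * 2 + 1),
    "reduction": (x.sum(axis=0), a.sum(axis=0)),
    "std": (x.std(), a.std()),
    "transpose": (x.T, a.T),
    "matmul": (x.dot(x.T), a.dot(a.T)),
    "tensordot": (da.tensordot(x, x, axes=([1], [1])),
                  np.tensordot(a, a, axes=([1], [1]))),
    "slicing": (x[:2, ::-2], a[:2, ::-2]),
    "fancy": (x[:, [3, 0, 1]], a[:, [3, 0, 1]]),
}

for name, (lazy, expected) in ops.items():
    assert np.allclose(lazy.compute(), expected), name
print("all operations match NumPy")
```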

However, Dask Array does not implement the entire NumPy interface. Users expecting this will be disappointed. Notably, Dask Array lacks the following features:

*   Much of **`np.linalg`** has not been implemented. This has been done by a number of excellent **BLAS/LAPACK** implementations, and is the focus of numerous ongoing academic research projects
*   Arrays with unknown shapes do not support all operations
*   Operations like **`sort`** which are notoriously difficult to do in parallel, and are of somewhat diminished value on very large data (you rarely actually need a full sort). Often we include parallel-friendly alternatives like **`topk`**
*   Dask Array doesn’t implement operations like **`tolist`** that would be very inefficient for larger datasets. Likewise, it is very inefficient to iterate over a Dask array with for loops
*   Dask development is driven by immediate need, hence many lesser used functions have not been implemented. Community contributions are encouraged
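For the `sort` limitation, `topk` and `argtopk` are parallel-friendly alternatives when you only need the largest (or smallest) few values. A minimal sketch on a toy array: a positive `k` returns the largest values sorted in descending order, and a negative `k` returns the smallest values in ascending order.

```python
import numpy as np
import dask.array as da

values = np.array([5, 1, 9, 3, 7, 2, 8, 6])
x = da.from_array(values, chunks=3)

largest = x.topk(3).compute()     # [9 8 7]
smallest = x.topk(-2).compute()   # [1 2]
positions = x.argtopk(3).compute()  # indices of the 3 largest values

print(largest, smallest, positions)
```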

### Create an array using Dask

In [None]:
import dask.array as da

# Use arange to create an array with values from 0 to 19, split into chunks of 5
X = da.arange(20, chunks=5)
X.compute()

In [None]:
#to see size of each chunk
X.chunks

In [None]:
result = X.sum()
result.compute()

### Convert a NumPy array to a Dask array

In [None]:
import numpy as np
import dask.array as da
x = np.arange(100)
y = da.from_array(x, chunks=5)
y.compute()  # .compute() returns a NumPy array

### Calculating the mean of the first 1,000 numbers (0 to 999)

In [None]:
x = np.arange(1000)  # arange creates an array with values from 0 to 999
y = da.from_array(x, chunks=100)  # convert the NumPy array to a Dask array with chunks of 100

y.mean().compute()  #computing mean of the array

### Slicing


#### Dask.array supports most of the NumPy slicing syntax. In particular it supports the following:

*   Slicing by integers and slices **`x[0, :5]`**
*   Slicing by lists/arrays of integers **`x[[1, 2, 4]]`**
*   Slicing by lists/arrays of booleans **`x[[False, True, True, False, True]]`**

*   Slicing one Dask array with a boolean Dask array **`x[x > 0]`** (the result has unknown chunk sizes until it is computed)

It does not currently support the following:

*   Slicing with lists in multiple axes **`x[[1, 2, 3], [3, 2, 1]]`**; for this kind of pointwise indexing, use **`x.vindex`**

This is straightforward to add, though. If you have a use case, raise an issue.
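Recent Dask versions accept boolean masking with another Dask array (`x[x > 0]`); because the number of selected elements is only known after computing, the result has a `nan` size. For indexing several axes with lists at once, `vindex` provides NumPy-style pointwise selection. A small sketch with toy arrays:

```python
import math

import numpy as np
import dask.array as da

x = da.arange(10, chunks=5) - 5  # values -5 .. 4

# Boolean masking with a Dask array: the size is unknown until computed
pos = x[x > 0]
print(pos.shape)      # (nan,)
print(pos.compute())  # [1 2 3 4]

m = da.from_array(np.arange(16).reshape(4, 4), chunks=2)
# m[[1, 2, 3], [3, 2, 1]] is not supported; vindex picks the points (1,3), (2,2), (3,1)
points = m.vindex[[1, 2, 3], [3, 2, 1]].compute()
print(points)  # [ 7 10 13]
```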

In [None]:
x = da.ones((10000, 10000), chunks = 5000)
da.exp(x)[:5000, :5000].compute()

In [None]:
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z.compute()

### Persist data in memory
* If you have the available RAM for your dataset then you can persist data in memory.
* This allows future computations to be much faster.

In [None]:
y = y.persist()
%time y[0, 0].compute()

In [None]:
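# z was built from the un-persisted y, so rebuild it from the persisted array first
z = y[::2, 5000:].mean(axis=1)
%time z.sum().compute()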

In [None]:
x[[1,4,7]].compute()

Stack and Concatenate
======
[**Go to top**](#99)
* Often we have many arrays stored on disk that we want to stack together and think of as one large array. This is common with geospatial data in which we might have many HDF5/NetCDF files on disk, one for every day, but we want to do operations that span multiple days.


### Stack

We stack many existing dask Arrays into a new array, creating a new dimension as we go.

In [None]:
import dask.array as da
data = [da.from_array(np.ones((4, 4)), chunks=(2, 2)) for i in range(3)]  # A small stack of dask arrays

In [None]:
x = da.stack(data, axis=0)
x.shape

In [None]:
da.stack(data, axis=1).shape

In [None]:
da.stack(data, axis=-1).shape

This creates a new dimension with length equal to the number of slices.

### Concatenate

We concatenate existing arrays into a new array, extending them along an existing dimension.

In [None]:
import dask.array as da
import numpy as np

data = [da.from_array(np.ones((4, 4)), chunks=(2, 2)) for i in range(3)]  # small stack of dask arrays

In [None]:
x = da.concatenate(data, axis=0)
x.shape

In [None]:
da.concatenate(data, axis=1).shape
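The shapes produced by the stack and concatenate cells can be checked with a small sketch. Here each of the three (4, 4) inputs is filled with a different constant (an arbitrary choice, so it is visible where each input ends up); the printed shapes show where `stack` adds a new axis and `concatenate` extends an existing one.

```python
import numpy as np
import dask.array as da

data = [da.from_array(np.full((4, 4), i), chunks=(2, 2)) for i in range(3)]

stacked0 = da.stack(data, axis=0)         # new leading axis of length 3
stacked_last = da.stack(data, axis=-1)    # new trailing axis of length 3
joined0 = da.concatenate(data, axis=0)    # rows extended: 4 + 4 + 4
joined1 = da.concatenate(data, axis=1)    # columns extended: 4 + 4 + 4

print(stacked0.shape, stacked_last.shape)  # (3, 4, 4) (4, 4, 3)
print(joined0.shape, joined1.shape)        # (12, 4) (4, 12)
print(stacked0.chunks)                     # one chunk per input along the new axis
```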

### 2.3. Dask ML <a id="23"></a>
[**Go to top**](#99)

* **Dask ML** provides scalable Python **machine learning algorithms** that are compatible with Scikit-Learn. First let's understand how Scikit-Learn deals with calculations, and then let's see how Dask performs these operations differently.

![](https://s3-ap-south-1.amazonaws.com/av-blog-media/wp-content/uploads/2018/07/sklearn-11.png)

* A user can do parallel computing with scikit-learn (on a single machine) by setting the parameter **`n_jobs=-1`**. Scikit-learn uses Joblib to perform these parallel calculations. **Joblib is a Python library that provides support for parallelization.** When you call the **`.fit()`** method, Joblib distributes the work to be done (for example, a hyperparameter search or model fitting) across the available cores.
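Joblib's own API shows what `n_jobs=-1` means in practice. This sketch runs a trivial function (`math.sqrt`, chosen only for illustration) across all local cores with `joblib.Parallel`, the mechanism scikit-learn relies on when you pass `n_jobs=-1`.

```python
from math import sqrt

from joblib import Parallel, delayed

# n_jobs=-1 asks joblib to use all available cores, as scikit-learn's n_jobs=-1 does
results = Parallel(n_jobs=-1)(delayed(sqrt)(i ** 2) for i in range(10))
print(results)
```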

* Although parallel calculations can be done with scikit-learn, they cannot be scaled out to multiple machines. Dask, on the other hand, **works well on a single machine and can also be scaled out to a cluster of machines.**
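Joblib can also hand its tasks to Dask through its `"dask"` backend, which is one way to run existing joblib-based code (including scikit-learn's `n_jobs` parallelism) on Dask workers. A minimal sketch, assuming `dask.distributed` is installed; the local in-process cluster stands in for a real multi-machine cluster, which you would reach by passing the scheduler address to `Client`.

```python
from math import sqrt

import joblib
from dask.distributed import Client

# A local in-process cluster with 2 workers (a stand-in for a real cluster)
with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
    # Same joblib code as before, but the tasks are sent to Dask workers
    with joblib.parallel_backend("dask"):
        results = joblib.Parallel()(joblib.delayed(sqrt)(i ** 2) for i in range(10))

print(results)
```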

![](https://s3-ap-south-1.amazonaws.com/av-blog-media/wp-content/uploads/2018/07/dask-1.png)

* Dask has a central **task scheduler** and a **set of workers.** The scheduler assigns tasks to the workers. Each worker is given a number of cores on which it can perform calculations.
**Workers have two jobs:**
    1. Compute the tasks assigned to them by the scheduler
    2. Deliver results to other workers on request

* Here's an example of what a conversation between the scheduler and the workers looks like (as explained by Matthew Rocklin, the creator of Dask):

* The central task scheduler sends jobs (Python functions) to different worker processes, either on the same machine or in a cluster:

        Worker A, compute x = f(1). Worker B, compute y = g(2).
        Worker A, once g(2) is done, get y from Worker B and compute z = h(x, y).

This should give you a clear idea of how Dask works. Next, let's look at machine learning with Dask.
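The conversation maps directly onto Dask's futures interface: `client.submit` sends a function call to the scheduler, and passing futures as arguments lets the scheduler wait for them and move data between workers as needed. A minimal sketch on a local in-process cluster; `f`, `g` and `h` are hypothetical toy functions standing in for the ones in the conversation.

```python
from dask.distributed import Client


# Hypothetical stand-ins for f, g and h in the conversation
def f(a):
    return a + 10


def g(b):
    return b * 100


def h(x, y):
    return x + y


with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
    x = client.submit(f, 1)  # "compute x = f(1)"
    y = client.submit(g, 2)  # "compute y = g(2)"
    # Futures as arguments: the scheduler waits for x and y and
    # moves y's data to whichever worker runs h, if needed
    z = client.submit(h, x, y)
    result = z.result()

print(result)  # 211
```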
        

### 3.1 Example of Dask XGBOOST <a id="31"></a>
[**Go to top**](#99)

| API (in `dask_ml.xgboost`) | Description |
|---|---|
| `train(client, params, data, labels[, …])` | Train an XGBoost model on a Dask cluster |
| `predict(client, model, data)` | Distributed prediction with XGBoost |
| `XGBClassifier([max_depth, learning_rate, …])` | XGBoost classifier |
| `XGBRegressor([max_depth, learning_rate, …])` | XGBoost regressor |

In [None]:
# dask-ml is a separate package from dask, so install it first:
# pip install dask-ml
import dask_ml

### XGB Setup
* Dask and XGBoost can work together to train gradient boosted trees in parallel. This notebook shows how to use Dask and XGBoost together.

* XGBoost provides a powerful prediction framework and works well in practice. It has won many Kaggle competitions and is popular in industry because it performs well and can be easily interpreted (i.e., it is easy to find the important features of an XGBoost model).

In [None]:
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1)
client

### Create data
First we create a bunch of synthetic data, with 100,000 examples and 20 features.

In [None]:
from dask_ml.datasets import make_classification

X, y = make_classification(n_samples=100000, n_features=20,
                           chunks=1000, n_informative=4,
                           random_state=0)
X

Dask-XGBoost works with both arrays and dataframes. For more information on creating dask arrays and dataframes from real data, see documentation on Dask arrays or Dask dataframes.

### Split data for training and testing
We split our dataset into training and testing data to aid evaluation by making sure we have a fair test:

In [None]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15)

### Train Dask XGBOOST

dask-xgboost is a small wrapper around xgboost. Dask sets XGBoost up, gives XGBoost the data and lets XGBoost do its training in the background using all the workers Dask has available.

Let’s do some training:

In [None]:
import dask_xgboost
import xgboost

In [None]:
%%time
params = {'objective': 'binary:logistic',
          'max_depth': 4, 'eta': 0.01, 'subsample': 0.5,
          'min_child_weight': 0.5}

bst = dask_xgboost.train(client, params, X_train, y_train, num_boost_round=10)

### Visualize results
The bst object is a regular xgboost.Booster object.

In [None]:
bst

This means all the methods mentioned in the XGBoost documentation are available. We show two examples to expand on this, but these examples are of XGBoost rather than Dask.

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

ax = xgboost.plot_importance(bst, height=0.8, max_num_features=9)
ax.grid(False, axis="y")
ax.set_title('Estimated feature importance')
plt.show()

We specified that only 4 features were informative while creating our data, and only 3 features show up as important.

### Plot the Receiver Operating Characteristic curve

In [None]:
y_hat = dask_xgboost.predict(client, bst, X_test).persist()
y_hat

In [None]:
from sklearn.metrics import roc_curve
fpr, tpr, _ = roc_curve(y_test, y_hat)

In [None]:
from sklearn.metrics import auc

fig, ax = plt.subplots(figsize=(12, 8))
ax.plot(fpr, tpr, lw=3,
        label='ROC Curve (area = {:.2f})'.format(auc(fpr, tpr)))
ax.plot([0, 1], [0, 1], 'k--', lw=2)
ax.set(
    xlim=(0, 1),
    ylim=(0, 1),
    title="ROC Curve",
    xlabel="False Positive Rate",
    ylabel="True Positive Rate",
)
ax.legend();
plt.show()
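`roc_curve` sweeps a decision threshold over the predicted scores, and `auc` integrates the resulting curve with the trapezoidal rule. The NumPy sketch below reproduces that computation on a tiny example, to show what the plotted numbers mean. `roc_points` and `trapezoid_auc` are hypothetical names; unlike scikit-learn, the sketch does not merge tied scores or drop intermediate points.

```python
import numpy as np


def roc_points(y_true, scores):
    """FPR/TPR after each threshold, sweeping from the highest score down."""
    order = np.argsort(-scores)                # most confident predictions first
    y = np.asarray(y_true)[order]
    tps = np.cumsum(y)                         # true positives above the threshold
    fps = np.cumsum(1 - y)                     # false positives above the threshold
    tpr = np.concatenate([[0.0], tps / y.sum()])
    fpr = np.concatenate([[0.0], fps / (len(y) - y.sum())])
    return fpr, tpr


def trapezoid_auc(fpr, tpr):
    """Area under the ROC curve with the trapezoidal rule."""
    return float(np.sum(np.diff(fpr) * (tpr[1:] + tpr[:-1]) / 2))


y_true = np.array([0, 0, 1, 1])
scores = np.array([0.1, 0.4, 0.35, 0.8])
fpr, tpr = roc_points(y_true, scores)
print(trapezoid_auc(fpr, tpr))   # 0.75
```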

### Reimplemented Algorithms with Dask Arrays

* For simple machine learning algorithms that use NumPy arrays, Dask-ML provides re-implementations that replace the NumPy arrays with Dask arrays, so the algorithms can scale. This has been done for:

    * Linear models (linear regression, logistic regression, Poisson regression)
    * Pre-processing (scalers, transforms)
    * Clustering (k-means, spectral clustering)
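These algorithms port well because their expensive steps are sums over rows. For logistic regression, the gradient of the log loss is X^T (sigmoid(Xw) - y), which splits into one term per block of rows, so each Dask chunk can compute its share independently and only the small per-block vectors need to be added together. The NumPy sketch below checks that identity and runs plain gradient descent with it. It is a simplified illustration (fixed step size, no regularisation), not Dask-ML's solver, and the function names are hypothetical.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def block_gradient(Xb, yb, w):
    """Log-loss gradient contribution of one block of rows."""
    return Xb.T @ (sigmoid(Xb @ w) - yb)


def chunked_gradient(X, y, w, chunk_size):
    """Sum of per-block gradients, as a blocked or distributed solver computes it."""
    return sum(block_gradient(X[s:s + chunk_size], y[s:s + chunk_size], w)
               for s in range(0, X.shape[0], chunk_size))


rng = np.random.default_rng(0)
X = rng.normal(size=(10_000, 5))
true_w = np.array([1.5, -2.0, 0.0, 0.5, 1.0])
y = (rng.random(10_000) < sigmoid(X @ true_w)).astype(float)

w = np.zeros(5)
full = X.T @ (sigmoid(X @ w) - y)                            # gradient on all rows at once
print(np.allclose(chunked_gradient(X, y, w, 1000), full))    # True

for _ in range(200):                                         # plain gradient descent
    w -= 0.5 * chunked_gradient(X, y, w, 1000) / X.shape[0]
print(np.round(w, 1))                                        # should end up close to true_w
```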

### A.  Linear model example

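    from dask_ml.datasets import make_classification
    from dask_ml.linear_model import LogisticRegression

    # data: Dask array of features, labels: Dask array of 0/1 targets
    data, labels = make_classification(n_samples=10000, chunks=1000, random_state=0)
    model = LogisticRegression()
    model.fit(data, labels)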
    
### B.  Pre-processing example

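    from dask_ml.preprocessing import OneHotEncoder

    # data: a Dask DataFrame or array of categorical columns, defined elsewhere
    encoder = OneHotEncoder(sparse=True)
    result = encoder.fit_transform(data)  # fit() alone returns the encoder, not the encoded data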
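One-hot encoding in blocks needs one extra step: every block must use the same global list of categories, otherwise a block that happens to lack a category would produce a different number of columns from its neighbours. The NumPy sketch below shows the two-stage pattern (a cheap reduction to collect the categories, then independent per-block encoding); the helper names are hypothetical and this is not Dask-ML's implementation.

```python
import numpy as np


def global_categories(blocks):
    """Union of the categories seen in every block (a cheap reduction)."""
    return np.unique(np.concatenate([np.unique(b) for b in blocks]))


def one_hot_block(block, categories):
    """Encode one block against the shared, global category list."""
    return (block[:, None] == categories[None, :]).astype(np.int8)


colors = np.array(["red", "green", "red", "blue", "green", "red"])
blocks = np.array_split(colors, 3)       # block 0 has no "blue"
cats = global_categories(blocks)         # ['blue' 'green' 'red']
encoded = np.vstack([one_hot_block(b, cats) for b in blocks])
print(cats)
print(encoded)
```

Every block yields three columns in the same order, so the encoded blocks can simply be stacked.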

### C. Clustering example

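    from dask_ml.cluster import KMeans
    from dask_ml.datasets import make_blobs
    data, _ = make_blobs(n_samples=10000, chunks=1000, centers=3, random_state=0)  # synthetic Dask array
    model = KMeans(n_clusters=3)
    model.fit(data)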
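Each k-means (Lloyd) iteration also decomposes by block: every block assigns its points to the nearest centre and returns per-cluster coordinate sums and counts, and the new centres are the combined sums divided by the combined counts. The NumPy sketch below performs one such iteration block by block and checks it against the same update on a single block. It illustrates the general technique, not Dask-ML's implementation, and the function names are hypothetical.

```python
import numpy as np


def block_stats(Xb, centers):
    """Per-cluster coordinate sums and point counts for one block."""
    d2 = ((Xb[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
    labels = d2.argmin(axis=1)                    # nearest centre for each point
    sums = np.zeros_like(centers)
    np.add.at(sums, labels, Xb)                   # add each point to its cluster's sum
    counts = np.bincount(labels, minlength=centers.shape[0])
    return sums, counts


def lloyd_step(X, centers, chunk_size):
    """One k-means iteration computed block by block, then combined."""
    stats = [block_stats(X[s:s + chunk_size], centers)
             for s in range(0, X.shape[0], chunk_size)]
    sums = sum(ps for ps, _ in stats)
    counts = sum(pc for _, pc in stats)
    # keep the old centre if a cluster received no points
    return np.where(counts[:, None] > 0,
                    sums / np.maximum(counts, 1)[:, None], centers)


rng = np.random.default_rng(0)
X = np.vstack([rng.normal(loc=c, size=(500, 2)) for c in ([0, 0], [5, 5], [0, 5])])
centers = X[rng.choice(len(X), size=3, replace=False)]

blocked = lloyd_step(X, centers, chunk_size=250)
unblocked = lloyd_step(X, centers, chunk_size=len(X))   # one single block
print(np.allclose(blocked, unblocked))                  # True
```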

### 3.2. Train Models on Large Datasets <a id="32"></a>
[**Go to top**](#99)

Most Scikit-Learn estimators are designed to work with NumPy arrays or SciPy sparse matrices, and these data structures must fit into RAM on a single machine.

Estimators implemented in Dask-ML work well with Dask arrays and DataFrames, which can be much larger than the memory of a single machine because they can be spread across the memory of a cluster of machines.

In [None]:
%matplotlib inline

In [None]:
# from dask.distributed import Client

# # Scale up: connect to your own cluster with more resources
# # see http://dask.pydata.org/en/latest/setup.html
# client = Client(processes=False, threads_per_worker=4,
#                 n_workers=1, memory_limit='2GB')
# client

In [None]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

In this example, we’ll use `dask_ml.datasets.make_blobs` to generate some random dask arrays.

In [None]:
# Scale up: increase n_samples or n_features
X, y = dask_ml.datasets.make_blobs(n_samples=1000000,
                                   chunks=100000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X

We’ll use the k-means implemented in Dask-ML to cluster the points. It uses the k-means|| (read: **“k-means parallel”**) initialization algorithm, which scales better than **k-means++**. All of the computation, both during and after initialization, can be done in parallel.
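k-means|| (from the "Scalable K-Means++" paper by Bahmani et al.) replaces the k sequential passes of k-means++ with a few rounds in which every point is sampled independently, so each block can do its own sampling within a round. The NumPy sketch below follows those steps: start from one random point, run a few oversampling rounds, weight each candidate by the number of points closest to it, then reduce the candidates to k centres. The names are hypothetical; `oversampling_factor` mirrors the Dask-ML parameter of the same name, mapping `rounds` to `init_max_iter` is an assumption, and the final weighted k-means++ reduction is one reasonable choice rather than Dask-ML's exact procedure.

```python
import numpy as np


def sq_dist_to_nearest(X, C):
    """Squared distance from each row of X to its nearest row of C."""
    return ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2).min(axis=1)


def kmeans_parallel_init(X, k, oversampling_factor=10, rounds=2, seed=0):
    rng = np.random.default_rng(seed)
    C = X[rng.integers(len(X))][None, :]              # step 1: one random point
    for _ in range(rounds):                           # step 2: a few sampling rounds
        d2 = sq_dist_to_nearest(X, C)
        p = np.minimum(1.0, oversampling_factor * d2 / d2.sum())
        picked = X[rng.random(len(X)) < p]            # each point sampled independently
        C = np.vstack([C, picked])
    # step 3: weight each candidate by how many points it is closest to
    d2_all = ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2)
    weights = np.bincount(d2_all.argmin(axis=1), minlength=len(C)).astype(float)
    # step 4: weighted k-means++ on the small candidate set
    centers = [C[rng.choice(len(C), p=weights / weights.sum())]]
    for _ in range(k - 1):
        prob = weights * sq_dist_to_nearest(C, np.array(centers))
        centers.append(C[rng.choice(len(C), p=prob / prob.sum())])
    return np.array(centers)


rng = np.random.default_rng(0)
X = np.vstack([rng.normal(loc=c, scale=0.5, size=(300, 2))
               for c in ([0, 0], [6, 0], [3, 5])])
centers = kmeans_parallel_init(X, k=3, oversampling_factor=10, rounds=2)
print(centers)
```

Only the final reduction runs on the small candidate set; the expensive distance computations over all points are per-point and therefore parallel.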

In [None]:
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)

In [None]:
fig, ax = plt.subplots()
ax.scatter(X[::1000, 0], X[::1000, 1], marker='.', c=km.labels_[::1000],
           cmap='viridis', alpha=0.25);

### For more, see the [Dask-ML API documentation](http://ml.dask.org/modules/api.html)

### 3.3. Voting Classifier <a id="33"></a>
[**Go to top**](#99)

A [Voting classifier](http://scikit-learn.org/stable/modules/ensemble.html#voting-classifier) model combines multiple different models (i.e., sub-estimators) into a single model, which is (ideally) stronger than any of the individual models alone.
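With scikit-learn's default `voting='hard'`, the combined prediction for each sample is the class predicted by the most sub-estimators. The NumPy sketch below shows that majority vote on made-up predictions from three models; `hard_vote` is a hypothetical helper, and sending ties to the smallest class label is an assumption of the sketch.

```python
import numpy as np


def hard_vote(predictions, n_classes):
    """Majority vote; predictions has shape (n_models, n_samples)."""
    votes = np.stack([(predictions == c).sum(axis=0) for c in range(n_classes)])
    return votes.argmax(axis=0)          # argmax sends ties to the smallest label


preds = np.array([
    [0, 1, 1, 0],    # e.g. SGDClassifier
    [0, 1, 0, 0],    # e.g. LogisticRegression
    [1, 1, 0, 1],    # e.g. SVC
])
print(hard_vote(preds, n_classes=2))     # [0 1 0 0]
```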

[Dask](http://ml.dask.org/joblib.html) provides the software to train individual sub-estimators on different machines in a cluster. This enables users to train more models in parallel than would have been possible on a single machine. Note that users will only observe this benefit if they have a distributed cluster with more resources than their single machine (because sklearn already enables users to parallelize training across cores on a single machine).

In [None]:
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

import sklearn.datasets

We create a synthetic dataset (with 1000 rows and 20 columns) that we can give to the voting classifier model.

In [None]:
X, y = sklearn.datasets.make_classification(n_samples=1_000, n_features=20)

In [None]:
classifiers = [
    ('sgd', SGDClassifier(max_iter=1000)),
    ('logisticregression', LogisticRegression()),
    ('svc', SVC(gamma='auto')),
]
clf = VotingClassifier(classifiers, n_jobs=-1)

In [None]:
%time clf.fit(X, y)

In [None]:
import joblib  # `from sklearn.externals import joblib` was removed in scikit-learn 0.23
# from distributed import Client

# client = Client()
# client

To train the **voting classifier** on the cluster, we call the classifier's `fit` method inside joblib's `parallel_backend` context manager. This distributes the training of the sub-estimators across the cluster.

In [None]:
%%time
with joblib.parallel_backend("dask"):
    clf.fit(X, y)

print(clf)

### References : 
[**Go to top**](#99)
1. **https://docs.dask.org/**
2. **https://www.analyticsvidhya.com/blog/2018/08/dask-big-datasets-machine_learning-python/**
3. **https://towardsdatascience.com/why-every-data-scientist-should-use-dask-81b2b850e15b**
4. **https://towardsdatascience.com/trying-out-dask-dataframes-in-python-for-fast-data-analysis-in-parallel-aa960c18a915**
5. **https://www.kdnuggets.com/2016/09/introducing-dask-parallel-programming.html**

## To be continued with further updates...