# Dask

## Introduction
Pandas is a great tool and is used for a variety of purposes, such as data cleansing, exploratory data analysis, time series analysis, visual analysis and building features for ML models, to name a few. We <3 pandas! However, we have seen that as soon as you hit scale, things start slowing down, and people generally switch to Spark DataFrames. Porting pandas code to Spark DataFrames can be painful and may not pay off unless you have very large datasets.

One of our data engineering workloads started off small (hundreds of thousands of data points per day) but quickly became a largish data problem (5M rows per day). That is when we decided to try out Dask. Our chunked pandas DataFrame approach used to take 20 hours to perform complex data cleansing on 300k rows, versus 60 minutes for 2.5 million rows on Dask!

## What is Dask?
Dask provides a framework for performing parallel computing for analytics.

Dask is composed of two components:

* Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
* “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.

## Broad Categories of Dask Components

* **Dask DataFrame** - mimics Pandas
* **Dask Bag** - mimics iterators, Toolz, and PySpark
* **Dask Delayed** - mimics for loops and wraps custom code
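The DataFrame gets its own walkthrough below. For the other two collections, here is a minimal sketch using `dask.bag.from_sequence` and the `dask.delayed` decorator; the numbers and the `inc`/`add` helpers are made up for illustration.

```python
import dask
import dask.bag as db

# Dask Bag: a parallel collection of plain Python objects, split into partitions
bag = db.from_sequence(range(10), npartitions=2)
evens_squared = bag.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2)
# Bags default to a process-based scheduler; threads keep this sketch simple to run
squares = evens_squared.compute(scheduler="threads")
print(squares)  # [0, 4, 16, 36, 64]


# Dask Delayed: decorated functions return lazy results that form a task graph
@dask.delayed
def inc(x):
    return x + 1


@dask.delayed
def add(x, y):
    return x + y


total = add(inc(1), inc(2))  # nothing has executed yet
result = total.compute()     # inc(1) and inc(2) may run in parallel, then add
print(result)  # 5
```

Delayed is the escape hatch for custom code that does not fit the DataFrame or Bag APIs: you write ordinary functions and Dask schedules them.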

## Dask Dataframe

A Dask DataFrame is made up of many smaller pandas DataFrames (partitions) split along the index. These partitions may reside in memory, on disk (if the data does not fit in memory), or on multiple machines (in the case of a Dask cluster).

![](./img/dask-dataframe.png)

Because the `dask.dataframe` application programming interface (API) is a subset of the Pandas API, it should be familiar to Pandas users. There are some slight differences due to the parallel nature of Dask; most notably, operations are lazy and only run when you call `.compute()`.

## Let's try it out

In [None]:
import dask.dataframe as dd
import pandas as pd
%matplotlib inline

### Dask DF

In [None]:
ddf = dd.read_csv('datasets/biketrip.csv')

In [None]:
type(ddf)

In [None]:
ddf.head()

### Pandas DF

In [None]:
pdf = pd.read_csv('datasets/biketrip.csv')

In [None]:
type(pdf)

In [None]:
pdf.head()

In [None]:
pdf.shape

In [None]:
ddf.shape  # the row count is lazy; ddf.shape[0].compute() evaluates it

In [None]:
%%timeit
pdf['start_date'].min()

In [None]:
%%timeit
ddf['start_date'].min().compute()

In [None]:
%%timeit
pdf.groupby(['start_station_id'])['id'].count()

In [None]:
%%timeit
ddf.groupby(['start_station_id'])['id'].count().compute()

In [None]:
%%timeit
ddf.groupby(['start_station_id'])['start_date'].min().compute()

## Connect to SQL Table

Check the installed Dask version first, then create a SQLAlchemy engine.

In [None]:
import dask
dask.__version__

In [None]:
from sqlalchemy import create_engine
import sqlite3
import numpy as np

In [None]:
engine = create_engine('sqlite:///datasets/database.sqlite')
conn = sqlite3.connect('datasets/database.sqlite')
uri = 'sqlite:///datasets/database.sqlite'

In [None]:
meta = {
    # 'id' is omitted: it becomes the index via index_col='id'
    # np.int / np.str were removed in NumPy 1.24, so use dtype strings instead
    'duration': 'int64',
    'start_date': 'datetime64[ns]',
    'start_station_name': 'object',
    'start_station_id': 'int64',
    'end_date': 'datetime64[ns]',
    'end_station_name': 'object',
    'end_station_id': 'int64',
    'bike_id': 'int64',
    'subscription_type': 'object',
    'zip_code': 'object',
}

In [None]:
data= {
#     'id': 0,
    'duration': 1,
    'start_date': '2014-01-01',
    'start_station_name': 'st name',
    'start_station_id': 1,
    'end_date': '2014-02-20',
    'end_station_name': 'st name',
    'end_station_id': 3,
    'bike_id': 5,
    'subscription_type': 'subs',
    'zip_code': 'zip'  
}

In [None]:
cols = ['duration','start_date','start_station_name','start_station_id','end_date','end_station_name','end_station_id','bike_id','subscription_type','zip_code']

In [None]:
df = pd.DataFrame(columns=cols)

In [None]:
df.dtypes

In [None]:
for c in df.columns:
    df[c] = df[c].astype(meta[c])

In [None]:
df.dtypes

In [None]:
df.head()

In [None]:
# The URI is passed positionally: the parameter is `uri` in older Dask releases, `con` in newer ones
ddf = dd.read_sql_table("trip", uri, index_col='id', npartitions=4, meta=df)

In [None]:
ddf._meta

In [None]:
type(ddf)

In [None]:
ddf.size.compute()  # number of elements (rows * columns); len(ddf) gives the row count

In [None]:
%%time
ddf.groupby('start_station_name').min().compute()


In [None]:
ddf.groupby('start_station_name').min().visualize()

In [None]:
%%time
ddf.groupby('start_station_name')['start_date'].min().compute()

In [None]:
ddf = ddf.set_index('start_station_id')

In [None]:
%%time
ddf.groupby('start_station_id')['start_date'].min().compute()

## Exercises

Question 1: Find the maximum duration for every start station

In [None]:
%%time
ddf.groupby(['start_station_name'])['duration'].max().compute()

Question 2: Set `start_station_id` as index

Question 3: Find the station with the highest number of outbound trips in 2015

## Shuffling - Why the index is important

In [None]:
%%time
ddf.groupby(['start_station_name'])['duration'].apply(lambda x: max(x)).compute()

In [None]:
%%time
ddf.groupby(['start_station_id'])['duration'].apply(lambda x: max(x)).compute()

In [None]:
ddf.groupby(['start_station_name'])['duration'].apply(lambda x: max(x)).visualize()

In [None]:
ddf.groupby(['start_station_id'])['duration'].apply(lambda x: max(x)).visualize()

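The first approach groups by `start_station_name`, which is not the index, so rows belonging to one group can be spread across every partition and have to be shuffled together. The second groups by `start_station_id`, the index set earlier, so each group already sits in a single partition. Now imagine deploying the same code on a cluster: there, shuffling means network I/O, which can slow the computation down considerably.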

## Joins

Joins work just like in pandas.

Joins are also quite fast when joining a Dask dataframe to a Pandas dataframe or when joining two Dask dataframes along their index. No special considerations need to be made when operating in these common cases.

So if you're doing common groupby and join operations, you can stop reading here: everything will scale nicely, and fortunately this is true most of the time.
```

>>> dask_df.join(pandas_df, on=column)                # Fast and common case
>>> lhs.join(rhs)                                     # Fast and common case
>>> lhs.merge(rhs, on=columns_with_index)             # Fast and common case

```

In some cases, such as when applying an arbitrary function to groups (when not grouping on an index with known divisions), when joining along non-index columns, or when explicitly setting an unsorted column as the index, we may need to trigger a full dataset shuffle:

```
>>> lhs.join(rhs, on=columns_no_index)            # Requires shuffle
>>> df.set_index(column)                          # Requires shuffle
```

In [None]:
station_df = dd.read_sql_table("station", uri, index_col='id', npartitions=1)

In [None]:
ddf.columns

In [None]:
ddf.join(station_df).visualize()

In [None]:
%%time
final = ddf.join(station_df).compute()

In [None]:
final.head()

In [None]:
%%time
final.groupby(['city'])['start_station_name'].count()

Question: Which city has the highest number of trips with a subscriber subscription type?