<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

# Dask: a flexible library for parallel computing in Python

### 1. Dask supports the Pandas DataFrame and NumPy array data structures
### 2. Dask can run on your local computer or scale up to run on a cluster (the code is written just once)
### 3. With minimal code changes, you can run code in parallel and take advantage of the processing power already on your laptop


If you love Pandas and NumPy but have struggled with data that does not fit into RAM, Dask is likely what you need.

## INSTALLATION
### CONDA 
Dask is usually included with Anaconda. You can install or update it with the conda command:

In [None]:
!conda install dask #This installs Dask and all common dependencies, including Pandas and NumPy.

### PIP

You can install everything required for the most common uses of Dask (arrays, dataframes, …) with pip. This installs Dask together with dependencies such as NumPy and Pandas that different workloads need, and it is often the right choice for Dask users:

In [None]:
!pip install "dask[complete]"    # Installs everything
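
After either installation route, a quick way to confirm that Dask is importable is to print its version. This is only a sanity check; the variable name `installed_version` is illustrative, and the value printed depends on your environment.

```python
import dask

# dask.__version__ is a string identifying the installed release.
installed_version = dask.__version__
print("Dask version:", installed_version)
```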

# Parallelize code with `dask.delayed`

In this section we parallelize simple for-loop style code with Dask and `dask.delayed`.

This is a simple way to use `dask` to parallelize existing codebases or build [complex systems](http://matthewrocklin.com/blog/work/2018/02/09/credit-models-with-dask).  It also builds the understanding we need for later sections.

In [1]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y


In [2]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)

CPU times: user 0 ns, sys: 2.8 ms, total: 2.8 ms
Wall time: 3 s


In [3]:
z

5

### Parallelize with the `dask.delayed` decorator

The two `inc` calls do not depend on each other, so they *could* run in parallel.

We'll wrap the `inc` and `add` functions in the `dask.delayed` decorator. When we call the delayed version by passing the arguments, the original function isn't actually called yet.
Instead, a *task graph* is built up, representing the *delayed* function call.
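
`dask.delayed` can be applied either by wrapping a function at call time, as in `delayed(inc)(1)`, or with the `@delayed` decorator syntax when the function is defined. The sketch below shows the decorator form. The names `slow_inc` and `slow_add` are illustrative stand-ins for `inc` and `add`, with shorter sleeps so the example finishes quickly.

```python
from time import sleep

from dask import delayed


@delayed
def slow_inc(x):
    sleep(0.1)  # stand-in for real work
    return x + 1


@delayed
def slow_add(x, y):
    sleep(0.1)  # stand-in for real work
    return x + y


# Calling the decorated functions only records tasks in a graph.
lazy_total = slow_add(slow_inc(1), slow_inc(2))

# Nothing has run until we ask for the result.
result = lazy_total.compute()
print(result)
```

The decorator form is convenient when a function should always be lazy. The call form leaves the original function usable for immediate calls as well.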

In [4]:
from dask import delayed

In [5]:
%%time
# This runs immediately, all it does is build a graph

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

CPU times: user 982 µs, sys: 791 µs, total: 1.77 ms
Wall time: 4.63 ms


In [6]:
z

Delayed('add-d81d2826-ecfc-4ce8-8c9c-920e6969379a')

This ran immediately because no computation has happened yet; only the task graph was built.

To get the result, call `compute`.

In [7]:
%%time
# This actually runs our computation using a local thread pool

z.compute()

CPU times: user 9.18 ms, sys: 7.58 ms, total: 16.8 ms
Wall time: 2.02 s


5
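
The computation above ran on Dask's local thread pool. To see what the threads contribute, the same graph can be run with the single-threaded scheduler by passing the `scheduler=` keyword to `compute`. The sketch below is only an illustration: the sleeps are shortened to 0.2 s, and the measured times will vary from machine to machine.

```python
from time import perf_counter, sleep

from dask import delayed


def inc(x):
    sleep(0.2)
    return x + 1


def add(x, y):
    sleep(0.2)
    return x + y


z = delayed(add)(delayed(inc)(1), delayed(inc)(2))

results = {}
timings = {}
for scheduler in ("synchronous", "threads"):
    start = perf_counter()
    # "synchronous" runs tasks one at a time; "threads" uses a thread pool.
    results[scheduler] = z.compute(scheduler=scheduler)
    timings[scheduler] = perf_counter() - start
    print(f"{scheduler:>12}: result={results[scheduler]}, "
          f"time={timings[scheduler]:.2f} s")
```

With the threaded scheduler the two `inc` tasks can overlap, so the wall time is typically closer to two sleeps than three.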

## What just happened?

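The `z` object is a lazy `Delayed` object.  It holds everything we need to compute the final result, including the functions to call and their inputs.  The computation above took about two seconds instead of three because the two independent `inc` calls ran at the same time, followed by `add`.  We can compute the result with `.compute()` as above, or we can visualize the task graph for this value with `.visualize()`.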
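
If Graphviz is not available, the task graph can still be inspected as plain data. The sketch below rebuilds the same graph (without the sleeps) and lists its task keys through `__dask_graph__()`, which is part of Dask's collection protocol. The random suffixes in the keys differ on every run.

```python
from dask import delayed


def inc(x):
    return x + 1


def add(x, y):
    return x + y


z = delayed(add)(delayed(inc)(1), delayed(inc)(2))

# The graph is a mapping from task keys to task definitions.
task_keys = sorted(str(key) for key in z.__dask_graph__())
for key in task_keys:
    print(key)

# The key of the final task is what the Delayed repr displays.
print("final task:", z.key)
```

The listing should contain one entry per function call: two `inc` tasks and one `add` task.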

In [None]:
# Uncomment the next line to install the Graphviz bindings that `.visualize()` needs
#!conda install python-graphviz
import graphviz

In [None]:
# Look at the task graph for `z`
z.visualize()

## Exercise: Parallelizing for-loop code with control flow

Often we want to delay only *some* functions, running a few of them immediately.  This is especially helpful when those functions are fast and help us to determine what other slower functions we should call.  This decision, to delay or not to delay, is usually where we need to be thoughtful when using `dask.delayed`.

In the example below we iterate through a list of inputs.  If the input is even then we want to call `double`.  If the input is odd then we want to call `inc`.  This `is_even` decision to call `double` or `inc` has to be made immediately (not lazily) in order for our graph-building Python code to proceed.
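
To see why the decision cannot be lazy, consider what happens if `is_even` is delayed as well. Python's `if` needs a real `True` or `False`, but a `Delayed` object has no value yet, so Dask raises a `TypeError`. The sketch below demonstrates this; `lazy_flag` and `raised` are illustrative names.

```python
from dask import delayed


def is_even(x):
    return not x % 2


lazy_flag = delayed(is_even)(4)  # a Delayed object, not a bool

try:
    if lazy_flag:  # Python asks for the truth value here
        pass
    raised = False
except TypeError as err:
    raised = True
    print("TypeError:", err)

# Calling the function directly gives a plain bool that `if` can use.
print(is_even(4))
```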

In [None]:
def double(x):
    sleep(1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
%%time
# Sequential code

results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)
    
total = sum(results)
print(total)

In [None]:
%%time
# Parallel version of the sequential code above.
# `double` and `inc` are slow, so we delay them; `is_even` is fast and must
# return a real bool for the `if` statement, so we call it immediately.

results = []
for x in data:
    if is_even(x):  # even
        y = delayed(double)(x)
    else:          # odd
        y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)

In [None]:
total

In [None]:
total.compute()

In [None]:
%time total.compute()

In [None]:
#total.visualize()

## Parallelizing a Pandas Groupby Reduction

In this exercise we read several CSV files and perform a groupby operation in parallel.  We are given sequential code to do this and parallelize it with `dask.delayed`.

The computation we will parallelize is to compute the mean departure delay per airport from some historical flight data.  We will do this by using `dask.delayed` together with `pandas`.  In a future section we will do this same exercise with `dask.dataframe`.


In [1]:
pwd

'/opt/data/export/DataScience_Notebooks/Dask'

In [3]:
import os
sorted(os.listdir('Data'))

['.ipynb_checkpoints',
 '1990.csv',
 '1991.csv',
 '1992.csv',
 '1993.csv',
 '1994.csv',
 '1995.csv',
 '1996.csv',
 '1997.csv',
 '1998.csv',
 '1999.csv']

In [None]:
import pandas as pd
df = pd.read_csv(os.path.join('Data', '1990.csv'))
df.head()

In [None]:
# What is the schema?
df.dtypes
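
For a single year, the mean departure delay per airport is a one-line pandas groupby on the `Origin` and `DepDelay` columns. The sketch below uses a small hypothetical DataFrame in place of the real file so that it runs without the data; the airport codes and delay values are made up for illustration.

```python
import pandas as pd

# Hypothetical stand-in for one year of flight data; the real files have
# many more rows and columns.
sample = pd.DataFrame({
    "Origin": ["EWR", "JFK", "EWR", "LGA", "JFK", "LGA"],
    "DepDelay": [10.0, 5.0, 20.0, 0.0, 15.0, 4.0],
})

# Group rows by origin airport and average the departure delay per group.
mean_delay_by_origin = sample.groupby("Origin").DepDelay.mean()
print(mean_delay_by_origin)
```

On the real data the same expression would be applied to `df`.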

## Sequential code: Mean Departure Delay Per Airport
The cells above load a single year of data. Here we compute the mean departure delay per airport across all years using a sequential for loop.

In [None]:
from glob import glob
filenames = sorted(glob(os.path.join('Data', '*.csv')))

In [None]:
%%time

sums = []
counts = []
for fn in filenames:
    # Read in file
    df = pd.read_csv(fn)
    
    # Groupby origin airport
    by_origin = df.groupby('Origin')
    
    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()
    
    # Number of flights by origin
    count = by_origin.DepDelay.count()
    
    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

In [None]:
print(mean)
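
The loop keeps per-file sums and counts instead of per-file means because averaging the yearly means gives the wrong answer whenever the years contain different numbers of flights. The sketch below checks this on two small hypothetical "years"; all values are made up for illustration.

```python
import pandas as pd

# Two hypothetical years with different numbers of EWR flights.
year_a = pd.DataFrame({"Origin": ["EWR", "EWR", "EWR", "JFK"],
                       "DepDelay": [0.0, 0.0, 30.0, 10.0]})
year_b = pd.DataFrame({"Origin": ["EWR", "JFK"],
                       "DepDelay": [50.0, 20.0]})
years = (year_a, year_b)

# Same approach as the loop: per-file sums and counts, combined at the end.
sums = [df.groupby("Origin").DepDelay.sum() for df in years]
counts = [df.groupby("Origin").DepDelay.count() for df in years]
combined_mean = sum(sums) / sum(counts)

# Reference answer: concatenate everything and take one groupby mean.
direct_mean = pd.concat(years).groupby("Origin").DepDelay.mean()

# Naive alternative: average the per-file means.
mean_of_means = sum(df.groupby("Origin").DepDelay.mean() for df in years) / len(years)

print(combined_mean)
print(mean_of_means)
```

Here `combined_mean` matches `direct_mean`, while `mean_of_means` overstates the EWR delay because the single flight in the second year counts as much as the three flights in the first. Adding Series with `sum` aligns them on the airport index, so this sketch assumes every airport appears in every file.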

In [None]:
from dask import compute

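## Parallelize the code above

Use `dask.delayed` to parallelize the code above. Method calls and attribute access on `Delayed` objects are themselves lazy, so the `groupby` and aggregation lines need no changes. `dask.compute` then evaluates several delayed results, including lists of them, in a single call.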
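
The sketch below shows the same pattern in a form that runs without the CSV files. `load_frame` is a hypothetical stand-in for `pd.read_csv` that builds a tiny DataFrame in memory, and its values are made up for illustration.

```python
import pandas as pd
from dask import compute, delayed
from dask.delayed import Delayed


def load_frame(year):
    # Hypothetical stand-in for pd.read_csv: a tiny in-memory DataFrame.
    return pd.DataFrame({"Origin": ["EWR", "JFK", "EWR"],
                         "DepDelay": [year - 1990.0, 2.0, 4.0]})


sums, counts = [], []
for year in (1990, 1991):
    df = delayed(load_frame)(year)             # lazy: nothing is loaded yet
    by_origin = df.groupby("Origin")           # method call stays lazy
    sums.append(by_origin.DepDelay.sum())      # attribute access stays lazy
    counts.append(by_origin.DepDelay.count())

lazy_before_compute = isinstance(sums[0], Delayed)
print("lazy before compute:", lazy_before_compute)

# compute() accepts several arguments and looks inside the lists,
# returning one list of concrete pandas Series per argument.
sums, counts = compute(sums, counts)
mean = sum(sums) / sum(counts)
print(mean)
```

Passing both lists to one `compute` call lets Dask share the `load_frame` results between the sum and count tasks instead of loading each frame twice.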

In [None]:
%%time

sums = []
counts = []
for fn in filenames:
    # Read in file
    df = delayed(pd.read_csv)(fn)
    
    # Groupby origin airport
    by_origin = df.groupby('Origin')
    
    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()
    
    # Number of flights by origin
    count = by_origin.DepDelay.count()
    
    # Save the intermediates
    sums.append(total)
    counts.append(count)

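# Keep references to the lazy Delayed objects so they can still be
# inspected after `sums` and `counts` are overwritten with results
sout = sums
cout = counts

# Compute all intermediates in one call so Dask can run the tasks in parallel
sums, counts = compute(sums, counts)

# Combine intermediates to get total mean-delay-per-origin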
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

In [None]:
sout

In [None]:
#cout[0].visualize()