# Scaling IPython with Dask
<img src="fig/dask_log.png">

<H3>Dask is a flexible parallel computing library for analytic computing.</H3>

Dask is composed of two components:

* <b>Dynamic task scheduling optimized for computation.</b> This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
<BR><BR>
* <b>“Big Data” collections</b> like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.
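
As a minimal sketch of how the two components fit together, the snippet below builds a small chunked array (a collection) and hands the resulting task graph to a scheduler. The array shape, the chunk size, and the choice of the `synchronous` scheduler are illustrative assumptions, not something prescribed by the text above.

```python
import dask.array as da

# A "Big Data" collection: a 4x4 array of ones split into four 2x2 chunks.
x = da.ones((4, 4), chunks=(2, 2))

# Building the expression only records tasks; nothing is computed yet.
total = x.sum()

# A scheduler executes the recorded tasks. "synchronous" runs them in the
# calling thread; "threads" or "processes" would run them in parallel.
value = total.compute(scheduler="synchronous")
print(x.numblocks, value)
```

Swapping the `scheduler` argument changes where the tasks run without touching the collection code.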


### Dask emphasizes the following virtues:

* <b>Familiar</b>: Provides parallelized NumPy array and Pandas DataFrame objects
* <b>Flexible</b>: Provides a task scheduling interface for more custom workloads and integration with other projects.
* <b>Native</b>: Enables distributed computing in Pure Python with access to the PyData stack.
* <b>Fast</b>: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
* <b>Scales up</b>: Runs resiliently on clusters with 1000s of cores
* <b>Scales down</b>: Trivial to set up and run on a laptop in a single process
* <b>Responsive</b>: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans

### Dask Framework
<img src="fig/collections-schedulers.png" width="80%" />
Figure source: dask.pydata.org

# MapReduce with Collections (Data Parallel Operations)

## Dask Array mimics NumPy
### First: example in NumPy

In [3]:
import numpy as np

A = np.random.rand(10,100000)
np.mean(A)

0.50008905357623556

### Now in Dask Arrays

In [4]:
import dask.array as da
B = da.from_array(A, chunks=(10, 10000))
B.mean().compute()

0.50008905357623545
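
The two means above differ only in the last digits, which is what one would expect when the sum is accumulated chunk by chunk in a different order. A small sketch of the chunking itself follows; the seed, the array size, and the chunk size are arbitrary choices made for this illustration.

```python
import numpy as np
import dask.array as da

# Small seeded array so the sketch is reproducible (sizes are arbitrary).
rng = np.random.default_rng(0)
A = rng.random((10, 1000))

# Split the columns into blocks of 100: one row of ten chunks.
B = da.from_array(A, chunks=(10, 100))
numblocks = B.numblocks

# Each chunk is reduced separately and the partial results are combined.
dask_mean = B.mean().compute()
numpy_mean = A.mean()
print(numblocks, dask_mean, numpy_mean)
```

Comparing the two results with `np.isclose` rather than `==` is the safer check, since the summation order is not the same.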

In [10]:
import graphviz

a=B.visualize('dask2.svg')

<img src='dask2.svg' width=75%>

## More Examples

In [9]:
x = da.ones((15, 15), chunks=(5, 5))
a=x.visualize('dask3.svg')  

<img src='dask3.svg' width=75%>

In [11]:
a=(x + x.T).visualize('dask4.svg')

<img src='dask4.svg' width=75%>

In [12]:
# Now we just start showing off
a=(x.dot(x.T + 1) - x.mean(axis=0)).std().visualize('dask5.svg')

<img src='dask5.svg' width=75%>
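
The cells above only draw the task graphs. As a sketch of what happens when the same expressions are actually evaluated, the snippet below calls `compute()` on two of them; it reuses the 15x15 array of ones from the slides.

```python
import numpy as np
import dask.array as da

x = da.ones((15, 15), chunks=(5, 5))

# Still lazy: y is a dask array describing the work, not the result.
y = x + x.T

# compute() runs the graph and returns an ordinary NumPy array.
result = y.compute()

# The "showing off" expression reduces to a single number.
spread = (x.dot(x.T + 1) - x.mean(axis=0)).std().compute()
print(result.shape, spread)
```

Because every entry of `x` is 1, every entry of the inner expression is identical, so the standard deviation comes out as zero.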

## Dask DataFrame mimics Pandas
### Example in Pandas

In [15]:
import pandas as pd

u_cols = ['user_id', 'age', 'sex', 'occupation', 'zip_code']
df = pd.read_csv('movielens100k/u.user', sep='|', names=u_cols)
#compute mean age over sex grouping
df.groupby(df.sex).mean()

| sex | user_id | age |
|-----|---------|-----|
| F | 481.406593 | 33.813187 |
| M | 468.167164 | 34.149254 |


### Now in Dask

In [17]:
import dask.dataframe as dd
ddf = dd.read_csv('movielens100k/u.user', sep='|', names=u_cols)
# group by the Dask column (ddf.sex), not the Pandas one (df.sex)
ddf.groupby(ddf.sex).mean().compute()

| sex | age | occupation | sex | user_id | zip_code |
|-----|-----|------------|-----|---------|----------|
| F | 33.813187 | NaN | NaN | 481.406593 | NaN |
| M | 34.149254 | NaN | NaN | 468.167164 | NaN |


## Dask Delayed - generalized MapReduce



In [9]:
# Pseudo code: load, process, summarize, and filenames are placeholders
from dask import delayed

def load(name):
    ...

def process(data):
    ...
    return result

def summarize(results):
    ...

L = []
for fn in filenames:                  # Use for loops to build up computation
    data = delayed(load)(fn)          # Delay execution of function
    L.append(delayed(process)(data))  # Build connections between variables

result = delayed(summarize)(L)        # Combine all partial results in one final task
result.compute()                      # Nothing runs until compute() is called
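
A runnable variant of the pattern above might look as follows. The three functions and the input sizes are stand-ins invented for this sketch, since the slide leaves `load`, `process`, and the final summary step unspecified.

```python
from dask import delayed

def load(n):
    # Stand-in for reading a file: produce the numbers 0..n-1.
    return list(range(n))

def process(data):
    # Stand-in for per-file work: add up the values.
    return sum(data)

def summarize(results):
    # Stand-in for the reduce step: add up the partial results.
    return sum(results)

sizes = [3, 4, 5]  # plays the role of the filenames in the slide
partials = [delayed(process)(delayed(load)(n)) for n in sizes]

# One final task depends on all partial results.
result = delayed(summarize)(partials)
value = result.compute()
print(value)
```

The map step is the list of `process` tasks and the reduce step is the single `summarize` task, which is why the slide calls this a generalized MapReduce.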

## Building custom graphs

In [None]:
def load(filename):
    ...

def clean(data):
    ...

def analyze(sequence_of_data):
    ...

def store(result):
    with open(..., 'w') as f:
        f.write(result)

dsk = {'load-1': (load, 'myfile.a.data'),
       'load-2': (load, 'myfile.b.data'),
       'load-3': (load, 'myfile.c.data'),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'clean-3': (clean, 'load-3'),
       'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
       'store': (store, 'analyze')}

from dask.multiprocessing import get
get(dsk, 'store')  # executes in parallel

<img src="fig/pipeline.png" width=20%>
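
For a version of this pipeline that runs as-is, the sketch below fills the functions with toy in-memory work and uses the threaded scheduler instead of the multiprocessing one. The function bodies and the integer inputs are assumptions made for illustration; the graph layout follows the dictionary above.

```python
from dask.threaded import get

def load(n):
    # Toy replacement for reading a file.
    return list(range(n))

def clean(data):
    # Keep only the even values.
    return [v for v in data if v % 2 == 0]

def analyze(sequence_of_data):
    # Count how many values survived cleaning across all inputs.
    return sum(len(d) for d in sequence_of_data)

# Keys are task names; a tuple is (function, *arguments), and an argument
# that matches a key is replaced by that task's result.
dsk = {'load-1': (load, 4),
       'load-2': (load, 6),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'analyze': (analyze, ['clean-1', 'clean-2'])}

result = get(dsk, 'analyze')
print(result)
```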

<H2>Back to our Target Application: Movie recommendation at Scale!</H2>
<img src="fig/movielens.png" width=50%>

<H2>First step: get Compute Cluster up and running</H2> 

In [3]:
from dask.distributed import Client, LocalCluster, progress
# default: a local cluster of worker processes (not threads)
LocalCluster()

LocalCluster("127.0.0.1:8786", workers=4, ncores=4)

In [4]:
# Client is the current name for what older releases called Executor
e = Client('127.0.0.1:8786', set_as_default=True)
e

<Client: scheduler="127.0.0.1:8786" processes=4 cores=4>
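
A self-contained way to try the cluster setup is sketched below. The keyword names follow current `dask.distributed` releases, so they differ from the `workers`/`ncores` spelling in the output above, and the worker count and the submitted task are arbitrary choices for this illustration.

```python
from dask.distributed import Client, LocalCluster

# processes=False keeps the workers as threads inside this process, which is
# convenient for a quick check; dashboard_address=None skips the web dashboard.
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False,
                  dashboard_address=None) as cluster:
    with Client(cluster) as client:
        # submit() sends one function call to the cluster and returns a future.
        future = client.submit(sum, [1, 2, 3])
        total = future.result()

print(total)
```

The context managers shut the cluster and client down again, which is useful outside a long-lived notebook session.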

<H3>Reading the data in Parallel</H3>

In [5]:
import dask.dataframe as dd
import numpy as np
u_cols = ['user_id', 'age', 'sex', 'occupation', 'zip_code']
user = dd.read_csv('movielens100k/u.user', sep='|', names=u_cols)
r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = dd.read_csv('movielens100k/u.data', sep='\t', names=r_cols)
m_cols=['movie_id', 'title', 'release date', 'video release date', 'IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation', 'Childrens', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']
movies = dd.read_csv('movielens100k/u.item', sep='|', names=m_cols)


In [6]:
user = e.persist(user)
ratings = e.persist(ratings)
movies = e.persist(movies)
progress(user,ratings,movies)

In [7]:
graph=ratings.rating.mean()
graph


dd.Scalar<series-..., dtype=float64>

In [59]:
graph.compute()

3.52986

<H3>Step 1: get ID from movie name</H3>

In [10]:
#get movie ID
name="Star" 
graph=movies.movie_id[movies.title.str.contains(name)]

candidates=np.array(graph.compute()) #compute and return as NumPy Array
movieID=candidates[0]-1
movieID

49

<H3>Step 2: get all users who rated that movie and gave a rating of at least 4</H3>

In [11]:
graph=ratings.user_id[(ratings.movie_id==movieID)&(ratings.rating>=4)]
userIDs=graph.compute()
userIDs=np.array(userIDs.sort_values())
userIDs

array([ 13,  43,  87,  89,  94, 130, 152, 174, 256, 283, 291, 299, 336,
       393, 416, 435, 450, 472, 477, 487, 500, 577, 642, 684, 711, 716,
       749, 798, 830, 854, 881, 886, 892])

<H3>Step 3: get all ratings >=4 of these users</H3> 

In [12]:
#Pandas: selectedRatings=ratings[(ratings.user_id.isin(userIDs)) & (ratings.rating >=4)]
graph=ratings[(ratings.user_id.isin(userIDs)) &(ratings.rating >=4)]
A=graph.visualize('dask6.svg')

<img src='dask6.svg' width=15%>

In [17]:
selectedRatings=graph.compute()
selectedRatings.head()

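|    | user_id | movie_id | rating | timestamp |
|----|---------|----------|--------|-----------|
| 18 | 291 | 1042 | 4.0 | 874834944.0 |
| 22 | 299 | 144 | 4.0 | 877881320.0 |
| 56 | 87 | 384 | 4.0 | 879877127.0 |
| 84 | 291 | 144 | 5.0 | 874835091.0 |
| 92 | 87 | 1016 | 4.0 | 879876194.0 |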


<H3>Step 4: group by movie id ...</H3>

The remaining steps follow the same pattern ...
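
One possible reading of this step, offered only as a sketch since the slides stop here, is to count how often each movie appears among the selected ratings and rank the movies by that count. The table is made up, and plain Pandas is used because the result of the previous step has already been computed into a Pandas DataFrame.

```python
import pandas as pd

# Made-up stand-in for selectedRatings.
selected = pd.DataFrame({
    "user_id": [1, 3, 3, 5, 5],
    "movie_id": [10, 10, 12, 10, 12],
    "rating": [5, 4, 5, 4, 4],
})

# Number of like-minded users per movie, most popular first.
counts = selected.groupby("movie_id").size().sort_values(ascending=False)
top_movie = counts.index[0]
print(counts)
```

Whether a count, a mean rating, or some weighted score is the right ranking depends on the recommender design, which the slides leave open.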

<H2>Dask in the cloud</H2>
<H3>dask-ec2 for AWS: https://github.com/dask/dask-ec2</H3>
<BR><BR>
dask-ec2 up --keyname my_aws_key --keypair ~/.ssh/my_aws_key.pem

<H2>Dask is just getting started ...</H2>
