# Introduction to Dask 

## Basics

Let's make two simple functions, inc and add, that sleep for a while to simulate work. We will first time these functions running normally. In the next section, we will parallelize this code.

In [3]:
from time import sleep

In [4]:
#create simple functions

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

In [5]:
%%time

x=inc(1)
y=inc(2)
z=add(x,y)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 3.01 s


## Parallelize the code using dask.delayed

In this section, we will call inc and add wrapped with dask.delayed. This changes those functions so that they don't run immediately; instead, each call records the function and its arguments in a task graph. The code below therefore returns almost instantly, because all it does is build the graph. We then separately compute the result by calling the .compute() method.
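To make the idea of "record the call now, run it later" concrete, here is a minimal sketch of lazy evaluation in plain Python. The `Lazy` class and `lazy` wrapper are hypothetical names made up for illustration; this is not how Dask is implemented, and unlike Dask this toy version evaluates the graph sequentially.

```python
class Lazy:
    """Hypothetical stand-in for a delayed value: stores a call instead of running it."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve any Lazy arguments first, then run this node's function.
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)


def lazy(func):
    """Wrap func so that calling it builds a Lazy node rather than executing."""
    def wrapper(*args):
        return Lazy(func, *args)
    return wrapper


def inc(x):
    return x + 1


def add(x, y):
    return x + y


x = lazy(inc)(1)
y = lazy(inc)(2)
z = lazy(add)(x, y)   # nothing has run yet; z only describes the work

result = z.compute()  # walks the graph: add(inc(1), inc(2))
print(result)         # 5
```

The point of the sketch is only that building the graph and running it are two separate steps, which is what lets a scheduler decide how to execute the work.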

In [6]:
#import delayed
from dask import delayed

In [8]:
%%time

#it runs immediately but all it does is create a graph 
x=delayed(inc)(1)
y=delayed(inc)(2)
z=delayed(add)(x,y)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 440 µs


In [10]:
#delayed value
z

Delayed('add-17a5b553-c811-4229-a2e8-0c1beffe7664')

In [9]:
%%time

#compute the result by actually running the computation using a local thread pool
z.compute()

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 2.03 s


5

z is a lazy Delayed object. It holds everything we need to compute the final result. We can compute the final result with .compute() and visualize the graph with .visualize().

compute takes about 2 seconds instead of 3 because inc(1) and inc(2) are independent of each other and can run in parallel; only add has to wait for both of them.
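The same timing argument can be reproduced with the standard library alone. The sketch below is an analogy using `concurrent.futures.ThreadPoolExecutor`, not Dask's scheduler; the `DELAY` constant is an assumption introduced here to shorten the 1 second sleeps so the example runs quickly.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep

DELAY = 0.1  # shortened from the 1 s used above so the sketch runs quickly


def inc(x):
    sleep(DELAY)
    return x + 1


def add(x, y):
    sleep(DELAY)
    return x + y


start = perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # The two inc calls do not depend on each other, so they can overlap.
    fx = pool.submit(inc, 1)
    fy = pool.submit(inc, 2)
    # add needs both results, so it must wait for them.
    z = add(fx.result(), fy.result())
elapsed = perf_counter() - start

print(z)
print(f"about {elapsed / DELAY:.1f} delays elapsed")
```

Run sequentially, the three calls would cost three delays. With the two inc calls overlapping, the total should be close to two, which mirrors the 3 s versus 2 s wall times in the cells above.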

In [21]:
#visualize() needs the graphviz Python package (and the Graphviz system binaries)
import graphviz

In [23]:
!brew install graphviz

/bin/sh: 1: brew: not found


In [24]:
#look at the task graph for z

z.visualize()

## Parallelize a for loop

In [25]:
data=[1,2,3,4,5,6,7,8,9,10]

In [26]:
%%time
#sequential code

results=[]
for x in data:
    y=inc(x)
    results.append(y)

final=sum(results)

CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 10 s


In [28]:
final

65

## Using dask.delayed

In [29]:
%%time

#parallelizing code

results=[]
for x in data:
    y=delayed(inc)(x)
    results.append(y)
final=delayed(sum)(results)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 1.89 ms


In [30]:
final

Delayed('sum-c6f7ab00-4c6b-4bc2-8031-1e43997392f6')

In [31]:
#call compute() with parentheses; without them the cell only returns the bound method
final.compute()

65

## Parallelizing a for loop with control flow

Often we want to delay only some functions and run a few of them immediately. This is especially helpful when those functions are fast and help us determine which other, slower functions we should call. Deciding what to delay and what to run immediately is usually where we need to be thoughtful when using dask.delayed.

In the example below, we iterate through a list of inputs. If an input is even, we call double; otherwise we call inc.

In [32]:
def double(x):
    sleep(1)
    return 2*x

def is_even(x):
    return not x%2
    

In [34]:
%%time

#sequential code

results=[]
for x in data:
    if is_even(x):
        y=double(x)
    else:
        y=inc(x)
    results.append(y)
final=sum(results)
        

CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 10 s


In [35]:
final

90

## Parallelizing using dask.delayed (on some functions)

In [39]:
%%time

#parallelizing code

results=[]
for x in data:
    if is_even(x):
        y=delayed(double)(x)
    else:
        y=delayed(inc)(x)
    results.append(y)
final=delayed(sum)(results)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 2.04 ms


In [40]:
final

Delayed('sum-311adee8-204c-4708-9b15-349ff602703f')

In [41]:
print(final.compute())

90
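
The split between a fast check that runs immediately and slow work that is deferred can also be sketched with the standard library. This is an analogy built on `concurrent.futures.ThreadPoolExecutor`, not Dask itself; `DELAY` and `max_workers=4` are assumptions chosen only to keep the example short and quick.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

DELAY = 0.05  # shortened stand-in for the 1 s sleeps in the notebook


def inc(x):
    sleep(DELAY)
    return x + 1


def double(x):
    sleep(DELAY)
    return 2 * x


def is_even(x):
    return x % 2 == 0


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = []
    for x in data:
        # is_even is cheap, so call it right away to pick the slow function.
        func = double if is_even(x) else inc
        # The slow call is only scheduled here, not awaited.
        futures.append(pool.submit(func, x))
    total = sum(f.result() for f in futures)

print(total)  # 90, matching the sequential result
```

As in the delayed version, the branch is decided eagerly while the graph (here, the list of futures) is being built, and only the expensive calls are handed off to run concurrently.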


## Parallelizing a pandas groupby reduction