# Dasknabbit

Dask is a Python library for parallel computing. It provides:

* Tools to create task graphs
* Schedulers/workers/threads to run task graphs
* Data collections
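
To make "task graph" concrete, here is a minimal standard-library sketch. It is not Dask's implementation: the dictionary layout and the `run` helper are assumptions made for illustration. Each key names a result, and each value is either a literal or a tuple of a function followed by its arguments, where an argument may be the name of another key.

```python
from operator import add


def inc(x):
    return x + 1


# A toy task graph: key -> literal, or (function, *arguments).
# String arguments that match a key refer to that key's result.
graph = {
    "x": (inc, 1),
    "y": (inc, 2),
    "z": (add, "x", "y"),
}


def run(graph, key, cache=None):
    """Hypothetical helper: evaluate `key`, computing its dependencies first."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Resolve arguments that name other keys before calling the function
        resolved = [
            run(graph, a, cache) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        value = func(*resolved)
    else:
        value = task
    cache[key] = value
    return value


z = run(graph, "z")
print("z is", z)
```

This toy runner evaluates one task at a time. A scheduler can do better because `x` and `y` do not depend on each other and could run at the same time.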

## `delayed`

### The world's worst functions ...

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

In [None]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)

print("z is", z)

### Parallelize with `dask.delayed`...

In [None]:
from dask import delayed

In [None]:
%%time
# This returns immediately; all it does is build a task graph

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

In [None]:
%%time
# This actually runs our computation using a local thread pool

z.compute()

In [None]:
# Look at the task graph for `z`
z.visualize()

In [None]:
# How about a for loop on a list?

data = [1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
%%time
# Sequential code

results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)
print("total is", total)

In [None]:
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)

# Let's see what type of thing total is
print("Printing total: ", total)

In [None]:
%%time

# Computing ...
result = total.compute()
print("Printing result from computing total:", result)

In [None]:
total.visualize()
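
The loop above wraps `inc` with `delayed(...)` at every call. As a sketch of an alternative, `delayed` can also be applied once as a decorator, so that every call to the function is lazy. The name `lazy_inc` and the shortened 0.1 second sleep are assumptions for this example, chosen so it does not overwrite the notebook's `inc` and runs quickly.

```python
from time import sleep

from dask import delayed


@delayed
def lazy_inc(x):
    # Shorter sleep than the notebook's inc, just to keep the example quick
    sleep(0.1)
    return x + 1


data = [1, 2, 3, 4, 5, 6, 7, 8]

# Each call returns a Delayed object instead of running the function
lazy_results = [lazy_inc(x) for x in data]
lazy_total = delayed(sum)(lazy_results)

# Nothing has run yet; compute() executes the whole graph
result = lazy_total.compute()
print("result is", result)
```

The decorator form suits functions that should always be lazy. Wrapping at the call site, as the cells above do, leaves the original function usable both ways.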

## Schedulers

Something has to execute these task graphs!

Two families of schedulers:
* Single machine
* Distributed
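
As a rough illustration of why a thread-pool scheduler helps with the sleeping functions used here, the sketch below runs the same kind of work directly on `multiprocessing.pool.ThreadPool`, without Dask. It is a stand-in for the idea, not how Dask schedules a graph; `slow_inc`, the pool size of 4, and the 0.1 second sleep are assumptions for this example.

```python
from multiprocessing.pool import ThreadPool
from time import perf_counter, sleep


def slow_inc(x):
    sleep(0.1)  # stand-in for slow work that releases the GIL
    return x + 1


data = [1, 2, 3, 4, 5, 6, 7, 8]

# One call after another, like the synchronous scheduler
start = perf_counter()
sequential = [slow_inc(x) for x in data]
sequential_seconds = perf_counter() - start

# Four calls in flight at a time on a local thread pool
start = perf_counter()
with ThreadPool(4) as pool:
    threaded = pool.map(slow_inc, data)
threaded_seconds = perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, threaded: {threaded_seconds:.2f}s")
print("total is", sum(threaded))
```

Threads pay off here because `sleep` releases the global interpreter lock. CPU-bound pure-Python functions generally gain more from processes or a distributed cluster.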

In [None]:
# Single thread ...

%time result = total.compute(scheduler='synchronous')

In [None]:
# Local threads
# Uses multiprocessing.pool.ThreadPool

# Use all the processors
%time result = total.compute(scheduler='threads')

# Or only some
%time result = total.compute(scheduler='threads', num_workers=2)

In [None]:
# Local processes
# Uses multiprocessing.Pool

# Use all the processors
%time result = total.compute(scheduler='processes')

# Or only some
%time result = total.compute(scheduler='processes', num_workers=2)

In [None]:
# Distributed scheduler
# With no arguments, Client() starts a local cluster and connects to it
from dask.distributed import Client
client = Client()
client

In [None]:
# While a Client is active, compute() uses the distributed scheduler by default
%time result = total.compute()

In [None]:
from random import randrange

# Redefine inc so that each call sleeps for a random 8 to 14 seconds
def inc(x):
    sleep(randrange(8, 15))
    return x + 1

In [None]:
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)

In [None]:
%time result = total.compute()

In [None]:
client.close()

In [None]:
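# Start a local cluster that runs workers as threads in this process
# instead of as separate processes
client = Client(processes=False)
client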

## Data collections

* Dask Dataframe
* Dask Array
* Dask Bag

## Demo