Dask
====

Dask is a Python library for doing computations with big data. This notebook lets you try out some of the basics.

### Getting set up
First off, let's install some dependencies and import some Python libraries.

In [None]:
!pip install castra graphviz # Python packages used in this notebook
!apt-get install -y graphviz # Graphviz binaries, used to draw task graphs

# show plots inline in the notebook
%matplotlib inline
import dask # for parallel computing
from distributed import Executor, progress # for distributed parallel computing

The first thing to do is connect to the Dask executor, the object we'll use to submit jobs to our cluster.

In [None]:
e = Executor('dask.informaticslab.co.uk:8786')
print(e)

You should now see how many nodes are available to the executor. Ask Niall to show you how we scale this up and down using AWS.
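Besides `run`, the Executor lets you submit individual jobs. Its interface largely follows Python's standard `concurrent.futures` pattern: you submit a function call and get back a future, whose `result()` waits until the value is ready. The sketch below shows that pattern with the standard library's `ThreadPoolExecutor`, used here only as a local stand-in for the cluster; the function `square` is a hypothetical toy task. On the cluster the analogous calls would be `e.submit(square, 7)` and `future.result()` (newer releases of `distributed` call the `Executor` class `Client`).

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """A toy task standing in for real work (hypothetical example)."""
    return x * x


# A local thread pool plays the role of the cluster in this sketch.
with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() schedules one call and returns a Future straight away.
    future = pool.submit(square, 7)
    print(future.result())  # 49; result() waits until the task is done

    # Submit several independent calls, then collect their results in order.
    futures = [pool.submit(square, x) for x in range(5)]
    squares = [f.result() for f in futures]
    print(squares)  # [0, 1, 4, 9, 16]
```

Because each `submit` returns immediately, all five calls can be in flight at once; only the `result()` calls wait.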

### Execute a function on the nodes

We're going to write a simple function that we can execute on the nodes. The code in the cell below returns the IP address of the computer it runs on.

In [None]:
import socket
def getIP():
    return "Hello world from IP " + socket.gethostbyname(socket.gethostname())

getIP()

The cell above was just run on the same computer that this Notebook is running on. Let's submit it to the compute cluster using our Executor `e`.

You should open [this window](http://ec2-52-212-33-212.eu-west-1.compute.amazonaws.com:8787/) and click on `Status`, where you can watch all the jobs (yours and everyone else's) being submitted to the compute nodes.

In [None]:
e.run(getIP)
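`e.run` calls the function once on every worker and returns a dictionary keyed by worker address, which is why you should see one greeting per node. The sketch below imitates only the *shape* of that result locally: the worker addresses are hypothetical, `run_on_all` is a made-up helper, and every call actually executes on this machine in a thread pool.

```python
import socket
from concurrent.futures import ThreadPoolExecutor


def say_hello():
    """Report which machine this function ran on."""
    return "Hello world from " + socket.gethostname()


def run_on_all(workers, func):
    """Hypothetical helper: call func once per worker, collect results by name.

    This only mimics the dictionary that e.run returns; all calls here
    run on the local machine.
    """
    with ThreadPoolExecutor(max_workers=len(workers)) as pool:
        futures = {name: pool.submit(func) for name in workers}
        return {name: fut.result() for name, fut in futures.items()}


# Hypothetical worker addresses, for illustration only.
workers = ["10.0.0.1:40001", "10.0.0.2:40001", "10.0.0.3:40001"]
results = run_on_all(workers, say_hello)
for address, message in results.items():
    print(address, "->", message)
```

On the real cluster the messages would differ per worker, since each node reports its own hostname.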

### Delayed functions
Have a look at the cell below: it does various basic arithmetic operations.

In [None]:
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

def process_list(xs):
    output = []
    for x in xs:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)
    return output

def mysum(xs):
    total = 0
    for x in xs:
        total += x

    return total

In [None]:
double(2)

In [None]:
inc(double(2))

In [None]:
data = [1, 2, 3, 4, 5]

result = mysum(process_list(data))

In [None]:
print(result)
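It's worth checking the number by hand. For each element `x`, `process_list` adds `inc(x)` and `double(x)`, so every element contributes `(x + 1) + 2x = 3x + 1`. For `data = [1, 2, 3, 4, 5]` that is `4 + 7 + 10 + 13 + 16 = 50`, which is what the cell above should print. A quick check in plain Python:

```python
data = [1, 2, 3, 4, 5]

# Closed form of what process_list computes per element: (x + 1) + 2x.
per_element = [3 * x + 1 for x in data]
print(per_element)       # [4, 7, 10, 13, 16]
print(sum(per_element))  # 50
```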

So far, so normal. Now for the magic of the wonderful `dask.delayed` decorator.

If we decorate the same functions, they become `delayed` functions. This means they are not executed yet; instead they sit waiting to be mapped onto our Executor `e`.
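To see what "not executed yet" means, here is a deliberately tiny imitation of a delayed decorator in plain Python. It is not how Dask is implemented (Dask builds a full task graph and hands it to a scheduler); it only illustrates the core idea that calling a decorated function records the call, and `compute()` evaluates it later, evaluating any recorded arguments first. The names `lazy`, `Lazy`, `toy_inc` and `toy_double` are hypothetical.

```python
import functools


class Lazy:
    """Records a function call without running it."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate any lazy arguments first, then run the recorded call.
        values = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*values)

    def __repr__(self):
        return f"Lazy({self.func.__name__})"


def lazy(func):
    """Hypothetical decorator: calling the function returns a Lazy record."""
    @functools.wraps(func)
    def wrapper(*args):
        return Lazy(func, args)
    return wrapper


@lazy
def toy_inc(x):
    return x + 1


@lazy
def toy_double(x):
    return x * 2


pending = toy_inc(toy_double(2))
print(pending)            # Lazy(toy_inc): nothing has been computed yet
print(pending.compute())  # 5
```

The real `dask.delayed` objects behave similarly from the outside: printing one shows a `Delayed` placeholder, and `.compute()` produces the value.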

In [None]:
@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

def process_list(xs):
    output = []
    for x in xs:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)
    return dask.delayed(output)

@dask.delayed
def mysum(xs):
    total = 0
    for x in xs:
        total += x

    return total

In [None]:
result = double(2)

In [None]:
print(result)

In [None]:
print(result.compute())

In [None]:
data = [1, 2, 3, 4, 5]

result = mysum(dask.delayed(process_list(data)))

In [None]:
result

In [None]:
result.visualize()

In [None]:
result.compute()
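Under the hood, `visualize()` draws and `compute()` executes a task graph. Dask documents a simple dictionary format for such graphs: each key maps either to a literal value or to a task, i.e. a tuple whose first element is a callable and whose remaining elements are arguments (which may name other keys). The sketch below hand-writes such a dictionary for `data = [1, 2]` and evaluates it with a tiny sequential scheduler. The key names, the `toy_*` functions and `toy_get` are hypothetical; Dask's real schedulers are far more capable.

```python
import operator


def toy_inc(x):
    return x + 1


def toy_double(x):
    return x * 2


# Hand-written task graph in Dask's dictionary style:
# key -> literal value, or key -> (callable, *arguments).
graph = {
    "x-0": 1,
    "x-1": 2,
    "inc-0": (toy_inc, "x-0"),
    "inc-1": (toy_inc, "x-1"),
    "double-0": (toy_double, "x-0"),
    "double-1": (toy_double, "x-1"),
    "add-0": (operator.add, "inc-0", "double-0"),
    "add-1": (operator.add, "inc-1", "double-1"),
    "total": (sum, ["add-0", "add-1"]),
}


def toy_get(graph, key, cache=None):
    """Evaluate `key` in `graph` depth-first, remembering finished results."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]

    def resolve(arg):
        # Lists are resolved element by element; strings naming a key are
        # replaced by that key's result; anything else is a literal.
        if isinstance(arg, list):
            return [resolve(a) for a in arg]
        if isinstance(arg, str) and arg in graph:
            return toy_get(graph, arg, cache)
        return arg

    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        value = func(*[resolve(a) for a in args])
    else:
        value = task
    cache[key] = value
    return value


print(toy_get(graph, "total"))  # 11
```

Note that `inc-0` and `double-0` depend only on `x-0`, so a parallel scheduler could run them at the same time; this toy version simply evaluates them one after the other.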

With more data, Dask automatically builds a much larger task graph, without any change to our code:

In [None]:
data = range(300)

result = mysum(dask.delayed(process_list(data)))
result.visualize()  # be patient, this graph is large!
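The graph grows quickly because every element adds three tasks (`inc`, `double` and `add`), so `range(300)` contributes 900 tasks before the final sum. The sketch below rebuilds that shape as a hand-written dictionary graph and counts its tasks. The key names, the `toy_*` functions, and folding the list into the final `sum` task are simplifications of our own, so Dask's actual graph will not match it key for key. It then groups the tasks into dependency levels: tasks within one level don't depend on each other, which is what allows a scheduler to spread them across the cluster's workers.

```python
import operator


def toy_inc(x):
    return x + 1


def toy_double(x):
    return x * 2


def build_graph(data):
    """Build a dict-style task graph shaped like mysum(process_list(data))."""
    graph = {}
    add_keys = []
    for i, x in enumerate(data):
        graph[f"inc-{i}"] = (toy_inc, x)
        graph[f"double-{i}"] = (toy_double, x)
        graph[f"add-{i}"] = (operator.add, f"inc-{i}", f"double-{i}")
        add_keys.append(f"add-{i}")
    graph["total"] = (sum, add_keys)
    return graph


def dependency_levels(graph):
    """Group keys so that each level depends only on earlier levels."""
    def deps(task):
        found = set()
        for arg in task[1:]:
            items = arg if isinstance(arg, list) else [arg]
            found.update(a for a in items if isinstance(a, str) and a in graph)
        return found

    remaining = {key: deps(task) for key, task in graph.items()}
    done, levels = set(), []
    while remaining:
        ready = sorted(k for k, d in remaining.items() if d <= done)
        levels.append(ready)
        done.update(ready)
        for k in ready:
            del remaining[k]
    return levels


graph = build_graph(range(300))
levels = dependency_levels(graph)
print(len(graph))                        # 901
print([len(level) for level in levels])  # [600, 300, 1]
```

However long the input, the graph stays only three levels deep, while the first two levels get wider; that width is the parallelism Dask can exploit.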