# **Dask Distributed**

### So far we have run all the code on a single machine, albeit one with a decent number of cores

### How does this translate to a cluster? 

### The `dask.distributed` package provides many ways to run Dask on a cluster. We'll look at how to create a simple SSH cluster.
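Here is a minimal sketch of an SSH cluster built with `dask.distributed.SSHCluster`. It assumes passwordless SSH to every host, the same Python environment (with `dask`, `distributed` and `asyncssh` installed) on all of them, and trusted hosts. The helper name `start_ssh_cluster` and the addresses in the usage line are hypothetical.

```python
from dask.distributed import Client, SSHCluster


def start_ssh_cluster(hosts, threads_per_worker=2):
    """Hypothetical helper: scheduler on hosts[0], one worker on each remaining host.

    Assumes passwordless SSH and an identical Python environment on every host.
    """
    cluster = SSHCluster(
        hosts,
        # Passed to asyncssh.connect; known_hosts=None disables host-key checks (demo only)
        connect_options={"known_hosts": None},
        worker_options={"nthreads": threads_per_worker},
        scheduler_options={"port": 8786, "dashboard_address": ":8787"},
    )
    # The client talks to the scheduler that SSHCluster started on hosts[0]
    return cluster, Client(cluster)
```

Usage would look like `cluster, client = start_ssh_cluster(["10.0.0.1", "10.0.0.2", "10.0.0.3"])`, giving one scheduler and two workers. Call `client.close()` and `cluster.close()` when done.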

In [None]:
import dask.array as da
import numpy as np
from dask.distributed import Client
client = Client()  # with no arguments, starts a LocalCluster on this machine
# Alternatively, start a cluster with the dask-ssh CLI and connect to its scheduler:
# client = Client("tcp://206.189.136.196:8786")

def func(x):
    return np.tan(x) * np.arctan(x)

%time da.arange(10 ** 7).map_blocks(func, dtype=float).compute()

client.close()
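To check that the distributed version computes the same thing as plain NumPy, here is a small sketch. It assumes an in-process `LocalCluster` (threads rather than processes, purely so it runs on its own) and uses explicit chunks so the number of tasks is visible.

```python
import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster


def func(x):
    return np.tan(x) * np.arctan(x)


# In-process cluster: two single-threaded workers (assumption, for a quick check only)
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

n = 10 ** 6
# 10 blocks of 100_000 elements; map_blocks turns each block into one task
lazy = da.arange(n, chunks=n // 10).map_blocks(func, dtype=float)
print(lazy.numblocks)  # (10,)

distributed_result = lazy.compute()  # runs on the cluster's workers
reference = func(np.arange(n))       # same computation in plain NumPy

print(np.allclose(distributed_result, reference))  # True
client.close()
cluster.close()
```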

### You can simply use the dask-scheduler and dask-worker CLI commands to create and manage a cluster
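Once a scheduler and workers are running, a client connects to the scheduler's address and can confirm which workers have registered. In this sketch an in-process `LocalCluster` stands in for the CLI-started scheduler and two workers (an assumption so it runs on its own); with the CLI you would pass the `tcp://<ip>:8786` address the scheduler prints.

```python
from dask.distributed import Client, LocalCluster

# Stand-in for `dask-scheduler` plus two `dask-worker` processes (in-process here)
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
address = cluster.scheduler_address  # with the CLI: the tcp://... address it prints

client = Client(address)
client.wait_for_workers(2)  # block until both workers have joined

# scheduler_info()["workers"] maps each worker address to details about that worker
workers = client.scheduler_info()["workers"]
for worker_address, info in workers.items():
    print(worker_address, "threads:", info["nthreads"])

client.close()
cluster.close()
```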

### Dask can also run on Kubernetes, and it integrates with AWS, Azure and GCP (for example via the dask-kubernetes and dask-cloudprovider projects)

### Let's see if we can combine Numba and Dask

In [None]:
from numba import njit
from dask.distributed import Client, LocalCluster
from math import tan, atan
from dask import delayed, compute

class DistributedComputation:
    def __init__(self, n_workers=4):
        self.n_workers = n_workers
        # One process per worker with a single thread each, so every task gets its own core
        self.cluster = LocalCluster(
            n_workers=n_workers,
            processes=True,
            threads_per_worker=1
        )

    def execute(self, func, *args):
        # Build one lazy task per argument and compute them while the client is active,
        # so the work runs on this cluster instead of Dask's default local scheduler
        with Client(self.cluster):
            tasks = [delayed(func)(arg) for arg in args]
            return compute(*tasks)

    def close(self):
        self.cluster.close()

# Compiled on the first call; each worker process compiles its own copy.
# result starts as a float because tan(i) * atan(i) is a float.
@njit
def fast_func(N):
    result = 0.0
    for i in range(N ** 7):
        result += tan(i) * atan(i)
    return result

if __name__ == "__main__":
    fast_func(1)  # compile and sanity-check in this process; workers still compile their own copy
    d = DistributedComputation(n_workers=16)
    print(d.execute(fast_func, *range(1, 11)))
    d.close()
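Dask's futures interface (`Client.map` and `Client.gather`) is another way to fan these calls out: tasks start as soon as they are submitted instead of waiting for `compute()`. This sketch uses a plain-Python stand-in, the hypothetical `slow_func`, with a smaller exponent so it finishes quickly, plus an in-process cluster (an assumption). A Numba-compiled function can be passed to `client.map` in the same way.

```python
from math import tan, atan
from dask.distributed import Client, LocalCluster


def slow_func(N):
    # Hypothetical plain-Python stand-in for fast_func (N ** 3 keeps it quick)
    result = 0.0
    for i in range(N ** 3):
        result += tan(i) * atan(i)
    return result


cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

futures = client.map(slow_func, range(1, 11))  # one task per N, submitted immediately
results = client.gather(futures)               # blocks until done; keeps input order

client.close()
cluster.close()
print(results)
```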

### Knowing which worker is doing what

In [None]:
import dask.array as da
from dask.distributed import Client
client = Client("tcp://192.168.0.105:8786")  # first start a scheduler and two workers in your terminal, then use the scheduler's address

In [None]:
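from dask.distributed import get_worker

x = da.arange(100, chunks=50)  # two chunks, so each worker can get one

def do_something(block):
    # get_worker() returns the Worker object that is executing this task
    worker = get_worker()
    # print() runs on the worker, so this output appears in that worker's terminal
    print(f"{worker.id} is the worker id")
    # worker.data holds the task results currently stored in this worker's memory
    print(list(worker.data.values()))
    return block

x.map_blocks(do_something, dtype=int).compute()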
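The same question can also be asked from the client side, without printing inside tasks. This sketch assumes an in-process two-worker `LocalCluster` in place of the terminal-started workers. `Client.has_what()` reports which keys each worker holds, and `Client.run()` executes a function once on every worker, filling a parameter named `dask_worker` with that worker's `Worker` object.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster, futures_of, wait

# Stand-in for the two workers started from the terminal (in-process for the demo)
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Persist so both chunks stay in worker memory, then wait until they are computed
x = da.arange(100, chunks=50).persist()
wait(futures_of(x))

# has_what(): worker address -> keys of the results that worker currently holds
holdings = client.has_what()
for address, keys in holdings.items():
    print(address, list(keys))

# run(): call the function on every worker; returns {worker address: result}
ids = client.run(lambda dask_worker: dask_worker.id)
print(ids)

client.close()
cluster.close()
```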

# **Exercise**

Try starting a scheduler locally and use its address to start Dask workers. Then scale up manually by adding more workers and see how it affects performance.
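For the scaling part, one way to experiment from Python is `LocalCluster.scale` followed by `Client.wait_for_workers`, timing the same computation at each size. This is a sketch under stated assumptions: it uses an in-process cluster (`processes=False`) so it runs on its own, which means workers share one Python process and the timings are only indicative. For a fair comparison, use CLI-started workers or `processes=True` inside an `if __name__ == "__main__":` guard.

```python
import time
import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster


def func(x):
    return np.tan(x) * np.arctan(x)


cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=False)
client = Client(cluster)

timings = {}
for n in (1, 2, 4):
    cluster.scale(n)            # ask the cluster for n workers
    client.wait_for_workers(n)  # block until they have joined
    arr = da.arange(10 ** 6, chunks=10 ** 5)
    start = time.perf_counter()
    arr.map_blocks(func, dtype=float).sum().compute()
    timings[n] = time.perf_counter() - start

print(timings)  # seconds per worker count; numbers vary by machine
n_final = len(client.scheduler_info()["workers"])
client.close()
cluster.close()
```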