<center>
    
# 2 - Introduction to `Dask`
    
<img src="imgs/dask_logo.png" alt="dask" width="250"/>

</center>

# 0) What is Dask?


[Dask](https://www.dask.org/) is a software framework that allows you to work with large datasets in parallel on your computer or a cluster of computers. 

- **Chunking**: Splits data (arrays, DataFrames, etc.) that is too big to fit into RAM into smaller chunks that can be processed piece by piece.

- **Dynamic scheduling**: Divides a computation into tasks and runs them on different workers according to their availability, so the work *finishes faster*.
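
As a minimal sketch of what chunking looks like in practice, the cell below builds a small Dask array and inspects how it is split. The array name `a` and the shapes are arbitrary choices for illustration only.

```python
import dask.array as da

# A small array split into 4 chunks of 250 x 100 each (sizes chosen only for illustration).
a = da.ones((1000, 100), chunks=(250, 100))

print(a.chunks)       # chunk sizes along each axis
print(a.numblocks)    # number of chunks along each axis
print(a.npartitions)  # total number of chunks
```

Each chunk is an ordinary NumPy array once computed, which is why only a few chunks need to be in memory at any time.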


<center>   
<img src="imgs/dask-overview.png" alt="dask-overview" width="900"/>
</center>

# 1) Create a `dask.array`

[Dask Array](https://docs.dask.org/en/stable/array.html) implements a subset of the NumPy ndarray interface using **blocked algorithms**, which work by **chunking** a large array into many small arrays.

In [None]:
import dask
import dask.array as da

x = da.random.random((200_000, 1000), chunks=(50_000, 500))
x

In [None]:
x.visualize()

### **Dask is lazy by default!**

In [None]:
from sys import getsizeof
print(f"Size of `x`: {getsizeof(x)} bytes")
print(f"Size of `x.compute()`: {getsizeof(x.compute()) * 1e-9:0.2f} GB")
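
A lighter way to see the same thing, sketched here on a deliberately small array (the name `lazy` and the shape are assumptions for illustration): `nbytes` is derived from shape and dtype metadata, so it reports the size the data would have without computing anything.

```python
import dask.array as da

lazy = da.random.random((2_000, 1_000), chunks=(500, 500))

# nbytes comes from metadata (shape and dtype), so nothing is computed here.
print(f"Described size: {lazy.nbytes / 1e6:.1f} MB")

# Only compute() materializes the data as a NumPy array.
result = lazy.compute()
print(type(lazy).__name__, "->", type(result).__name__)
```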

# 2) Add layers to the computational task graph.

Chunking allows us to perform computations on data that is larger than the available memory, and lets Dask spread the work across all of the machine's cores.

The blocked algorithms are coordinated using [Dask Task Graphs](https://docs.dask.org/en/stable/graphs.html). 
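
To make the idea of a task graph concrete, here is a small hand-written graph in Dask's dictionary format, run with the threaded scheduler. The keys (`"a"`, `"total"`, and so on) are made up for illustration; Dask arrays build much larger graphs of this kind for you.

```python
from operator import add, mul
from dask.threaded import get

# A hand-written task graph: keys map to literal values or (function, *args) tuples.
graph = {
    "a": 1,
    "b": 2,
    "total": (add, "a", "b"),      # total = a + b
    "scaled": (mul, "total", 10),  # scaled = total * 10
}

# Ask the scheduler for one key; it runs only the tasks that key depends on.
result = get(graph, "scaled")
print(result)
```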

Let's see an example:

In [None]:
x_sum = x.sum(1).sum(0)
x_sum

In [None]:
dask.visualize(x_sum)

##### Because Dask is lazy by default, we have to call the compute method to obtain the result!

In [None]:
x_sum.compute()

### Let's see how Dask scales computation. 

In [None]:
import dask
import dask.array as da

# One chunk
x_short = da.random.random((5_000, 1000), chunks=(5_000, 1000)) 
# One chunk - 100 times larger
x_long = da.random.random((500_000, 1000), chunks=(500_000, 1000)) 
# 100 chunks - 100 times larger
x_long_chunks = da.random.random((500_000, 1000), chunks=(5_000, 1000)) 

%timeit x_short.sum().compute()
%timeit x_long.sum().compute()
%timeit x_long_chunks.sum().compute()

The `x_long` array is 100 times larger than `x_short`. Without chunking, the computation takes roughly 100 times longer. However, if we split `x_long` into 100 chunks, the computation takes only about 20 times longer (exact timings depend on your machine).

This is because Dask runs the different parts of the task graph on multiple threads, so the chunks are processed in ***parallel***.
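
One way to check that the speed-up comes from threading is to run the same graph with two local schedulers. This is a rough sketch on a small array (the names `arr` and `total` are illustrative); `scheduler="synchronous"` runs every task in a single thread, so timing it against `"threads"` on a larger array should show the difference.

```python
import dask.array as da

arr = da.ones((4_000, 1_000), chunks=(1_000, 1_000))
total = arr.sum()

# Same graph, two local schedulers: a thread pool versus a single-threaded loop.
threaded = total.compute(scheduler="threads")
serial = total.compute(scheduler="synchronous")
print(threaded, serial)
```

Both calls return the same value; only the execution strategy differs.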

# 3) Create a Dask Client

Starting a *Dask Client* is optional, since Dask falls back to a default scheduler. However, creating one provides a **dashboard**, which is useful for gaining insight into the computation.

In [None]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=4)
client

##### Let's look at the Dask dashboard. Click on the printed link above and run the cell below:

In [None]:
x = da.random.random((10_000, 3_000, 100), chunks=(1_000, 1_000, 50))
y = da.random.random((10_000, 3_000, 100), chunks=(1_000, 1_000, 50))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))

%time z.sum(0).compute()

## Persist data in memory

If you have enough RAM for your dataset, you can persist the data in memory.

Future computations then reuse the stored results instead of recomputing them, which makes them faster.
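
The effect of `persist` can be seen by comparing task graphs before and after. The sketch below assumes the default local scheduler, where `persist` blocks until the chunks are computed (with a distributed `Client` it returns right away and computes in the background). The names `data`, `row_sums` and `persisted` are illustrative.

```python
import dask.array as da

data = da.random.random((2_000, 1_000), chunks=(500, 1_000))
row_sums = (data ** 2).sum(axis=1)

persisted = row_sums.persist()

# The persisted array keeps the same shape and chunking, but its graph
# no longer contains the upstream random/power/sum tasks.
print(len(row_sums.__dask_graph__()), "tasks before persist")
print(len(persisted.__dask_graph__()), "tasks after persist")
```

Anything built on top of `persisted` starts from the stored chunks rather than regenerating the random data.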

In [None]:
x = da.random.random((10_000, 3_000, 100), chunks=(1_000, 1_000, 50))
y = da.random.random((10_000, 3_000, 100), chunks=(1_000, 1_000, 50))

# persist z
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2)).persist()

%time z.sum(0).compute()