# When your server is not enough: Scaling large compute tasks with dask (gateway)


This notebook introduces the basics of parallelization, focusing on how Dask can help scale your computations.
We will cover:
- What parallelization is
- Embarrassingly parallel tasks
- How Dask integrates with Xarray
- Diagnosing common issues when working with Dask



## Parallelization with Dask

When processing large datasets, a single machine can become a bottleneck. **Parallelization** helps by breaking down a task into smaller, independent tasks that can run simultaneously on multiple cores or nodes.

The ideal case is when tasks don't need to communicate with each other. This is referred to as **embarrassingly parallel**. For example, filtering independent chunks of a dataset is an embarrassingly parallel task. Dask is one of the tools that provides parallelization functionality within Python. 



### What is Dask?


Dask is a Python library for general purpose parallelism with a bunch of toolkits built on top. Probably you already have it installed on your computer without you knowing it. 

**First,...**

<img src="https://github.com/andersy005/xarray-tutorial/blob/main/images/should-i-use-dask.png?raw=true" width="50%">






## Dask: dynamic task scheduler

Dask represents distributed/parallel computations with task graphs, more specifically [directed acyclic graphs](https://en.wikipedia.org/wiki/Directed_acyclic_graph).

- A task is a function that you want to call and its corresponding inputs
- A task graph is a collection of (1) the functions we want to call + their inputs (2) their dependencies. 


Directed acyclic graphs are made up of nodes and have a clearly defined start and end, a single traversal path, and no looping 

<img src="https://raw.githubusercontent.com/andersy005/xarray-tutorial/ffb302785027b19447a75ea58990089e667f8ee7/images/dask-task-stream.gif">

At a very basic level, dask is a dynamic task scheduler. That means you give it a set of things you want to run and it decides where to run them on whatever real hardware you've given it. You can give it a laptop and a 100 functions/tasks you want to run and it will figure out how to run that set of tasks on the laptop. you can give it a cluster of 100 machines and it will hopefully run that set of tasks in much faster. 


### Start Dask Client

In [None]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(threads_per_worker=4, n_workers=1)
client = Client(cluster)
client

### Basics

First let's make some toy functions, `square`, `add`, and `square_root` that sleep for a while to simulate work. We'll then time running these functions normally.

In the next section we'll parallelize this code.

In [None]:
import time

import dask

In [None]:
def square(x):
    time.sleep(1)
    return x ** 2


def add(x, y):
    time.sleep(1)
    return x + y


def square_root(x):
    time.sleep(1)
    return x ** (1 / 2)

We time the execution of this normal code using the `%%time` magic, which is a special function of the Jupyter Notebook.

In [None]:
%%time

x = square(3)
y = square(4)
z = add(x, y)
r = square_root(z)
r

This takes `~4 seconds` to run because we call each function sequentially, one after the other.

Those two `square` calls *could* be called in parallel, because they are totally independent of one-another.

We'll transform the `square`, `add`, and `square_root` functions using the `dask.delayed` function. When we call the delayed version by passing the arguments, exactly as before, the original function isn't actually called yet - which is why the cell execution finishes very quickly.
Instead, a *delayed object* is made, which keeps track of the function to call and the arguments to pass to it.


In [None]:
%%time
delayed_square = dask.delayed(square)
delayed_add = dask.delayed(add)
delayed_square_root = dask.delayed(square_root)

x = delayed_square(3)
y = delayed_square(4)
z = delayed_add(x, y)
r = delayed_square_root(z)
r

**This ran immediately, since nothing has really happened yet.** 

To get the result, call `compute`. 

In [None]:
%%time

r.compute()



<div class="admonition alert alert-success">
    <p class="admonition-title" style="font-weight:bold"></p>
    Notice that this runs faster than the original code.
</div>

### What just happened?

The `r` object is a lazy `Delayed` object.  This object holds everything we need to compute the final result, including references to all of the functions that are required and their inputs and relationship to one-another.  We can evaluate the result with `.compute()` as above or we can visualize the task graph for this value with `.visualize()`.

In [None]:
r.visualize(rankdir="LR")

<div class="admonition alert alert-info">
    <p class="admonition-title" style="font-weight:bold">Reminder: Task and Task Graphs</p>
    <ul>
        <li> A task is a function that you want to call and its corresponding inputs. </li>
    <li> A task graph is a collection of (1) the functions we want to call + their inputs (2) their dependencies. </li>
    </ul>
</div>

Notice that this includes the names of the functions from before, and the logical flow of the outputs of the `square` functions to the inputs of `add` and `square_root`.

### Some questions to consider:

-  Why did we go from 4s to 3s?  Why weren't we able to parallelize down to 2s?
-  What would have happened if the `square`, `add`, and `square_root` functions didn't include the `sleep(1)`?  Would Dask still be able to speed up this code?
-  What if we have multiple outputs or also want to get access to x or y?

### Submit many tasks 

In [None]:
results = []
for value in range(30):
    x = delayed_square(value)
    y = delayed_square(value)
    z = delayed_add(x, y)
    r = delayed_square_root(z)
    results.append(r)

In [None]:
%%time
result = dask.compute(results)

In [None]:
cluster.scale(4)

In [None]:
%%time
result = dask.compute(results)

## Dask Array


<img src="https://raw.githubusercontent.com/andersy005/xarray-tutorial/ffb302785027b19447a75ea58990089e667f8ee7/images/Dask Array (Light).png" width="50%" align="right">
Dask array provides a parallel, larger-than-memory, n-dimensional array using blocked algorithms. Simply put: distributed Numpy.

*  **Parallel**: Uses all of the cores on your computer
*  **Larger-than-memory**:  Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
*  **Blocked Algorithms**:  Perform large computations by performing many smaller computations



### Blocked Algorithms

A *blocked algorithm* executes on a large dataset by breaking it up into many small blocks/chunks.

For example, consider taking the sum of a billion numbers.  We might instead break up the array into 1,000 chunks, each of size 1,000,000, take the sum of each chunk, and then take the sum of the intermediate sums.

We achieve the intended result (one sum on one billion numbers) by performing many smaller results (one thousand sums on one million numbers each, followed by another sum of a thousand numbers.)

## `dask.array` contains these algorithms

`dask.array` implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using multiple cores. We coordinate these blocked algorithms using Dask graphs. Dask Array's are also lazy, meaning that they do not evaluate until you explicitly ask for a result using the compute method.

### Create `dask.array` object

If we want to create a 3D NumPy array of random values, we do it like this:

In [None]:
import dask.array as da
import numpy as np

In [None]:
shape = (600, 200, 200)
arr = np.random.random(shape)
arr

In [None]:
arr.nbytes / 1e9

Now let's create the same array using Dask's array interface.

In [None]:
darr = da.random.random(shape, chunks=(300, 100, 200))

A chunk size to tell us how to block up our array, like `(300, 100, 200)`. 

<div class="admonition alert alert-info">
    <p class="admonition-title" style="font-weight:bold">Specifying Chunks</p>
    There are <a href="https://docs.dask.org/en/latest/array-chunks.html">several ways to specify chunks</a>. In this tutorial, we will use a block shape.


</div>



In [None]:
darr

Notice that we just see a symbolic representation of the array, including its `shape`, `dtype`, and `chunksize`. No data has been generated yet. Let's visualize the constructed task graph. 

In [None]:
darr.visualize()

Our array has four chunks. To generate it, Dask calls `np.random.random` four times and then concatenates this together into one array.

### Manipulate `dask.array` object as you would a numpy array


Now that we have an `Array` we perform standard numpy-style computations like arithmetic, mathematics, slicing, reductions, etc..

The interface is familiar, but the actual work is different. `dask_array.sum()` does not do the same thing as `numpy_array.sum()`.

#### What's the difference?

`dask_array.sum()` builds an expression of the computation. It does not do the computation yet. `numpy_array.sum()` computes the sum immediately.

#### Why the difference?

Dask arrays are split into chunks. Each chunk must have computations run on that chunk explicitly. If the desired answer comes from a small slice of the entire dataset, running the computation over all data would be wasteful of CPU and memory.

In [None]:
total = darr.sum()
total

In [None]:
total.visualize()

#### Compute result

Dask.array objects are lazily evaluated.  Operations like `.sum` build up a graph of blocked tasks to execute.  

We ask for the final result with a call to `.compute()`.  This triggers the actual computation.

In [None]:
%%time
total.compute()

## Dask and Xarray


One of xarray's most powerful features: the ability to wrap dask arrays and allow users to seamlessly execute analysis code in parallel.

- xarray DataArrays and Datasets are "dask collections" i.e. you can execute top-level dask functions such as `dask.visualize(xarray_object)`
- All xarray built-in operations can transparently use dask
- xarray provides tools to easily parallelize custom functions across blocks of dask-backed xarray objects.



## Setup

First let's set up a `LocalCluster` using `dask.distributed`. 



In [None]:
import xarray as xr

## Reading data with Dask and Xarray

Recall that a dask's array consists of many chunked arrays:

In [None]:
darr = da.ones((2000, 300), chunks=(200, 50))
darr

In [None]:
darr.compute()

To read data as dask arrays with xarray, we need to specify the `chunks` argument to `open_dataset()` function. 

In [None]:
ds = xr.open_dataset(
    "gs://cmip6/CMIP6/HighResMIP/MOHC/HadGEM3-GC31-HM/highresSST-present/r1i1p1f1/3hr/tas/gn/v20170831/", engine="zarr", chunks={}
)
ds

Passing `chunks={}` to `open_dataset()` works, but since we didn't tell dask how to split up (or chunk) the array, Dask will defer to the backend (`zarr`) to create chunks for our array. 

## Parallel and Lazy computation using `dask.array` with xarray


Xarray seamlessly wraps dask so all computation is deferred until explicitly requested. 

In [None]:
z = ds.tas.mean(['lat', 'lon']).dot(ds.tas.T)
z

As you can see, `z` contains a dask array. This is true for all xarray built-in operations including subsetting

In [None]:
z.isel(lat=0)

In [None]:
%%time
z.compute()

### Chunking and Making Dask Behave

Chunking is an important concept when dealing with large data. It refers to breaking your dataset into smaller, manageable pieces that can be processed in parallel. However, improper chunking can lead to memory overloads or inefficiencies.

Guidelines:

- If chunks are too large, they may not fit in memory.
- If chunks are too small, the overhead of managing too many tasks could become a bottleneck.

Finding the right balance is critical. Dask's diagnostic tools will help you monitor the memory and adjust chunk sizes as needed. 

## Transition to Distributed Systems

If your local machine can't handle the workload, it’s time to scale beyond a single server. This is where distributed computing comes into play.

In the next notebook, we'll discuss Dask Gateway, a tool to scale your computations across multiple machines effortlessly.

## Resources and references

* Reference
    *  [Docs](https://dask.org/)
    *  [Examples](https://examples.dask.org/)
    *  [Code](https://github.com/dask/dask/)
    *  [Blog](https://blog.dask.org/)
*  Ask for help
    *   [`dask`](http://stackoverflow.com/questions/tagged/dask) tag on Stack Overflow, for usage questions
    *   [github discussions](https://github.com/dask/dask/discussions) for general, non-bug, discussion, and usage questions
    *   [github issues](https://github.com/dask/dask/issues/new) for bug reports and feature requests

* Pieces of this notebook are adapted from the following sources
  * https://github.com/dask/dask-tutorial/blob/main/03_array.ipynb
  * https://github.com/xarray-contrib/xarray-tutorial/blob/master/scipy-tutorial/06_xarray_and_dask.ipynb
  
  