<img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" align="right" width="30%" alt="Dask logo">


<img src="images/hpc_cluster.jpg" alt="Dask Overview"  width="600" />


# Dask Tutorial

Dask is a parallel and distributed computing library that scales the existing Python and PyData ecosystem.

Dask can scale up to the full capacity of a single computer and also scale out to a cloud cluster.


## A Dask Calculation Example

In the following lines of code, we read the 2015 New York City taxi data and find the average tip amount. Don't worry about the code; this is just for a quick demonstration.


In [9]:
#!pip install "dask[complete]"
#!pip install "bokeh!=3.0.*,>=2.4.2"
#!pip install s3fs
#!pip install zarr
#!pip install ipycytoscape 

In [1]:
import dask.dataframe as dd
from dask.distributed import Client

In [2]:
client = Client()
client

**Client**

| Field | Value |
| --- | --- |
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |

**LocalCluster**

| Field | Value |
| --- | --- |
| Dashboard | http://127.0.0.1:8787/status |
| Workers | 4 |
| Total threads | 12 |
| Total memory | 18.00 GiB |
| Status | running |
| Using processes | True |

**Scheduler**

| Field | Value |
| --- | --- |
| Comm | tcp://127.0.0.1:49252 |
| Workers | 4 |
| Dashboard | http://127.0.0.1:8787/status |
| Total threads | 12 |
| Started | Just now |
| Total memory | 18.00 GiB |

**Workers**

| Comm | Dashboard | Nanny | Total threads | Memory | Local directory |
| --- | --- | --- | --- | --- | --- |
| tcp://127.0.0.1:49264 | http://127.0.0.1:49268/status | tcp://127.0.0.1:49255 | 3 | 4.50 GiB | `/var/folders/bp/mlqhryhx7s5186t3z_y93lnr0000gn/T/dask-scratch-space/worker-14n7uck1` |
| tcp://127.0.0.1:49265 | http://127.0.0.1:49270/status | tcp://127.0.0.1:49257 | 3 | 4.50 GiB | `/var/folders/bp/mlqhryhx7s5186t3z_y93lnr0000gn/T/dask-scratch-space/worker-_ageymli` |
| tcp://127.0.0.1:49266 | http://127.0.0.1:49272/status | tcp://127.0.0.1:49259 | 3 | 4.50 GiB | `/var/folders/bp/mlqhryhx7s5186t3z_y93lnr0000gn/T/dask-scratch-space/worker-412ufd30` |
| tcp://127.0.0.1:49267 | http://127.0.0.1:49274/status | tcp://127.0.0.1:49261 | 3 | 4.50 GiB | `/var/folders/bp/mlqhryhx7s5186t3z_y93lnr0000gn/T/dask-scratch-space/worker-2myl4fn5` |


## What is [Dask](https://www.dask.org/)?

There are many parts to the "Dask" project:
* Collections/API, also known as the "core library."
* Distributed: for creating clusters.
* Integrations and a broader ecosystem.

### Dask is a flexible library for parallel computing in Python

- Dynamic task scheduling optimized for computation.
- _“Big Data” collections_ like _parallel arrays_, dataframes, and lists that extend familiar interfaces (NumPy, pandas, Python lists) to larger-than-memory or distributed data.


### Dask Collections

<img src="images/main-components.png" alt="Dask Overview"  width="600" />

Dask provides **multi-core** and **distributed+parallel** execution on datasets **larger than memory**.

We can think of Dask APIs (also called collections) at a high and low level:

<center>
<img src="images/high_vs_low_level_coll_analogy.png" width="75%" alt="High vs Low level clothes analogy">
</center>

* **High-level Collections:** Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and pandas but can operate in parallel on datasets that don’t fit in memory.
* **Low-level Collections:** Dask also provides low-level Delayed and Futures collections that give you more precise control to build custom parallel and distributed computations.
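
To make the distinction concrete, here is a small sketch that uses one high-level collection (`dask.array`) and one low-level collection (`dask.delayed`). The helper functions `inc` and `add`, the array size, and the chunk size are illustrative assumptions, not part of the tutorial's own examples.

```python
import dask
import dask.array as da

# High-level collection: a Dask array split into four 250-element chunks.
x = da.arange(1_000, chunks=250)
lazy_total = x.sum()                      # lazy; builds a task graph
high_level_result = lazy_total.compute()  # runs the graph

# Low-level collection: wrap plain Python functions so calls become tasks.
@dask.delayed
def inc(n):
    return n + 1

@dask.delayed
def add(a, b):
    return a + b

# Calling delayed functions only records the work to be done.
lazy_sum = add(inc(1), inc(2))
low_level_result = lazy_sum.compute()

print(high_level_result, low_level_result)
```

In both cases the pattern is the same: describe the computation first, then call `.compute()` to execute it.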


### Dask Cluster

Most of the time, when you use Dask, you will be using a distributed architecture within a Dask cluster: a client submits work to a central scheduler, which assigns tasks to workers and tracks their results. The Dask cluster is structured as follows:

<center>
<img src="images/distributed-overview.png" width="75%" alt="Distributed overview">
</center>
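
The pieces of a cluster can be explored on a single machine. The following is a sketch under stated assumptions: the worker and thread counts are arbitrary, `processes=False` is chosen only to keep the demo lightweight, and the names `demo_cluster` and `demo_client` are hypothetical, picked so they do not clash with the `client` created earlier.

```python
from dask.distributed import Client, LocalCluster

# Start a scheduler and two single-threaded workers inside this process.
# processes=False runs workers as threads (an assumption for a small demo);
# dashboard_address=":0" asks for any free port for the dashboard.
demo_cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=":0",
)

# The client is the user-facing entry point that talks to the scheduler.
demo_client = Client(demo_cluster)
demo_client.wait_for_workers(2)

# Ask the scheduler which workers it currently knows about.
n_workers = len(demo_client.scheduler_info()["workers"])

# Submit one task; the scheduler assigns it to a worker and returns a future.
future = demo_client.submit(sum, [1, 2, 3])
result = future.result()

print(n_workers, result)

# Release the resources held by the client and the cluster.
demo_client.close()
demo_cluster.close()
```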


### Dask Ecosystem

In addition to the core Dask library and its distributed scheduler, the Dask ecosystem connects several additional initiatives, including:

- Dask-ML (scikit-learn-style parallel API)
- Dask-image
- Dask-cuDF
- Dask-sql
- Dask-snowflake
- Dask-mongo
- Dask-bigquery

Community libraries that have built-in Dask integrations, such as:

- Xarray
- XGBoost
- Prefect
- Airflow

Dask deployment libraries:
- Dask-kubernetes
- Dask-YARN
- Dask-gateway
- Dask-cloudprovider
- Dask-jobqueue

When we talk about the Dask project, we include all these efforts as part of the community.


## Tutorial Structure

Each section is a Jupyter notebook. There is a mix of text, code, and exercises.

0. [Overview](00_overview.ipynb) - Dask's place in the universe.

1. [Array](02_array.ipynb): NumPy-like functionality with a collection of NumPy arrays distributed across your cluster.

2. [DataFrame](01_dataframe.ipynb): Parallel operations on many pandas DataFrames distributed across your cluster.

3. [Delayed](03_dask.delayed.ipynb): The single-function way to parallelize general Python code.

4. [Deployment/Distributed](04_distributed.ipynb): Dask's scheduler for clusters, with details on how to view the UI.

5. [Distributed Futures](05_futures.ipynb): Non-blocking results computed asynchronously.

6. Conclusion


In [5]:
client.shutdown()

2024-08-22 14:13:50,693 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/Users/humbertofariasaroca/miniforge3/envs/data_analysis/lib/python3.12/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/humbertofariasaroca/miniforge3/envs/data_analysis/lib/python3.12/site-packages/distributed/worker.py", line 1250, in heartbeat
    response = await retry_operation(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/humbertofariasaroca/miniforge3/envs/data_analysis/lib/python3.12/site-packages/distributed/utils_comm.py", line 459, in retry_operation
    return await retry(
           ^^^^^^^^^^^^
  File "/