# Introduction

## Install

    pip install "dask[dataframe,distributed]"
    pip install "bokeh>=3.1.0"  # for the Dask dashboard


## TLDR
Dask is an open-source Python library for parallel and distributed computing. It scales Python code from a single machine to a cluster while keeping a familiar NumPy- and Pandas-like interface.

In short: you write familiar Python code that runs in parallel and handles datasets larger than memory, without rewriting everything.


## Why the name *Dask*?

“Dask” is short for Dynamic Task Scheduling.

That name describes exactly what the library does.

Dynamic:
- Tasks are created and scheduled at runtime, not fixed in advance.
- Dask can adapt to data size, available resources, and failures.

Task:
- All computations are broken down into many small tasks (e.g. “process this partition”, “sum this chunk”).

Scheduling: a scheduler decides
- when a task runs
- where it runs (thread, process, or another machine)
- in what order, based on dependencies
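A minimal sketch of tasks and dependencies using `dask.delayed`. The functions `load`, `clean` and `combine` are hypothetical stand-ins for real per-partition work: each decorated call becomes a task, passing one task's output into another creates a dependency, and the scheduler decides how to run the independent chains in parallel.

```python
from dask import delayed

# Hypothetical per-partition steps; any plain Python functions would work.
@delayed
def load(i):
    return list(range(i * 10, (i + 1) * 10))

@delayed
def clean(values):
    return [v for v in values if v % 2 == 0]

@delayed
def combine(parts):
    return sum(sum(p) for p in parts)

# Tasks are created at runtime; n_parts could just as well depend on the data.
n_parts = 4
cleaned = [clean(load(i)) for i in range(n_parts)]  # 4 independent load -> clean chains
total = combine(cleaned)                             # depends on every cleaned part

# compute() hands the whole graph to the scheduler.
result = total.compute()
print(result)  # 380 (sum of the even numbers 0..38)
```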


## What problem does Dask solve?

Standard Python tools like NumPy and Pandas are fast, but they:
- run mostly on a single core, and
- require data to fit into memory.

Dask overcomes this by:
- breaking computations into small tasks,
- executing them in parallel, and
- streaming or distributing data so it can work with datasets larger than RAM.

## Core components

**Dask Collections (high-level APIs)** -
These mirror common Python libraries:
- Dask Array → like NumPy, but chunked and parallel
- Dask DataFrame → like Pandas, but partitioned across cores/nodes
- Dask Bag → for semi-structured data (JSON, logs, text)
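Dask Bag does not appear in the examples further down, so here is a small sketch of the JSON/log use case. The log lines are made up; `scheduler="threads"` is chosen only to keep the demo simple, since Bag uses the multiprocessing scheduler by default.

```python
import json
import dask.bag as db

# Made-up JSON log lines.
lines = [
    '{"level": "INFO", "ms": 12}',
    '{"level": "ERROR", "ms": 250}',
    '{"level": "INFO", "ms": 30}',
    '{"level": "ERROR", "ms": 410}',
]

# Two partitions; map/filter/pluck are applied to each partition independently.
bag = db.from_sequence(lines, npartitions=2)
records = bag.map(json.loads)
errors = records.filter(lambda r: r["level"] == "ERROR")
error_ms = errors.pluck("ms")

print(error_ms.compute(scheduler="threads"))  # [250, 410]
print(records.pluck("level").frequencies().compute(scheduler="threads"))
```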

**Task Graph** -
Behind the scenes, Dask builds a task graph (a DAG):
- each node = a small computation
- edges = dependencies
- this graph is optimized and then executed in parallel.
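Dask also accepts a hand-written task graph as a plain dictionary (the "custom graphs" format): keys name the nodes, a tuple `(function, *args)` is a task, and any argument that matches another key is a dependency, i.e. an edge of the DAG. The collections build graphs of this kind internally, just much larger. A minimal sketch:

```python
from operator import add, mul
from dask.threaded import get

# Nodes: plain values or (function, *args) tasks.
# String args that name other keys are dependencies (the edges).
graph = {
    "a": 1,
    "b": 2,
    "sum": (add, "a", "b"),              # depends on a, b
    "double": (mul, "sum", 2),           # depends on sum
    "square": (mul, "sum", "sum"),       # depends on sum
    "result": (add, "double", "square"), # depends on double, square
}

# The threaded scheduler walks the DAG; "double" and "square" can run in parallel.
value = get(graph, "result")
print(value)  # (1 + 2) * 2 + (1 + 2) ** 2 = 15
```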

**Scheduler** -
Dask has multiple schedulers:
- Single-machine schedulers (threads or processes): easy, no setup
- Distributed scheduler: runs on a cluster of machines or locally, with dynamic load balancing, fault tolerance, and a real-time dashboard
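Picking a single-machine scheduler is a keyword argument on `compute()` or a config setting. A sketch with a made-up Dask Array; `"processes"` is another valid choice, left out here because it starts extra Python processes. Once a distributed `Client` is created, it becomes the default scheduler instead.

```python
import dask
import dask.array as da

x = da.ones((4_000, 4_000), chunks=(1_000, 1_000))
total = x.sum()

# Per call:
print(total.compute(scheduler="threads"))      # thread pool (default for Array/DataFrame)
print(total.compute(scheduler="synchronous"))  # single thread, handy for debugging

# For a whole block of code:
with dask.config.set(scheduler="threads"):
    print(total.compute())
```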


## When should you use Dask?

Use Dask when:
- your data doesn’t fit in memory
- your computation is CPU-bound or parallelizable
- you want minimal code changes from NumPy/Pandas
- you need to scale gradually (laptop → cluster)

Don’t use Dask if:
- data is small and fits comfortably in memory
- performance is already good with Pandas/NumPy
- your workload is heavily sequential

## How Dask compares

- Pandas: Small–medium data, simple workflows
- Dask: Medium–very large data, parallel Python
- Spark: Large clusters, JVM ecosystem, SQL-heavy


## Key takeaway

- Dask code looks like NumPy / Pandas
- Computation is lazy and parallel
- Call .compute() to execute
- Scale from single machine to cluster with minimal changes

# How Dask works with big data

Dask turns big problems into many small problems, solves them in parallel, and then combines the results.

E.g. Dask code:

    import dask.dataframe as dd
    df = dd.read_csv("large.csv")
    result = df[df.sales > 100].groupby("category").sales.mean()
    print(result.compute())  # nothing is read or computed until .compute()

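Behind the scenes (assuming Dask splits the file into three partitions):

    read block 1 of large.csv → filter → partial sum & count
    read block 2 of large.csv → filter → partial sum & count
    read block 3 of large.csv → filter → partial sum & count
                              ↓
           combine: total sum / total count per category

Conceptually, each step (read, filter, partial aggregate, combine) is a task, and each arrow is a dependency between tasks. During optimization Dask may fuse neighbouring steps into a single task.
Each block of the file is a partition.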
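The combine step cannot simply average the partial means, because partitions contain different numbers of rows per category. A Pandas-only sketch of the sum-and-count idea, with three small made-up frames standing in for partitions (this illustrates the approach, not Dask's internal code):

```python
import pandas as pd

# Three hypothetical partitions of one table.
parts = [
    pd.DataFrame({"category": ["A", "A", "B"], "sales": [150.0, 120.0, 300.0]}),
    pd.DataFrame({"category": ["A", "B", "B"], "sales": [200.0, 110.0, 130.0]}),
    pd.DataFrame({"category": ["B", "A"], "sales": [90.0, 180.0]}),
]

# Per partition: filter, then keep the sum and count (not the mean).
partials = [
    p[p.sales > 100].groupby("category")["sales"].agg(["sum", "count"])
    for p in parts
]

# Combine: add up sums and counts, divide once at the end.
combined = pd.concat(partials).groupby(level=0).sum()
result = combined["sum"] / combined["count"]

# Averaging the per-partition means gives a different (wrong) answer.
naive = pd.concat([p["sum"] / p["count"] for p in partials]).groupby(level=0).mean()

# Reference: plain Pandas on the full table.
full = pd.concat(parts)
expected = full[full.sales > 100].groupby("category")["sales"].mean()

print(result)  # A: 162.5, B: 180.0
print(naive)   # A: ~171.67, B: 210.0
```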

- Dask splits large datasets into many small pieces called partitions (or chunks).
- Each partition is small enough to fit into memory and can be processed independently.
- When you write Dask code, it does not run immediately; execution is lazy.
- Instead, Dask records operations and builds a task graph describing the computation.
- Each node in the graph is a small task, such as filtering a partition or summing a chunk.
- Dependencies between tasks define the correct execution order.
- Before running, Dask optimizes the task graph to reduce work and data movement.
- When .compute() is called, the graph is sent to a scheduler.
- The scheduler runs tasks in parallel on threads, processes, or distributed workers.
- Partial results are combined into the final result, allowing Dask to scale to very large data.
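A small sketch that makes the laziness visible: the hypothetical function `process` logs every real call, and the log stays empty until `.compute()` runs the graph.

```python
from dask import delayed

calls = []  # records when process() actually executes

@delayed
def process(partition_id):
    calls.append(partition_id)
    return partition_id * 10

tasks = [process(i) for i in range(3)]
total = delayed(sum)(tasks)

print(calls)          # [] : only the task graph exists so far
result = total.compute()
print(result)         # 30
print(sorted(calls))  # [0, 1, 2] : the tasks ran during compute()
```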

# Dask in a computer cluster

- When you use Dask’s distributed scheduler, Dask can execute tasks on many workers spread across the machines of a cluster.
- Each task is a small unit of work
- Each worker is a Python process, possibly on a different computer
- The scheduler decides where and when each task runs

    
        Your code
           ↓
        Task graph
           ↓
        Scheduler
           ↓
        Worker A (PC 1)   Worker B (PC 2)   Worker C (PC 3)
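The same picture can be reproduced on one computer with `LocalCluster`. A minimal sketch, assuming `dask.distributed` is installed; `processes=False` keeps the workers inside the current Python process so the demo stays light. For a real cluster you would instead pass the scheduler's address to `Client`, e.g. `Client("tcp://scheduler-host:8786")`, where `scheduler-host` is a placeholder.

```python
from dask.distributed import Client, LocalCluster

def square(x):
    return x * x

# One scheduler plus two workers; drop processes=False to get one
# Python process per worker, as in the diagram above.
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster, \
        Client(cluster) as client:
    futures = client.map(square, range(10))  # one task per input, spread over workers
    total = client.submit(sum, futures)      # depends on all ten futures
    result = total.result()                  # blocks until the scheduler has finished

print(result)  # 285
```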

# Generate some large files

In [1]:
# generate_demo_data.py
import os
import numpy as np
import pandas as pd

def make_large_csv(path="large_data.csv", n_rows=2_000_000, seed=42):
    rng = np.random.default_rng(seed)

    categories = np.array(["A", "B", "C", "D", "E"])
    df = pd.DataFrame({
        "category": rng.choice(categories, size=n_rows),
        "sales": rng.normal(loc=100.0, scale=20.0, size=n_rows).clip(min=0),
        "revenue": rng.normal(loc=1000.0, scale=250.0, size=n_rows).clip(min=0),
        "user_id": rng.integers(1, 200_000, size=n_rows),
    })
    df.to_csv(path, index=False)
    print(f"Wrote {path} with {n_rows:,} rows")

def make_monthly_files(folder="data", year=2024, rows_per_file=300_000, seed=123):
    os.makedirs(folder, exist_ok=True)
    rng = np.random.default_rng(seed)

    categories = np.array(["A", "B", "C", "D", "E"])
    for month in range(1, 13):
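        df = pd.DataFrame({
            # freq="min" gives one timestamp per minute, so 300,000 rows span about
            # 208 days: each file's dates run well past the month in its file name.
            "date": pd.date_range(f"{year}-{month:02d}-01", periods=rows_per_file, freq="min"),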
            "category": rng.choice(categories, size=rows_per_file),
            "revenue": rng.normal(loc=1200.0, scale=300.0, size=rows_per_file).clip(min=0),
            "cost": rng.normal(loc=800.0, scale=200.0, size=rows_per_file).clip(min=0),
        })
        out = os.path.join(folder, f"{year}-{month:02d}.csv")
        df.to_csv(out, index=False)
        print(f"Wrote {out} with {rows_per_file:,} rows")

make_large_csv()
make_monthly_files()

Wrote large_data.csv with 2,000,000 rows
Wrote data/2024-01.csv with 300,000 rows
Wrote data/2024-02.csv with 300,000 rows
Wrote data/2024-03.csv with 300,000 rows
Wrote data/2024-04.csv with 300,000 rows
Wrote data/2024-05.csv with 300,000 rows
Wrote data/2024-06.csv with 300,000 rows
Wrote data/2024-07.csv with 300,000 rows
Wrote data/2024-08.csv with 300,000 rows
Wrote data/2024-09.csv with 300,000 rows
Wrote data/2024-10.csv with 300,000 rows
Wrote data/2024-11.csv with 300,000 rows
Wrote data/2024-12.csv with 300,000 rows


In [2]:
!ls -la

total 88172
drwxrwxr-x  4 juebrauer juebrauer     4096 Jan 20 17:40 .
drwxrwxr-x 20 juebrauer juebrauer     4096 Jan 20 16:43 ..
-rw-rw-r--  1 juebrauer juebrauer    36170 Jan 20 17:40 dask.ipynb
drwxrwxr-x  2 juebrauer juebrauer     4096 Jan 20 16:58 data
drwxrwxr-x  2 juebrauer juebrauer     4096 Jan 20 17:36 .ipynb_checkpoints
-rw-rw-r--  1 juebrauer juebrauer 90231937 Jan 21 07:19 large_data.csv


In [3]:
!ls -la data

total 206008
drwxrwxr-x 2 juebrauer juebrauer     4096 Jan 20 16:58 .
drwxrwxr-x 4 juebrauer juebrauer     4096 Jan 20 17:40 ..
-rw-rw-r-- 1 juebrauer juebrauer 17575683 Jan 21 07:19 2024-01.csv
-rw-rw-r-- 1 juebrauer juebrauer 17575841 Jan 21 07:19 2024-02.csv
-rw-rw-r-- 1 juebrauer juebrauer 17576025 Jan 21 07:19 2024-03.csv
-rw-rw-r-- 1 juebrauer juebrauer 17576625 Jan 21 07:19 2024-04.csv
-rw-rw-r-- 1 juebrauer juebrauer 17576287 Jan 21 07:19 2024-05.csv
-rw-rw-r-- 1 juebrauer juebrauer 17576158 Jan 21 07:19 2024-06.csv
-rw-rw-r-- 1 juebrauer juebrauer 17576533 Jan 21 07:19 2024-07.csv
-rw-rw-r-- 1 juebrauer juebrauer 17576098 Jan 21 07:19 2024-08.csv
-rw-rw-r-- 1 juebrauer juebrauer 17575874 Jan 21 07:19 2024-09.csv
-rw-rw-r-- 1 juebrauer juebrauer 17576138 Jan 21 07:19 2024-10.csv
-rw-rw-r-- 1 juebrauer juebrauer 17576358 Jan 21 07:19 2024-11.csv
-rw-rw-r-- 1 juebrauer juebrauer 17575900 Jan 21 07:19 2024-12.csv


# Example: Compute means per group (Pandas-like)

In [4]:
import dask.dataframe as dd

ddf = dd.read_csv("large_data.csv")

In [5]:
type(ddf)

dask.dataframe.dask_expr._collection.DataFrame

In [6]:
ddf

|               | category | sales   | revenue | user_id |
|---------------|----------|---------|---------|---------|
| npartitions=1 |          |         |         |         |
|               | string   | float64 | float64 | int64   |
|               | ...      | ...     | ...     | ...     |


In [7]:
ddf.info()

<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 4 entries, category to user_id
dtypes: float64(2), int64(1), string(1)

In [8]:
ddf.head()

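|   | category | sales      | revenue     | user_id |
|---|----------|------------|-------------|---------|
| 0 | A        | 113.888029 | 1516.065382 | 180231  |
| 1 | D        | 110.208432 | 1166.372605 | 80585   |
| 2 | D        | 105.195603 | 888.623738  | 110454  |
| 3 | C        | 78.117677  | 752.056962  | 167720  |
| 4 | C        | 49.146001  | 1268.38856  | 9592    |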


In [9]:
# Lazy execution: this only builds a task graph; no data is read yet
result = ddf.groupby("category")["sales"].mean()

In [10]:
print(result.compute())

category
A     99.994346
B    100.031802
C    100.048502
D    100.011763
E    100.018166
Name: sales, dtype: float64


# Example: Filter + derived column + top rows

In [11]:
import dask.dataframe as dd

df = dd.read_csv("large_data.csv")

In [12]:
# Add a derived column
df = df.assign(profit=df["revenue"] - df["sales"])

In [13]:
df

|               | category | sales   | revenue | user_id | profit  |
|---------------|----------|---------|---------|---------|---------|
| npartitions=1 |          |         |         |         |         |
|               | string   | float64 | float64 | int64   | float64 |
|               | ...      | ...     | ...     | ...     | ...     |


In [14]:
# Filter
filtered = df[df["profit"] > 900]

In [15]:
# Show a small sample (head() computes only the first partition by default)
print(filtered.head(10))

   category       sales      revenue  user_id       profit
0         A  113.888029  1516.065382   180231  1402.177353
1         D  110.208432  1166.372605    80585  1056.164173
4         C   49.146001  1268.388560     9592  1219.242558
7         D  100.019221  1154.208129    66076  1054.188908
9         A  124.359395  1236.640113   141384  1112.280718
13        D  121.989976  1258.010785      587  1136.020809
15        D   66.755880  1093.983334    90353  1027.227454
16        C   67.756341  1146.528321    96976  1078.771979
18        E   86.411311  1197.737561    12779  1111.326250
19        C   91.737983  1077.364867    45311   985.626884


# Example: Parallel NumPy-like compute with Dask Array (no files)

In [16]:
import dask.array as da

# Large array split into chunks
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# Do some work
value = (x ** 2).mean()

print(value.compute())

0.33333336654740475
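Why the result is so close to 1/3: `da.random.random` draws values uniformly from [0, 1), so the mean of the $n = 10{,}000 \times 10{,}000 = 10^8$ squared entries estimates $\mathbb{E}[X^2]$, and the standard error of that estimate is tiny:

$$
\mathbb{E}[X^2] = \int_0^1 t^2\,dt = \frac{1}{3},
\qquad
\mathrm{SE} = \sqrt{\frac{\mathbb{E}[X^4] - \mathbb{E}[X^2]^2}{n}}
= \sqrt{\frac{1/5 - 1/9}{10^8}} \approx 3 \times 10^{-5}
$$

The printed value differs from 1/3 by about $3 \times 10^{-8}$, well within that margin.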


In [17]:
type(x)

dask.array.core.Array

In [18]:
type(value)

dask.array.core.Array

In [19]:
x

|       | Array          | Chunk        |
|-------|----------------|--------------|
| Bytes | 762.94 MiB     | 7.63 MiB     |
| Shape | (10000, 10000) | (1000, 1000) |

Dask graph: 100 chunks in 1 graph layer

Data type: float64 numpy.ndarray


In [20]:
small = x[:5, :5]
small

|       | Array  | Chunk  |
|-------|--------|--------|
| Bytes | 200 B  | 200 B  |
| Shape | (5, 5) | (5, 5) |

Dask graph: 1 chunks in 2 graph layers

Data type: float64 numpy.ndarray


In [21]:
small.compute()

array([[0.12947225, 0.88080255, 0.28970552, 0.3592478 , 0.25928886],
       [0.47128115, 0.69964065, 0.86889862, 0.13914909, 0.72349453],
       [0.98415335, 0.68843327, 0.0789159 , 0.65768556, 0.96961143],
       [0.82064552, 0.16124114, 0.85457673, 0.33419485, 0.48554417],
       [0.12389787, 0.3507031 , 0.75869325, 0.39890865, 0.07945272]])
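The "1 chunks in 2 graph layers" line reflects that the slice depends on a single chunk of `x` (layer 1: random generation, layer 2: the slice). Because Dask removes tasks whose output is not needed before execution, computing `small` should only generate that one 1,000 × 1,000 chunk, not the whole 763 MiB array. The chunk layout can be inspected directly:

```python
import dask.array as da

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
small = x[:5, :5]

print(x.numblocks)      # (10, 10) -> 100 chunks
print(small.numblocks)  # (1, 1)   -> the slice lives in a single chunk
print(small.chunks)     # ((5,), (5,))
```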

# Example: Process many files in parallel (wildcard read)

In [22]:
import dask.dataframe as dd

df = dd.read_csv("data/2024-*.csv", parse_dates=["date"])

In [23]:
# Total revenue across all months
total_revenue = df["revenue"].sum()

In [24]:
# Revenue by category
by_category = df.groupby("category")["revenue"].sum()

In [25]:
print("TOTAL REVENUE:", total_revenue.compute())

TOTAL REVENUE: 4319768249.109682


In [26]:
print("\nREVENUE BY CATEGORY:\n", by_category.compute())


REVENUE BY CATEGORY:
 category
A    8.645690e+08
B    8.628133e+08
C    8.655972e+08
D    8.636500e+08
E    8.631387e+08
Name: revenue, dtype: float64


# Example: Distributed execution + dashboard (local cluster)

In [27]:
from dask.distributed import Client
import dask.dataframe as dd

client = Client()  # starts a local cluster: one scheduler plus several worker processes
print(client)      # shows the scheduler address, workers, threads and memory

df = dd.read_csv("large_data.csv")

# Something slightly heavier than a simple mean
result = df.groupby("user_id")["sales"].sum().nlargest(10)

print(result.compute())

<Client: 'tcp://127.0.0.1:35371' processes=8 threads=32, memory=31.03 GiB>
user_id
46525     2790.655896
38685     2776.205139
53413     2693.377072
30938     2632.352881
164182    2614.349082
114495    2590.091976
6962      2570.150015
198900    2563.553513
166146    2561.049610
81825     2559.771817
Name: sales, dtype: float64


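By default, the dashboard is served at [http://localhost:8787](http://localhost:8787). If that port is already in use, Dask picks another one; `client.dashboard_link` shows the actual URL.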
