# Lesson 5: Distributed Scaling

In lesson 4 you've learned how to be as efficient as possible on a single CPU.

Now we want to use multiple CPUs at the same time (horizontal scaling) and optimize efficiency of multiple CPUs.

![image](https://raw.githubusercontent.com/nsmith-/2025-09-15-hsf-india-tutorial-chandigarh/cda03896f0f7d4d195db3fedf00a8754f6774214/img/horizontal-and-vertical-scaling.svg)

## Scaling on a single machine

A single computer may have multiple cores available that we can use to concurrently run software.

### In python the two ways to parallelize programs are: **_multi-processing_** and **_multi-threading_**.

**Multi-processing** is essentially about running multiple OS processes in parallel, each with its own Python interpreter, memory space, etc.

**Multi-threading**, however, uses a shared memory space and a common Python interpreter to spawn multiple threads that can run tasks concurrently. 
_Caution_: In Python multi-threading is in fact not running programs in parallel because of the Global Interpreter Lock (GIL), only since python 3.14 (free-threaded) this is possible.


### Basic rule of thumb for Python:

**Use multi-processing** when you're program is CPU-bound (i.e. it's constantly calculating things).

**Use multi-threading** when you're program is IO-bound (i.e. it's waiting a long time for loading data or network requests).

In [1]:
import concurrent.futures

In [2]:
%%writefile quadratic_formula_scaling.py
# writing to a separate file, because multiprocessing is buggy in notebooks (see: https://bugs.python.org/issue25053)
import numpy as np
import time

a = np.random.uniform(5, 10, 1_000_000)
b = np.random.uniform(10, 20, 1_000_000)
c = np.random.uniform(-0.1, 0.1, 1_000_000)

def quadratic_formula(task_id):
    time.sleep(task_id / 5.0)  # simulate some task_id-dependent workload
    return (-b + np.sqrt(b**2 - 4*a*c)), task_id  # return task_id for identification


# setup 10 tasks
task_ids = range(10)

Overwriting quadratic_formula_scaling.py


### Python for-loop example

In [3]:
%%timeit -n 1 -r 1

from quadratic_formula_scaling import quadratic_formula, task_ids

for task_id in task_ids:
    output, _ = quadratic_formula(task_id)
    print(f"Task {task_id} completed with output {output[:5]}...")  # print first 5 results of each task

Task 0 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 1 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 2 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 3 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 4 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 5 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 6 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 7 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 8 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 9 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
9.21 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop ea

### Multi-threading example

In [4]:
%%timeit -n 1 -r 1

from quadratic_formula_scaling import quadratic_formula, task_ids


with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(quadratic_formula, task_id) for task_id in task_ids]

for future in concurrent.futures.as_completed(futures):
    output, task_id = future.result()
    print(f"Task {task_id} completed with output {output[:5]}...")  # print first 5 results of each task

Task 9 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 1 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 6 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 3 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 4 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 2 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 0 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 5 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 8 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
Task 7 completed with output [-0.08082671  0.09911432  0.08972523 -0.01775973  0.07344676]...
1.82 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop ea

### Multi-processing example

In [5]:
%%timeit -n 1 -r 1

from quadratic_formula_scaling import quadratic_formula, task_ids

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(quadratic_formula, task_id) for task_id in task_ids]

for future in concurrent.futures.as_completed(futures):
    output, task_id = future.result()
    print(f"Task {task_id} completed with output {output[:5]}...")  # print first 5 results of each task

Task 8 completed with output [-0.01903211  0.08886943  0.06023321 -0.01361315  0.01980665]...
Task 4 completed with output [ 0.06633378 -0.06323433 -0.03275999 -0.04053329  0.03982774]...
Task 1 completed with output [-0.11042231  0.00825159 -0.06311855 -0.10245743  0.0657424 ]...
Task 6 completed with output [ 0.05207876 -0.00979181 -0.03736915 -0.06646686 -0.08524444]...
Task 3 completed with output [ 0.02181904 -0.05276605  0.04963738  0.06528493  0.01999756]...
Task 0 completed with output [-0.0603393  -0.09348962 -0.02696014 -0.02271398 -0.04525356]...
Task 5 completed with output [ 0.07204897 -0.06316745 -0.04246816 -0.03426    -0.08783372]...
Task 2 completed with output [ 0.06893495 -0.10597019  0.04039311  0.00936472  0.0248067 ]...
Task 7 completed with output [-0.04496032 -0.00609686 -0.08873753  0.05854654 -0.05904883]...
Task 9 completed with output [-0.04795121 -0.10468754  0.06472954  0.00318819  0.09628321]...
2.07 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop ea

Multi-threading and multi-processing can speed-up your programs by concurrently executing multiple tasks across multiple threads or processes.

Here, we can see in addition the effect of shared (**multi-threading**) vs individual (**multi-processing**) memory space: 
- **multi-threading**: the NumPy random initialization runs _once_ and the memory is shared between all threads, which is why all outputs are the _same_ (`[ 0.00466771  0.00414385  0.00415603 -0.00520032 -0.00212164]...`)
- **multi-processing**: the NumPy random initialization runs in _each_ process again. There's no shared memory, every process has it's own set of `a`, `b`, and `c` arrays, which is why all outputs are _different_. Initializing the arrays in each process adds an additional runtime overhead, which is why here the multi-processing case is slightly slower.

## Scaling on multiple machines

## Dask

There are different ways to scale to multiple machines in a computing cluster. 

The most classic ways are independent of Python and let you submit any type of program 'job' to a cluster, e.g. using HTCondor or Slurm.

In Python, one solution emerged that allows to submit jobs/tasks to a cluster of workers from a Python program: **Dask**.

### Dask example

In [6]:
import dask.distributed

from quadratic_formula_scaling import quadratic_formula, task_ids

In [7]:
%%timeit -n 1 -r 1

with (
    # multi-threading cluster
    dask.distributed.LocalCluster(n_workers=1, threads_per_worker=10) as cluster,
    # multi-processing cluster
    # dask.distributed.LocalCluster(n_workers=10, threads_per_worker=1) as cluster,
    # mixed cluster
    # dask.distributed.LocalCluster(n_workers=2, threads_per_worker=5) as cluster,
    # ---
    # connect to a Dask cluster
    dask.distributed.Client(cluster) as client,
):

     futures = client.map(quadratic_formula, task_ids)

     for future in dask.distributed.as_completed(futures):
         output, task_id = future.result()
         print(f"Task {task_id} completed with output {output[:5]}...")  # print first 5 results of each task

Task 0 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 1 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 2 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 3 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 4 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 5 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 6 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 7 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 8 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
Task 9 completed with output [ 0.11779029  0.04675833  0.07050445 -0.1070739   0.02331849]...
2.53 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop ea

### Dask in practice

Dask abstracts the cluster from the user. If there's a dask-scheduler running somewhere, we can connect to it using:

```python
with dask.distributed.Client(address='127.0.0.1:8786') as client: # some IP+port address
    ...
```

This is great, as scientist never have to worry about the cluster infrastructure, with a single lince of code you can connect and run your analysis distributed on multiple workers and optionally using multiple threads per worker.

One cluster in HEP that's commonly used for analysis is: https://coffea.casa/.

### Dask is much more than a distributive interface

Dask allows to define compute graphs of complex workflows. These graphs can then be scheduled automatically by the dask-scheduler such that the most optimal execution of tasks is achieved.

In [8]:
import dask

In [9]:
@dask.delayed  # mark function as delayed/task
def increment(i):
    return i + 1

@dask.delayed  # mark function as delayed/task
def add(a, b):
    return a + b


# define compute graph
a, b = 1, 12
c = increment(a)
d = increment(b)
output = add(c, d)

# visualize compute graph
try:
    output.visualize(rankdir="LR", filename="img/compute_graph_simple.svg")  # left to right
except Exception as e:
    ... # install graphviz and python-graphviz to visualize the graph

# compute the result
print("Result:", output.compute())

Result: 15


![image](compute_graph_simple.svg)

In [10]:
import numpy as np
import dask.array as da

# use dask arrays instead of Numpy arrays for automatic graph construction
a = da.random.uniform(5, 10, 100_000)
b = da.random.uniform(10, 20, 100_000)
c = da.random.uniform(-0.1, 0.1, 100_000)

def quadratic_formula(a, b, c):
    return (-b + np.sqrt(b**2 - 4*a*c)) / (2*a)

# define compute graph
output = quadratic_formula(a, b, c)

# visualize compute graph
try:
    output.visualize(rankdir="LR", filename="img/compute_graph_quadratic_formula.svg")
    output.visualize(rankdir="LR", filename="img/compute_graph_quadratic_formula__optimized.svg", optimize_graph=True)
except Exception as e:
    ... # install graphviz and python-graphviz to visualize the graph


# compute the result
unoptimized_result = dask.compute(output, optimize_graph=False)[0]
print("Result (unoptimized):", unoptimized_result)

optimized_result = dask.compute(output, optimize_graph=True)[0]
print("Result (optimized):", optimized_result)

assert np.allclose(unoptimized_result, optimized_result)


Result (unoptimized): [-0.00198122 -0.00250379  0.00183553 ... -0.00302782 -0.00104595
  0.00361832]
Result (optimized): [-0.00198122 -0.00250379  0.00183553 ... -0.00302782 -0.00104595
  0.00361832]


#### Compute Graph

![image](compute_graph_quadratic_formula.svg)

#### Compute Graph _optimized_

![image](compute_graph_quadratic_formula__optimized.svg)

In [11]:
def fun(a, b, c):
    return np.mean((-b + np.sqrt(b**2 - 4*a*c)) / (2*a))

In [12]:
%%timeit -n 1 -r 1

a = np.random.uniform(5, 10, 200_000_000)
b = np.random.uniform(10, 20, 200_000_000)
c = np.random.uniform(-0.1, 0.1, 200_000_000)

output = fun(a, b, c)
print("Result:", output)

Result: -9.433059105949177e-06
5.05 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [13]:
# A too slow problem for a single machine... (200M points)

# use dask arrays instead of Numpy arrays for automatic graph construction
da_a = da.random.uniform(5, 10, 200_000_000)
da_b = da.random.uniform(10, 20, 200_000_000)
da_c = da.random.uniform(-0.1, 0.1, 200_000_000)

# define compute graph
output = fun(da_a, da_b, da_c)

print(output.dask)

HighLevelGraph with 15 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x147be2b70>
 0. uniform-d9a80d5014b30765ee3ac34cfc333536
 1. uniform-11e71ea831934b3f1ea21e1200039432
 2. mul-05395716fa76c9c9d94061ad5de75489
 3. mul-a1c4f09a0ba68e81c5b4f91e742ac003
 4. mul-6685224026c740b180b179eb07dfb57b
 5. uniform-4346d4c1062338dec9a7ada3dba88221
 6. pow-7d906bee67e2968de11d45303b3e6ddb
 7. sub-cfcd2cf8147862cfbddb64a5c2380d53
 8. sqrt-9f67fff8a3a9e6f683f867267807bb80
 9. neg-32d8f6834fc47374af27d00e832680af
 10. add-1aa49380596d719705d3dc8c470c003d
 11. truediv-54a054f9e6ddcbea0a899b833433e646
 12. mean_chunk-d6f210b0bddd85549af99a31658a81c1
 13. mean_combine-partial-abf034d1883ee460900829515e44ed5e
 14. mean_agg-aggregate-64a0befc14efdfc6a3535f97498458eb



In [14]:
%%timeit -n 1 -r 1

with (
    dask.distributed.LocalCluster(n_workers=1, threads_per_worker=10) as cluster,
    # ---
    # connect to a Dask cluster
    dask.distributed.Client(cluster) as client,
):
    # compute the result
    print("Result:", client.compute(output).result())

Result: -9.278738954981081e-06
1.57 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
