# Lesson 5: Distributed Scaling

In lesson 4 you learned how to be as efficient as possible on a single CPU.

Now we want to use multiple CPUs at the same time (horizontal scaling).

![image](https://raw.githubusercontent.com/nsmith-/2025-09-15-hsf-india-tutorial-chandigarh/cda03896f0f7d4d195db3fedf00a8754f6774214/img/horizontal-and-vertical-scaling.svg)

## Scaling on a single machine

A single computer may have multiple cores available that we can use to run software concurrently.

In Python, the two main ways to parallelize programs are **_multi-processing_** and **_multi-threading_**.

**Multi-processing** is essentially about running multiple OS processes in parallel, each with its own Python interpreter, memory space, etc.

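**Multi-threading**, however, uses a shared memory space and a common Python interpreter to spawn multiple threads that can run tasks concurrently.
_Caution_: In standard CPython, the Global Interpreter Lock (GIL) allows only one thread at a time to execute Python bytecode, so threads do not run pure-Python code in parallel. Threads still help when tasks spend their time waiting (e.g. `time.sleep` or I/O) or call into libraries that release the GIL. True thread parallelism requires a free-threaded build of Python (experimental in 3.13, officially supported since 3.14).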
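
Whether threads can run Python code in parallel therefore depends on the interpreter build. The following is a minimal sketch for checking this at runtime. It assumes CPython, where `sysconfig.get_config_var("Py_GIL_DISABLED")` reports a free-threaded build and `sys._is_gil_enabled()` (available from Python 3.13) reports whether the GIL is currently active. The helper name `gil_status` is made up for this illustration.

```python
import sys
import sysconfig


def gil_status():
    """Return (free_threaded_build, gil_enabled) for the running interpreter."""
    # Py_GIL_DISABLED is 1 for free-threaded builds, and 0 or None otherwise
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() only exists on Python 3.13+; older versions always have a GIL
    is_gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)
    return free_threaded_build, bool(is_gil_enabled())


free_threaded_build, gil_enabled = gil_status()
print(f"free-threaded build: {free_threaded_build}, GIL enabled: {gil_enabled}")
```

On a regular (non-free-threaded) build this reports that the GIL is enabled, which is the situation the caution above describes.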

In [1]:
import concurrent.futures

In [2]:
%%writefile quadratic_formula_scaling.py

# writing to a separate file, because multiprocessing is buggy in notebooks (see: https://bugs.python.org/issue25053)
import numpy as np
import time

a = np.random.uniform(5, 10, 1_000_000)
b = np.random.uniform(10, 20, 1_000_000)
c = np.random.uniform(-0.1, 0.1, 1_000_000)

def quadratic_formula(task_id):
    time.sleep(task_id / 5.0)  # simulate some task_id-dependent workload
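    # NOTE: only the numerator of the quadratic formula is computed here; the division by (2*a) is omitted
    return (-b + np.sqrt(b**2 - 4*a*c)), task_id  # return task_id for identification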


# set up 10 tasks
task_ids = range(10)

Overwriting quadratic_formula_scaling.py


### Python for-loop example

In [3]:
%%timeit -n 1 -r 1

from quadratic_formula_scaling import quadratic_formula, task_ids

for task_id in task_ids:
    output, _ = quadratic_formula(task_id)
    print(f"Task {task_id} completed with output {output[:5]}...")  # print first 5 results of each task

Task 0 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 1 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 2 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 3 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 4 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 5 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 6 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 7 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 8 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 9 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
9.22 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

### Multi-threading example

In [4]:
%%timeit -n 1 -r 1

from quadratic_formula_scaling import quadratic_formula, task_ids


with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(quadratic_formula, task_id) for task_id in task_ids]

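# the `with` block above only exits once all tasks have finished,
# so the futures are yielded here in arbitrary order, not in completion order
for future in concurrent.futures.as_completed(futures):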
    output, task_id = future.result()
    print(f"Task {task_id} completed with output {output[:5]}...")  # print first 5 results of each task

Task 9 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 6 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 3 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 5 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 1 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 4 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 7 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 8 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 2 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
Task 0 completed with output [-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...
1.81 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
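
In the cell above, the `as_completed` loop runs after the `with` block, which waits for all tasks before it exits, so the printed order is not the order in which the tasks finished. A minimal sketch of the alternative, with the loop inside the `with` block, is shown below. `sleepy_task` is a hypothetical stand-in workload (not part of `quadratic_formula_scaling.py`) in which tasks with a higher `task_id` finish earlier.

```python
import concurrent.futures
import time


def sleepy_task(task_id):
    # hypothetical stand-in workload: a higher task_id sleeps for a shorter time
    time.sleep((3 - task_id) * 0.2)
    return task_id


completion_order = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(sleepy_task, task_id) for task_id in range(4)]
    # iterate while the pool is still running, so results arrive as tasks finish
    for future in concurrent.futures.as_completed(futures):
        completion_order.append(future.result())

print(completion_order)
```

Here the task with the longest sleep (`task_id=0`) is reported last, even though it was submitted first.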

### Multi-processing example

In [5]:
%%timeit -n 1 -r 1

from quadratic_formula_scaling import quadratic_formula, task_ids

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(quadratic_formula, task_id) for task_id in task_ids]

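# the `with` block above only exits once all tasks have finished,
# so the futures are yielded here in arbitrary order, not in completion order
for future in concurrent.futures.as_completed(futures):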
    output, task_id = future.result()
    print(f"Task {task_id} completed with output {output[:5]}...")  # print first 5 results of each task

Task 1 completed with output [ 0.03394199  0.01501834 -0.06432296  0.02622741 -0.00209589]...
Task 9 completed with output [-0.02846331  0.0582098   0.02928256 -0.0387872  -0.10826982]...
Task 3 completed with output [ 0.03068054  0.05643298  0.08051912  0.00615857 -0.08426115]...
Task 8 completed with output [-0.00191096 -0.03328892  0.05194834  0.0437548   0.09948347]...
Task 4 completed with output [-0.03730058 -0.10470913 -0.09087746 -0.02293532  0.078815  ]...
Task 7 completed with output [-0.0043617   0.05602305 -0.02492723 -0.03497081 -0.02086848]...
Task 2 completed with output [-0.05537196  0.10962525  0.06904769 -0.00966181 -0.01034422]...
Task 0 completed with output [ 0.04239834 -0.06100824  0.03196285  0.07192884  0.01925065]...
Task 5 completed with output [ 0.00652812 -0.02156344  0.03089721 -0.02991593 -0.02378343]...
Task 6 completed with output [ 0.03326095  0.10499884 -0.12506123  0.10116294 -0.0258807 ]...
2.08 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Multi-threading and multi-processing can speed up your programs by concurrently executing multiple tasks across multiple threads or processes.
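
The measured timings (9.22 s for the for-loop, 1.81 s with threads, 2.08 s with processes) are close to what the `time.sleep` calls alone predict. A small back-of-the-envelope check, assuming the sleep dominates the runtime of each task:

```python
task_ids = range(10)
sleep_times = [task_id / 5.0 for task_id in task_ids]  # same sleep as in quadratic_formula

sequential_estimate = sum(sleep_times)  # tasks run one after another
parallel_estimate = max(sleep_times)    # 10 workers: limited by the slowest task

print(f"sequential: ~{sequential_estimate:.1f} s, parallel: ~{parallel_estimate:.1f} s")
# sequential: ~9.0 s, parallel: ~1.8 s
```

The remaining difference to the measured values comes from the actual NumPy computation and from the overhead of starting threads or processes.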

Here, we can also see the effect of shared (**multi-threading**) vs. individual (**multi-processing**) memory space:
- **multi-threading**: the module is imported _once_, so the NumPy random initialization runs once and the arrays are shared between all threads, which is why all outputs are the _same_ (`[-0.09013922  0.05697245 -0.03951349  0.09129888 -0.0409659 ]...`)
- **multi-processing**: the NumPy random initialization runs again in _each_ process. There's no shared memory; every process has its own set of `a`, `b`, and `c` arrays, which is why all outputs are _different_. (This holds when worker processes are started with the `spawn` method, the default on macOS and Windows; with `fork`, the workers would inherit copies of the parent's arrays.) Initializing the arrays in each process adds runtime overhead, which is why the multi-processing case is slightly slower here.
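
The shared-memory side of this comparison can be illustrated with the standard library alone. The sketch below is not part of the lesson's module; `shared_results` and `record` are made-up names. Every thread writes into the same module-level dictionary and reports the identity of the object it sees.

```python
import concurrent.futures
import threading

shared_results = {}      # module-level object, created once
lock = threading.Lock()  # protects shared_results against concurrent writes


def record(task_id):
    with lock:
        shared_results[task_id] = threading.current_thread().name
    return id(shared_results)  # identity of the object this thread sees


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    seen_ids = list(executor.map(record, range(8)))

print(len(set(seen_ids)))      # 1: every thread saw the same object
print(sorted(shared_results))  # [0, 1, 2, 3, 4, 5, 6, 7]
```

With a process pool, each worker would instead modify its own copy, and the dictionary in the parent process would stay empty.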

## Scaling on multiple machines

## Dask

There are different ways to scale to multiple machines in a computing cluster. 

The most classic ways are independent of Python and let you submit any type of program 'job' to a cluster, e.g. using HTCondor or Slurm.

In Python, one solution has emerged that lets you submit jobs/tasks to a cluster of workers directly from a Python program: **Dask**.

### Dask example

In [6]:
import dask.distributed

from quadratic_formula_scaling import quadratic_formula, task_ids

In [7]:
%%timeit -n 1 -r 1

with (
    # multi-threading cluster
    dask.distributed.LocalCluster(n_workers=1, threads_per_worker=10) as cluster,
    # multi-processing cluster
    # dask.distributed.LocalCluster(n_workers=10, threads_per_worker=1) as cluster,
    # mixed cluster
    # dask.distributed.LocalCluster(n_workers=2, threads_per_worker=5) as cluster,
    # ---
    # connect to a Dask cluster
    dask.distributed.Client(cluster) as client,
):

    futures = client.map(quadratic_formula, task_ids)

    for future in dask.distributed.as_completed(futures):
        output, task_id = future.result()
        print(f"Task {task_id} completed with output {output[:5]}...")  # print first 5 results of each task

Task 0 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 1 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 2 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 3 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 4 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 5 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 6 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 7 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 8 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
Task 9 completed with output [ 0.10388127  0.11691756 -0.00871209  0.03366137 -0.08243955]...
2.51 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

### Dask in practice

Dask abstracts the cluster from the user. If there's a dask-scheduler running somewhere, we can connect to it using:

```python
with dask.distributed.Client(address='127.0.0.1:8786') as client: # some IP+port address
    ...
```

This is great, as scientists rarely have to worry about the cluster infrastructure: with a single line of code you can connect and run your analysis distributed across multiple workers, optionally using multiple threads per worker.

One cluster in HEP that's commonly used for analysis is: https://coffea.casa/.

### Dask is much more than an interface for distributed computing

Dask lets you define compute graphs of complex workflows. The dask-scheduler can then schedule these graphs automatically so that the tasks are executed as efficiently as possible.

In [8]:
import dask

In [9]:
@dask.delayed  # mark function as delayed/task
def increment(i):
    return i + 1

@dask.delayed  # mark function as delayed/task
def add(a, b):
    return a + b


# define compute graph
a, b = 1, 12
c = increment(a)
d = increment(b)
output = add(c, d)

# visualize compute graph
try:
    output.visualize(rankdir="LR", filename="compute_graph_simple.svg")  # left to right
except Exception as e:
    ... # install graphviz and python-graphviz to visualize the graph

# compute the result
print("Result:", output.compute())

Result: 15


![image](compute_graph_simple.svg)
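
To make the idea of a compute graph more concrete, the same `increment`/`add` workflow can be written out by hand as a dictionary of tasks and executed in dependency order. This is a toy sketch using only the standard library, not Dask's scheduler; the `run` helper and the dictionary layout are chosen for illustration.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from operator import add


def increment(i):
    return i + 1


# each entry: key -> (function, *arguments);
# string arguments that match a key refer to the result of another task
graph = {
    "c": (increment, 1),
    "d": (increment, 12),
    "output": (add, "c", "d"),
}


def run(graph, target):
    def is_key(arg):
        return isinstance(arg, str) and arg in graph

    # dependencies of each task = those arguments that are keys of other tasks
    dependencies = {key: {arg for arg in task[1:] if is_key(arg)} for key, task in graph.items()}
    results = {}
    # static_order() yields every task after all of its dependencies
    for key in TopologicalSorter(dependencies).static_order():
        func, *args = graph[key]
        results[key] = func(*(results[arg] if is_key(arg) else arg for arg in args))
    return results[target]


print("Result:", run(graph, "output"))  # Result: 15
```

This toy version runs the tasks one after another. A real scheduler can additionally run independent tasks such as `c` and `d` in parallel.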

In [10]:
import numpy as np
import dask.array as da

# use Dask arrays instead of NumPy arrays for automatic graph construction
a = da.random.uniform(5, 10, 100_000)
b = da.random.uniform(10, 20, 100_000)
c = da.random.uniform(-0.1, 0.1, 100_000)

def quadratic_formula(a, b, c):
    return (-b + np.sqrt(b**2 - 4*a*c)) / (2*a)

# define compute graph
output = quadratic_formula(a, b, c)

# visualize compute graph
try:
    output.visualize(rankdir="LR", filename="compute_graph_quadratic_formula.svg")
    output.visualize(rankdir="LR", filename="compute_graph_quadratic_formula__optimized.svg", optimize_graph=True)
except Exception as e:
    ... # install graphviz and python-graphviz to visualize the graph


# compute the result
unoptimized_result = dask.compute(output, optimize_graph=False)[0]
print("Result (unoptimized):", unoptimized_result)

optimized_result = dask.compute(output, optimize_graph=True)[0]
print("Result (optimized):", optimized_result)

assert np.allclose(unoptimized_result, optimized_result)


Result (unoptimized): [-0.00216594 -0.00179599 -0.00366412 ... -0.00402998 -0.00172901
 -0.00177584]
Result (optimized): [-0.00216594 -0.00179599 -0.00366412 ... -0.00402998 -0.00172901
 -0.00177584]


#### Compute Graph

![image](compute_graph_quadratic_formula.svg)

#### Compute Graph _optimized_

![image](compute_graph_quadratic_formula__optimized.svg)

In [34]:
def fun(a, b, c):
    return np.mean((-b + np.sqrt(b**2 - 4*a*c)) / (2*a))

In [35]:
%%timeit -n 1 -r 1

a = np.random.uniform(5, 10, 200_000_000)
b = np.random.uniform(10, 20, 200_000_000)
c = np.random.uniform(-0.1, 0.1, 200_000_000)

output = fun(a, b, c)
print("Result:", output)

Result: -9.180587659394628e-06
3.85 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [36]:
# A problem that is too slow for a single machine... (200M points)

# use Dask arrays instead of NumPy arrays for automatic graph construction
da_a = da.random.uniform(5, 10, 200_000_000)
da_b = da.random.uniform(10, 20, 200_000_000)
da_c = da.random.uniform(-0.1, 0.1, 200_000_000)

# define compute graph
output = fun(da_a, da_b, da_c)

print(output.dask)

HighLevelGraph with 15 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x16bbf98b0>
 0. uniform-21dc9912f2ec1156291398a4a57e6527
 1. uniform-b4231d0fd7d70f2032564265e763a151
 2. mul-11813006278d2b2637609aa773f0a403
 3. mul-185508c3fd2ba5880e5e9283034d2556
 4. mul-27ff89c39decd0b05b3c4a976b49370d
 5. uniform-582f4042bd494ae9ea7f532737258491
 6. pow-23ca4af040b3575877f7013e5f67433f
 7. sub-d0f052e0de5dadd6e941f3df46b94056
 8. sqrt-24603825b75794abfe6fdda4495cdbf1
 9. neg-2cfdb1bf482739b07398de317dd07387
 10. add-5e3505fb28720030b4be7b8b16fe0a39
 11. truediv-da7348c675e49ffc715f2cbb5267669e
 12. mean_chunk-43b5518a07322c394b6988178e025200
 13. mean_combine-partial-59d95de71b4d4f663ce9d6ccbb05b3f2
 14. mean_agg-aggregate-8c09f0a9dcc49d590a5a9a804a341de2
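
The last three layers (`mean_chunk`, `mean_combine`, `mean_agg`) indicate that the mean is computed per chunk first and then aggregated. A rough pure-Python sketch of that idea follows. It is not Dask's actual implementation; the function names and the chunk size are made up for illustration.

```python
import concurrent.futures
import math


def chunk_stats(chunk):
    # per-chunk step: partial sum and number of elements
    return sum(chunk), len(chunk)


def chunked_mean(values, chunk_size):
    # split the input into independent chunks
    chunks = [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]
    # process the chunks concurrently
    with concurrent.futures.ThreadPoolExecutor() as executor:
        partials = list(executor.map(chunk_stats, chunks))
    # aggregation step: combine partial sums and counts into one mean
    total = math.fsum(partial_sum for partial_sum, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


values = [float(i) for i in range(1, 1001)]
print(chunked_mean(values, chunk_size=128))  # 500.5
```

Because each chunk only needs its own data, the per-chunk step can run on different threads, processes, or machines, and only the small partial results have to be combined at the end.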



In [37]:
%%timeit -n 1 -r 1

with (
    dask.distributed.LocalCluster(n_workers=1, threads_per_worker=10) as cluster,
    # ---
    # connect to a Dask cluster
    dask.distributed.Client(cluster) as client,
):
    # compute the result
    print("Result (unoptimized):", client.compute(output, optimize_graph=False).result())

Result (unoptimized): -9.167004488149868e-06
1.55 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [38]:
%%timeit -n 1 -r 1

with (
    dask.distributed.LocalCluster(n_workers=1, threads_per_worker=10) as cluster,
    # ---
    # connect to a Dask cluster
    dask.distributed.Client(cluster) as client,
):
    # compute the result
    print("Result (optimized):", client.compute(output, optimize_graph=True).result())

Result (optimized): -9.167004488149868e-06
1.52 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
