In [1]:
import dask.array as da
import numpy as np
from time import time, sleep

# Cluster Setup

Increasing the number of workers is better to speed up Python tasks that do not need to communicate (not share memory) and are CPU intensive. However, combining the results from different tasks takes time. For Dask, there is no hard limit on scaling, but the task overhead will eventually start to swamp your calculation depending on how long each task takes to compute. 

The Dask scheduler has an overhead of around 200 microseconds per task, so if each task takes 1 second of work then the workers begin to swamp around 5000 workers, but if each task takes only 100ms then your scheduler can only saturate around 500 cores, and so on. Bear in mind that task duration imposes an inversely proportional constraint on scaling. So, if you want to scale even larger then your tasks will need to start doing more work in each task to avoid overhead. Often this involves moving inner for loops within tasks rather than spreading them out to many tasks.

For threads, the standard python implementation uses a Global Interpreter Lock (GIL), which prevent two threads from executing simultaneously in the same program. However, libraries like Numpy bypass this limitation by running external code in C. So, in general more threads per worker are good for a program that spends most of its time in NumPy, SciPy, etc., and fewer threads per worker are better for simpler programs that spend most of their time in the Python interpreter. Furthermore, threads are best for IO tasks or tasks involving external systems because threads can combine their work more efficiently than processes (share memory space and efficiently read and write to the same variables).

Triton (IBM POWER System AC922) has at least 16 cores per processor, so the rule of thumb for threads per Dask worker is to choose the square root of the number of cores per processor. For Triton for example, this would mean that one could assign 4 threads per worker.

In [2]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=16, memory_limit='14GB', processes=True, threads_per_worker=4)

In [3]:
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:46413  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 64  Memory: 224.00 GB


# Monte-Carlo Estimate of $\\pi$

We want to estimate the number $\\pi$ using a Monte-Carlo method (https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)
exploiting that the area of a quarter circle of unit radius is $\pi/4$ and that hence the probability of any randomly chosen point in a unit square to lie in a unit circle centered at a corner of the unit square is $\pi/4$ as well.  So, for $N$ randomly chosen pairs $(x, y)$ with $x \in [0, 1)$ and $y \in [0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimate $\pi \approx 4 \cdot N_{circ} / N$.
    
    

In [4]:
def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):
    """Calculate PI using a Monte Carlo estimate."""
    
    size = int(size_in_bytes / 8)
    chunksize = int(chunksize_in_bytes / 8)
    
    xy = da.random.uniform(0, 1,
                           size=(size / 2, 2),
                           chunks=(chunksize / 2, 2))
    
    in_circle = ((xy ** 2).sum(axis=-1) < 1)
    pi = 4 * in_circle.mean()

    return pi


def print_pi_stats(size, pi, time_delta, num_workers):
    """Print pi, calculate offset from true value, and print stats."""
    print(f"{size / 1e9} GB\n"
          f"\tMC pi: {pi : 13.11f}"
          f"\tErr: {abs(pi - np.pi) : 10.3e}\n"
          f"\tWorkers: {num_workers}"
          f"\t\tTime single loop MC: {time_delta : 7.3f}s")

## Chunksize Testing

Here, a good rule of thumb is to create arrays with a minimum chunksize of at least one million elements (e.g., a 1000x1000 matrix). If your chunks are too small, queueing up operations will be extremely slow, because Dask will translate each operation into a huge number of operations mapped across many chunks. 

Computation on Dask arrays with small chunks can also be slow, because each operation on a chunk has some fixed overhead from the Python interpreter and the Dask task executor. Morevover, with large arrays (10+ GB), the cost of queueing up Dask operations can be noticeable, and you may need to increase to larger chunksizes. So, chunk sizes between 10MB-1GB are common, depending on the availability of RAM (they need to fit) and the duration of computations. A chunk should be small enough to fit comfortably in memory as there will be many chunks in memory at once.

In [5]:
for size in (1e9 * n for n in (10, 100)):
    %timeit pi = calc_pi_mc(size,chunksize_in_bytes=5e6).compute()
    start = time()
    pi = calc_pi_mc(size,chunksize_in_bytes=5e6).compute()
    elaps = time() - start

    print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

4.26 s ± 290 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14147844480	Err:  1.142e-04
	Workers: 16		Time single loop MC:   4.451s
38.9 s ± 574 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14160869248	Err:  1.604e-05
	Workers: 16		Time single loop MC:  47.545s


Noticeable overhead and slow down with chuncksize way too small and as we increase the array size. Let's make a five fold increase in chunksize, and consider larger arrays of TBs,

In [6]:
for size in (1e9 * n for n in (10, 100, 1000, 2500)):
    if (size == 1e9 * 10) or (size == 1e9 * 100):
        %timeit pi = calc_pi_mc(size,chunksize_in_bytes=25e6).compute()
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=25e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))
    else:
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=25e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

1.52 s ± 34.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14162829440	Err:  3.564e-05
	Workers: 16		Time single loop MC:   1.522s
13 s ± 78.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14159653312	Err:  3.880e-06
	Workers: 16		Time single loop MC:  15.399s
1000.0 GB
	MC pi:  3.14158926131	Err:  3.392e-06
	Workers: 16		Time single loop MC:  137.760s
2500.0 GB
	MC pi:  3.14159733786	Err:  4.684e-06
	Workers: 16		Time single loop MC:  342.625s


Considerable reduction in time for 10GB and 100GB. Let's increase it to about the optimal value for larger arrays in the next two estimates

In [7]:
for size in (1e9 * n for n in (10, 100, 1000, 2500)):
    if (size == 1e9 * 10) or (size == 1e9 * 100):
        %timeit pi = calc_pi_mc(size,chunksize_in_bytes=100e6).compute()
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=100e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))
    else:
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=100e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

1.38 s ± 30.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14159518080	Err:  2.527e-06
	Workers: 16		Time single loop MC:   1.330s
10.6 s ± 43.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14159720128	Err:  4.548e-06
	Workers: 16		Time single loop MC:  11.450s
1000.0 GB
	MC pi:  3.14158536102	Err:  7.293e-06
	Workers: 16		Time single loop MC:  105.790s
2500.0 GB
	MC pi:  3.14159773745	Err:  5.084e-06
	Workers: 16		Time single loop MC:  265.119s


In [8]:
for size in (1e9 * n for n in (10, 100, 1000, 2500)):
    if (size == 1e9 * 10) or (size == 1e9 * 100):
        %timeit pi = calc_pi_mc(size,chunksize_in_bytes=200e6).compute()
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=200e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))
    else:
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=200e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

1.46 s ± 88.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14153104640	Err:  6.161e-05
	Workers: 16		Time single loop MC:   1.434s
10.5 s ± 95.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14160082560	Err:  8.172e-06
	Workers: 16		Time single loop MC:  10.652s
1000.0 GB
	MC pi:  3.14160204998	Err:  9.396e-06
	Workers: 16		Time single loop MC:  100.579s
2500.0 GB
	MC pi:  3.14158933402	Err:  3.320e-06
	Workers: 16		Time single loop MC:  249.243s


Notice that in the last two estimates above there is some degradation of computing time for the smaller array and improvement for the larger (TB) ones. Let's overdue the chunksize in the next two estimates, but beware that we need increasingly more RAM to fit the larger chunks (from about minimum 4GB memory limit to 12GB).

In [9]:
for size in (1e9 * n for n in (10, 100, 1000, 2500)):
    if (size == 1e9 * 10) or (size == 1e9 * 100):
        %timeit pi = calc_pi_mc(size,chunksize_in_bytes=500e6).compute()
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=25e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))
    else:
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=500e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

1.96 s ± 110 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14166693120	Err:  7.428e-05
	Workers: 16		Time single loop MC:   1.670s
10.6 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14160421056	Err:  1.156e-05
	Workers: 16		Time single loop MC:  14.065s
1000.0 GB
	MC pi:  3.14158625978	Err:  6.394e-06
	Workers: 16		Time single loop MC:  96.382s
2500.0 GB
	MC pi:  3.14160002726	Err:  7.374e-06
	Workers: 16		Time single loop MC:  239.586s


In [10]:
for size in (1e9 * n for n in (10, 100, 1000, 2500)):
    if (size == 1e9 * 10) or (size == 1e9 * 100):
        %timeit pi = calc_pi_mc(size,chunksize_in_bytes=1000e6).compute()
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=1000e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))
    else:
        start = time()
        pi = calc_pi_mc(size,chunksize_in_bytes=1000e6).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

3.38 s ± 87.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14147066880	Err:  1.220e-04
	Workers: 16		Time single loop MC:   3.278s
11.5 s ± 138 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14157488384	Err:  1.777e-05
	Workers: 16		Time single loop MC:  11.649s
1000.0 GB
	MC pi:  3.14159792595	Err:  5.272e-06
	Workers: 16		Time single loop MC:  95.873s
2500.0 GB
	MC pi:  3.14158702479	Err:  5.629e-06
	Workers: 16		Time single loop MC:  237.575s


If your chunks are too big, some of your computation may be wasted, because Dask only computes results one chunk at a time. Moreover, a chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that Dask scheduling incurs.

Ok, what about automatic chunking? Automatic chunking expands or contracts all dimensions marked with "auto" to try to reach chunk sizes with a number of bytes equal to the config value array.chunk-size, which is set to 128MB by default, but which you can change in your configuration (YAML files in ~/.config/dask/ or /etc/dask/).

In [11]:
# To list your dask conf: 
#import dask
#dask.config.config

In [12]:
def calc_pi_mc_auto(size_in_bytes):
    """Calculate PI using a Monte Carlo estimate."""
    
    size = int(size_in_bytes / 8)
    xy = da.random.uniform(0, 1,
                           size=(size / 2, 2),
                           chunks=("auto", 2))
    
    in_circle = ((xy ** 2).sum(axis=-1) < 1)
    pi = 4 * in_circle.mean()

    return pi

In [13]:
for size in (1e9 * n for n in (10, 100, 1000, 2500)):
    if (size == 1e9 * 10) or (size == 1e9 * 100):
        %timeit pi = calc_pi_mc_auto(size).compute()
        start = time()
        pi = calc_pi_mc_auto(size).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))
    else:
        start = time()
        pi = calc_pi_mc_auto(size).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

1.5 s ± 42.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14171615360	Err:  1.235e-04
	Workers: 16		Time single loop MC:   1.662s
10.6 s ± 88.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14160321216	Err:  1.056e-05
	Workers: 16		Time single loop MC:  10.568s
1000.0 GB
	MC pi:  3.14158107219	Err:  1.158e-05
	Workers: 16		Time single loop MC:  102.963s
2500.0 GB
	MC pi:  3.14159453222	Err:  1.879e-06
	Workers: 16		Time single loop MC:  255.852s


What about no chunk? Let's consider a small 1GB and 5GB arrays (spoiler, takes longer than the 100GB one),

In [14]:
def calc_pi_mc_nochunk(size_in_bytes):
    """Calculate PI using a Monte Carlo estimate."""
    
    size = int(size_in_bytes / 8)
    xy = da.random.uniform(0, 1,
                           size=(size / 2, 2),
                           chunks=(-1, 2))
    
    in_circle = ((xy ** 2).sum(axis=-1) < 1)
    pi = 4 * in_circle.mean()

    return pi

In [15]:
for size in (1e9 * n for n in (1, 5)):
    %timeit pi = calc_pi_mc_nochunk(size).compute()
    start = time()
    pi = calc_pi_mc_nochunk(size).compute()
    elaps = time() - start

    print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

3.11 s ± 46.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.0 GB
	MC pi:  3.14137587200	Err:  2.168e-04
	Workers: 16		Time single loop MC:   3.095s
15 s ± 114 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
5.0 GB
	MC pi:  3.14163559680	Err:  4.294e-05
	Workers: 16		Time single loop MC:  14.984s


# Scaling

Here, we'll scale up the cluster to twice the initial workers and reduce the memory limit to half to keep the total RAM used:

In [16]:
new_num_workers = 2 * len(cluster.workers)

print(f"Scaling from {len(cluster.workers)} to {new_num_workers} workers.")

cluster.scale(new_num_workers)

sleep(10)

Scaling from 16 to 32 workers.


In [17]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:46413  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 32  Cores: 128  Memory: 448.00 GB


In [18]:
for size in (1e9 * n for n in (10, 100, 1000, 2500)):
    if (size == 1e9 * 10) or (size == 1e9 * 100):
        %timeit pi = calc_pi_mc(size).compute()
        start = time()
        pi = calc_pi_mc(size).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))
    else:
        start = time()
        pi = calc_pi_mc(size).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

1.7 s ± 48 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14159820160	Err:  5.548e-06
	Workers: 32		Time single loop MC:   1.610s
9.03 s ± 66.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14161061440	Err:  1.796e-05
	Workers: 32		Time single loop MC:   9.292s
1000.0 GB
	MC pi:  3.14160379200	Err:  1.114e-05
	Workers: 32		Time single loop MC:  86.785s
2500.0 GB
	MC pi:  3.14158614920	Err:  6.504e-06
	Workers: 32		Time single loop MC:  218.076s


What about the adaptive scaling feature which has the ability to add and remove workers as needed?

In [19]:
# Check docstring of distributed.Adaptive for keywords
ca = cluster.adapt(
    minimum=4, maximum=32);

sleep(10)  # Allow for scale-down

In [20]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:46413  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 16  Memory: 56.00 GB


In [22]:
for size in (1e9 * n for n in (10, 100, 1000, 2500)):
    if (size == 1e9 * 10) or (size == 1e9 * 100):
        %timeit pi = calc_pi_mc_auto(size).compute()
        start = time()
        pi = calc_pi_mc_auto(size).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))
    else:
        start = time()
        pi = calc_pi_mc_auto(size).compute()
        elaps = time() - start

        print_pi_stats(size, pi, elaps,
                   num_workers=len(cluster.workers))

1.97 s ± 212 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
10.0 GB
	MC pi:  3.14163765760	Err:  4.500e-05
	Workers: 12		Time single loop MC:   1.726s
9.43 s ± 61.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
100.0 GB
	MC pi:  3.14159765504	Err:  5.001e-06
	Workers: 32		Time single loop MC:  10.075s
1000.0 GB
	MC pi:  3.14160448371	Err:  1.183e-05
	Workers: 32		Time single loop MC:  90.389s
2500.0 GB
	MC pi:  3.14159257869	Err:  7.490e-08
	Workers: 32		Time single loop MC:  231.399s


Here, we also used the auto-chunck. The adaptive scaling specifies which idle workers to release according to:

 - If there are unassigned tasks or any stealable tasks and no idle workers, or if the average memory use is over 50%, then increase the number of workers by a fixed factor (defaults to two).

 - If there are idle workers and the average memory use is below 50% then reclaim the idle workers with the least data on them (after moving data to nearby workers) until we’re near 50%.

## Extra Chunksize Best Practices for Xarray


- Chunks should align with the computation that you want to do. For example, if you plan to frequently slice along a particular dimension, then it’s more efficient if your chunks are aligned so that you have to touch fewer chunks. Moreover, if you want to add two arrays, then its convenient if those arrays have matching chunks patterns.

- Do your spatial and temporal indexing (e.g., .sel() or .isel()) early in the pipeline, especially before calling resample() or groupby().

- Save intermediate results to disk as a netCDF files (using to_netcdf()) and then load them again with open_dataset() for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory.

- Specify smaller chunks across space when using open_mfdataset() (e.g., chunks={'latitude': 10, 'longitude': 10}). This makes spatial subsetting easier, because there’s no risk you will load chunks of data referring to different chunks.

- Using the h5netcdf package by passing engine='h5netcdf' to open_mfdataset() can be quicker than the default engine='netcdf4' that uses the netCDF4 package. Moreover, using the parallel = True argument in open_mfdataset() makes the open and preprocess steps to be performed in parallel using dask.delayed (Default is False).

- The dask diagnostics can be useful in identifying performance bottlenecks.


In [23]:
# close workers and cluster
client.close()
cluster.close()

In [24]:
%conda list --explicit

# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: linux-ppc64le
@EXPLICIT
https://conda.anaconda.org/conda-forge/linux-ppc64le/_libgcc_mutex-0.1-conda_forge.tar.bz2
https://conda.anaconda.org/conda-forge/linux-ppc64le/ca-certificates-2020.4.5.1-hecc5488_0.tar.bz2
https://conda.anaconda.org/conda-forge/linux-ppc64le/ld_impl_linux-ppc64le-2.34-h0f24833_0.tar.bz2
https://conda.anaconda.org/conda-forge/linux-ppc64le/libgfortran-ng-8.2.0-h822a55f_5.tar.bz2
https://conda.anaconda.org/conda-forge/linux-ppc64le/libstdcxx-ng-8.2.0-h822a55f_5.tar.bz2
https://conda.anaconda.org/conda-forge/linux-ppc64le/llvm-openmp-10.0.0-h1bb5118_0.tar.bz2
https://conda.anaconda.org/conda-forge/linux-ppc64le/_openmp_mutex-4.5-1_llvm.tar.bz2
https://conda.anaconda.org/conda-forge/linux-ppc64le/libgcc-ng-8.2.0-hdd5993f_5.tar.bz2
https://conda.anaconda.org/conda-forge/linux-ppc64le/expat-2.2.9-hb209c28_2.tar.bz2
https://conda.anaconda.org/conda-forg

In [25]:
%pip list

Package              Version            
-------------------- -------------------
aiohttp              3.6.2              
async-timeout        3.0.1              
attrs                19.3.0             
backcall             0.1.0              
bleach               3.1.4              
bokeh                1.4.0              
brotlipy             0.7.0              
certifi              2020.4.5.1         
cffi                 1.14.0             
chardet              3.0.4              
click                7.1.1              
cloudpickle          1.3.0              
cryptography         2.8                
cycler               0.10.0             
cytoolz              0.10.1             
dask                 2.15.0             
dask-jobqueue        0.7.1              
dask-labextension    2.0.1              
decorator            4.4.2              
defusedxml           0.6.0              
distributed          2.15.0             
entrypoints          0.3                
HeapDict        