## Dask Scaling Study

We benchmark a suite of dask computations of three types:

1.  General task scheduling
2.  Multi-dimensional arrays
3.  Dataframes

In each case we consider computations of varying complexity, including computations that are embarrassingly parallel, are communication heavy, involve many small communications, or are structured in non-trivial ways.

Additionally we consider tasks that are very fast (microseconds) so as to stress the scheduler early as well as tasks that take a more modest amount of time, usually 100ms each.

We perform the same computations in increasing sizes that scale linearly with the cluster size.  The cluster is composed of 2-core containers on Google Compute Engine, ranging from 1 container to 256 containers (512 cores total).

### Benchmarking infrastructure

In [1]:
import math, time
from dask.distributed import Client, wait



In [2]:
from distributed.client import default_client
import pandas as pd

def run(func, client=None):
    """Time every stage of a benchmark generator and tabulate the results.

    ``func`` is called with ``n``, the sum of the per-worker core counts
    reported by the client, and must return a generator.  Each item it
    yields is a ``(name, unit, numerator)`` tuple announcing the stage whose
    code follows that ``yield``.  A stage is timed as the wall-clock cost of
    advancing the generator to its next ``yield`` (or to exhaustion, for the
    last stage).

    Parameters
    ----------
    func : callable
        Benchmark generator function taking the core count ``n``.
    client : optional
        Client used to restart the cluster and count cores.  Falls back to
        ``default_client()`` when omitted.

    Returns
    -------
    pandas.DataFrame
        One row per stage with the columns ``name``, ``duration`` (seconds),
        ``unit`` (the yielded unit with ``'/s'`` appended), ``rate``
        (``numerator / duration``, NaN when the measured duration is zero),
        ``n`` and ``collection`` (``func.__name__``).  Empty when ``func``
        yields nothing.
    """
    client = client or default_client()
    client.restart()
    n = sum(client.ncores().values())
    coroutine = func(n)

    # Private sentinel so that exhaustion is distinguishable from any value
    # a benchmark could yield.
    exhausted = object()
    out = []

    stage = next(coroutine, exhausted)
    while stage is not exhausted:
        name, unit, numerator = stage

        # perf_counter is monotonic and high resolution, unlike time.time(),
        # so very fast stages still get a usable, non-negative duration.
        start = time.perf_counter()
        stage = next(coroutine, exhausted)
        duration = time.perf_counter() - start

        # A zero-length measurement has no meaningful rate; report NaN
        # instead of raising ZeroDivisionError.
        rate = numerator / duration if duration > 0 else float('nan')
        out.append({'name': name,
                    'duration': duration,
                    'unit': unit + '/s',
                    'rate': rate,
                    'n': n,
                    'collection': func.__name__})
    return pd.DataFrame(out)

### Benchmarks

These functions include a variety of computations for tasks, arrays, and dataframes.

In [3]:
import operator
import time

def slowinc(x, delay=0.1):
    """Sleep for ``delay`` seconds, then return ``x + 1``."""
    time.sleep(delay)
    incremented = x + 1
    return incremented

def slowadd(x, y, delay=0.1):
    """Sleep for ``delay`` seconds, then return ``x + y``."""
    time.sleep(delay)
    total = x + y
    return total

def slowsum(L, delay=0.1):
    """Sleep for ``delay`` seconds, then return ``sum(L)``."""
    time.sleep(delay)
    total = sum(L)
    return total

def inc(x):
    """Return ``x + 1`` immediately (the no-delay counterpart of ``slowinc``)."""
    incremented = x + 1
    return incremented


def tasks(n):
    """Benchmark stages built from futures and ``dask.delayed`` tasks.

    Generator meant to be driven by ``run``: each ``yield`` hands back a
    ``(name, unit, task_count)`` tuple describing the stage implemented by
    the statements that follow it, up to the next ``yield``.  Every stage
    except 'sequential' submits a number of tasks proportional to ``n``.

    The futures-based stages use the module-level ``client`` and ``wait``.

    Parameters
    ----------
    n : int
        Scale factor for the workload sizes (``run`` passes the total core
        count of the cluster).
    """
    # Imported before the first ``yield`` so that import time is never
    # charged to a timed stage.
    from dask import delayed
    from dask.distributed import as_completed

    yield 'task map fast tasks', 'tasks', n * 200

    futures = client.map(inc, range(n * 200))
    wait(futures)

    yield 'task map 100ms tasks', 'tasks', n * 100

    futures = client.map(slowinc, range(100 * n))
    wait(futures)

    yield 'task map 1s tasks', 'tasks', n * 4

    futures = client.map(slowinc, range(4 * n), delay=1)
    wait(futures)

    yield 'tree reduction fast tasks', 'tasks', 2**7 * n

    # Add neighbouring pairs level by level until a single delayed value
    # remains, then compute it.
    L = range(2**7 * n)
    while len(L) > 1:
        L = list(map(delayed(operator.add), L[0::2], L[1::2]))

    L[0].compute()

    yield 'tree reduction 100ms tasks', 'tasks', 2**6 * n * 2

    L = range(2**6 * n)
    while len(L) > 1:
        L = list(map(delayed(slowadd), L[0::2], L[1::2]))

    L[0].compute()

    yield 'sequential', 'tasks', 100

    # A chain of 100 dependent increments: no parallelism is available.
    x = 1

    for i in range(100):
        x = delayed(inc)(x)

    x.compute()

    yield 'dynamic tree reduction fast tasks', 'tasks', 100 * n

    futures = client.map(inc, range(n * 100))

    pool = as_completed(futures)
    batches = pool.batches()

    # Reduce results as they arrive: each batch taken from the pool is summed
    # by a new task whose future is fed back into the pool.  A batch holding
    # a single future is merged with the next one; when no further batch is
    # available, StopIteration ends the loop.
    while True:
        try:
            batch = next(batches)
            if len(batch) == 1:
                batch += next(batches)
        except StopIteration:
            break
        future = client.submit(sum, batch)
        pool.add(future)

    # The reported task count must match the ``n * 20`` futures mapped below
    # (it was previously 100 * n, overstating the rate five-fold).
    yield 'dynamic tree reduction 100ms tasks', 'tasks', 20 * n

    futures = client.map(slowinc, range(n * 20))

    pool = as_completed(futures)
    batches = pool.batches()

    while True:
        try:
            batch = next(batches)
            if len(batch) == 1:
                batch += next(batches)
        except StopIteration:
            break
        future = client.submit(slowsum, batch)
        pool.add(future)

    yield 'nearest neighbor fast tasks', 'tasks', 100 * n * 2

    # Two rounds in which every element is combined with its right-hand
    # neighbour; the second round operates on the futures of the first.
    L = range(100 * n)
    L = client.map(operator.add, L[:-1], L[1:])
    L = client.map(operator.add, L[:-1], L[1:])
    wait(L)

    yield 'nearest neighbor 100ms tasks', 'tasks', 20 * n * 2

    L = range(20 * n)
    L = client.map(slowadd, L[:-1], L[1:])
    L = client.map(slowadd, L[:-1], L[1:])
    wait(L)

In [4]:
def arrays(n):
    """Benchmark stages operating on a square dask array.

    Generator meant to be driven by ``run``.  Each ``yield`` returns a
    ``(name, unit, amount)`` tuple for the stage implemented by the
    statements that follow it.  The array holds random integers, has a side
    of ``int(5000 * sqrt(n))`` elements and is split into 2000 x 2000 chunks.
    """
    import dask.array as da

    side = int(5000 * math.sqrt(n))
    grid = da.random.randint(0, 10000, size=(side, side), chunks=(2000, 2000))

    # Array size in MB; persisting changes neither shape nor dtype, so the
    # figure is the same for every stage that reports it.
    megabytes = grid.nbytes / 1e6

    yield 'create random', 'MB', megabytes

    grid = grid.persist()
    wait(grid)

    yield 'blockwise 100ms tasks', 'MB', megabytes

    result = grid.map_blocks(slowinc, dtype=grid.dtype).persist()
    wait(result)

    yield 'random access', 'bytes', 8

    grid[1234, 4567].compute()

    yield 'reduction', 'MB', megabytes

    grid.std().compute()

    yield 'reduction along axis', 'MB', megabytes

    grid.std(axis=0).compute()

    yield 'elementwise computation', 'MB', megabytes

    result = da.sin(grid) ** 2 + da.cos(grid) ** 2
    result = result.persist()
    wait(result)

    yield 'rechunk small', 'MB', megabytes

    result = grid.rechunk((20000, 200)).persist()
    wait(result)

    yield 'rechunk large', 'MB', megabytes

    # Starts from the output of the previous stage, not from ``grid``.
    result = result.rechunk((200, 20000)).persist()
    wait(result)

    yield 'transpose addition', 'MB', megabytes

    result = grid + grid.T
    result = result.persist()
    wait(result)

    yield 'nearest neighbor fast tasks', 'MB', megabytes

    result = grid.map_overlap(inc, depth=1).persist()
    wait(result)

    yield 'nearest neighbor 100ms tasks', 'MB', megabytes

    result = grid.map_overlap(slowinc, depth=1, delay=0.1).persist()
    wait(result)

In [5]:
def dataframes(n):
    """Benchmark stages operating on a ten-column dask dataframe.

    Generator meant to be driven by ``run``.  Each ``yield`` returns a
    ``(name, unit, amount)`` tuple for the stage implemented by the
    statements that follow it.  The frame is built from a random integer
    dask array with ``2000000 * n`` rows in chunks of one million rows.
    """
    import dask.array as da
    import dask.dataframe as dd

    rows = 2000000 * n
    values = da.random.randint(0, 10000, size=(rows, 10), chunks=(1000000, 10))

    # Size of the backing array in MB, reported by most stages.
    megabytes = values.nbytes / 1e6

    yield 'create random', 'MB', megabytes

    frame = dd.from_dask_array(values).persist()
    wait(frame)

    yield 'blockwise 100ms tasks', 'MB', megabytes

    wait(frame.map_partitions(slowinc, meta=frame).persist())

    yield 'arithmetic', 'MB', megabytes

    shifted = (frame[0] + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10).persist()
    wait(shifted)

    yield 'random access', 'bytes', 8

    frame.loc[123456].compute()

    yield 'dataframe reduction', 'MB', megabytes

    frame.std().compute()

    # Only one of the ten columns is reduced, hence a tenth of the data.
    yield 'series reduction', 'MB', megabytes / 10

    frame[3].std().compute()

    yield 'groupby reduction', 'MB', megabytes

    frame.groupby(0)[1].mean().compute()

    yield 'groupby apply (full shuffle)', 'MB', megabytes

    frame.groupby(0).apply(len).compute()

    yield 'set index (full shuffle)', 'MB', megabytes

    wait(frame.set_index(1).persist())

    yield 'rolling aggregations', 'MB', megabytes

    wait(frame.rolling(5).mean().persist())

### Connect to cluster

We connect to a scheduler on GCE.  We start with a single 2-core worker.

In [6]:
%time
import math, time
from dask.distributed import Client, wait


import dask
from dask.distributed import Client
import os

# The scheduler file is looked up under the directory named by $SCRATCH;
# a KeyError here means that variable is not set.
scheduler_file = os.path.join(os.environ["SCRATCH"], "scheduler_file.json")

# Set the dashboard link template through the public configuration API
# rather than mutating dask.config.config in place.
dask.config.set({
    "distributed.dashboard.link": "{JUPYTERHUB_SERVICE_PREFIX}proxy/{host}:{port}/status",
})

client = Client(scheduler_file=scheduler_file)
client

CPU times: user 1 µs, sys: 1 µs, total: 2 µs
Wall time: 4.77 µs


0,1
Connection method: Scheduler file,Scheduler file: /pscratch/sd/s/sanjeevc/scheduler_file.json
Dashboard: /user/sanjeevc/perlmutter-login-node-base/proxy/10.249.17.7:8787/status,

0,1
Comm: tcp://10.249.17.7:8786,Workers: 0
Dashboard: /user/sanjeevc/perlmutter-login-node-base/proxy/10.249.17.7:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [7]:
# Collects the result DataFrame returned by each ``run`` call; the frames
# are concatenated once all repetitions have finished.
L = []

### Scale cluster and run computations

We use the [dask-kubernetes](https://github.com/martindurant/dask-kubernetes) command line to scale our cluster up and down manually and run the computations above multiple times for each scale.  We save these results to Google Cloud Storage.  The outputs are publicly available here:

-  Raw: https://storage.googleapis.com/dask-data/scaling-data-raw.csv
-  Median: https://storage.googleapis.com/dask-data/scaling-data.csv

In [8]:

# Display the client summary, which reports the current worker and thread counts.
client

0,1
Connection method: Scheduler file,Scheduler file: /pscratch/sd/s/sanjeevc/scheduler_file.json
Dashboard: /user/sanjeevc/perlmutter-login-node-base/proxy/10.249.17.7:8787/status,

0,1
Comm: tcp://10.249.17.7:8786,Workers: 0
Dashboard: /user/sanjeevc/perlmutter-login-node-base/proxy/10.249.17.7:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [9]:
# Display the client summary again to re-check the worker and thread counts.
client

0,1
Connection method: Scheduler file,Scheduler file: /pscratch/sd/s/sanjeevc/scheduler_file.json
Dashboard: /user/sanjeevc/perlmutter-login-node-base/proxy/10.249.17.7:8787/status,

0,1
Comm: tcp://10.249.17.7:8786,Workers: 0
Dashboard: /user/sanjeevc/perlmutter-login-node-base/proxy/10.249.17.7:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [None]:
%%time
# Run the three benchmark suites three times over, printing the repetition
# index and suite name before each run and collecting every result frame in L.
suite = (tasks, arrays, dataframes)
for repeat in range(3):
    for benchmark in suite:
        print(repeat, benchmark.__name__)
        df = run(benchmark, client=client)
        L.append(df)

0 tasks


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
2024-10-08 15:55:38,173 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client


In [11]:
# Stack the per-run result frames into one raw table.  pd.concat raises
# "ValueError: No objects to concatenate" when L is empty, i.e. when no
# benchmark run completed.
ddf = pd.concat(L)

ValueError: No objects to concatenate

In [None]:
# Collapse the repeated runs: take the median of the remaining columns for
# each (collection, name, n, unit) combination.
df = ddf.groupby(['collection', 'name', 'n', 'unit']).median()
df

In [None]:
# Write the median summary table to a local CSV file.
df.to_csv('scaling-data.csv')

In [None]:
#import gcsfs
#gcs = gcsfs.GCSFileSystem(token='cloud')
#gcs.put('scaling-data.csv', 'dask-data/scaling-data.csv')

In [None]:
# Write the raw (un-aggregated) measurements to a local CSV file.
ddf.to_csv('scaling-data-raw.csv')
# The upload stays disabled like the one for scaling-data.csv: `gcs` is only
# defined when the gcsfs lines are enabled, so calling it here would raise
# NameError.
#gcs.put('scaling-data-raw.csv', 'dask-data/scaling-data-raw.csv')