# Dask Basics Example

This notebook walks through some basic Dask commands for running simple (non-coffea) computations on the cluster.

In [1]:
from dask.distributed import Client, progress

This next cell sets up a Condor cluster object, which is where we can make any ClassAd specifications we need to make. We'll also point an environment variable to some necessary Condor configurations.

Near the end of the cell, we ask the cluster to scale adaptively between 1 and 4 workers. Since each worker is configured with `cores=1`, this gives us between 1 and 4 threads at a time when we perform computations.

In [2]:
# this cell starts workers in condor
import os
from dask_jobqueue import HTCondorCluster
from dask import delayed
from dask.distributed import Client, as_completed

os.environ["CONDOR_CONFIG"] = "/etc/condor/condor_config"

cluster = HTCondorCluster(
    cores=1,
    memory='4gb',
    disk='1 GB',
    job_extra_directives={
        "+SingularityImage": '"docker://index.docker.io/coffeateam/coffea-dask:0.7.22-py3.10-g7cbcc"',
        "Requirements": "HasSingularityJobStart",
    }
)
cluster.adapt(minimum=1, maximum=4)


client = Client(cluster)
client

Connection method: Cluster object
Cluster type: dask_jobqueue.HTCondorCluster
Dashboard: proxy/8787/status

Workers: 0
Total threads: 0
Total memory: 0 B

Comm: tcp://144.92.181.248:27997
Dashboard: proxy/8787/status
Started: Just now
Workers: 0
Total threads: 0
Total memory: 0 B


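The above cell should have created a client object, with a button that says "Launch dashboard in JupyterLab". Clicking that button allows you to watch the Condor workers' memory usage and activity. The worker count in the output above is 0 because the summary was printed immediately after the workers were requested.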

Now, we'll define some functions and run them locally.

In [3]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y

In [4]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

CPU times: user 14.7 ms, sys: 92.1 ms, total: 107 ms
Wall time: 741 ms


3

Now, we'll "Dask-ify" these functions using `dask.delayed`. Calling a delayed function does not run it immediately ("eagerly"). Instead, it returns a `Delayed` object that records the call, and the work happens only when we call `compute` on the result. Because a `Client` is active, the computation runs on the cluster we created.

In [5]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

In [6]:
x = inc(1)
y = dec(2)
z = add(x, y)
z

Delayed('add-5de68405-a152-4cbb-ad51-bf9895dfcbe3')

In [7]:
z.compute()

3
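To make the laziness visible, here is a minimal sketch that does not need the Condor cluster. It assumes only that `dask` is installed, uses the decorator form of `dask.delayed`, and runs with the local `"synchronous"` scheduler rather than the cluster. The names `lazy_inc`, `lazy_dec`, `lazy_add`, and `calls` are hypothetical and chosen so they do not overwrite the notebook's own functions.

```python
import dask

calls = []  # hypothetical log of which functions have actually run


@dask.delayed
def lazy_inc(x):
    calls.append("inc")
    return x + 1


@dask.delayed
def lazy_dec(x):
    calls.append("dec")
    return x - 1


@dask.delayed
def lazy_add(x, y):
    calls.append("add")
    return x + y


# Building the graph records the calls but executes none of them.
z_local = lazy_add(lazy_inc(1), lazy_dec(2))
ran_before = list(calls)

# scheduler="synchronous" runs the graph in this process instead of on a cluster.
result = z_local.compute(scheduler="synchronous")

print("ran before compute:", ran_before)
print("result:", result)
print("ran after compute:", sorted(calls))
```

The log stays empty until `compute` is called, which is the same behavior the notebook relies on when it hands the graph to the cluster.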

Now, we'll create a whole list of computations to perform (256 of them). Instead of using `compute`, we'll first call `persist`, since we're working with a large collection of tasks. This triggers computation in the background but keeps the results on the workers until we request them with `compute`, typically once they have been reduced to a smaller value.

In [8]:
zs = []

In [9]:
%%time
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)

CPU times: user 39.8 ms, sys: 16.5 ms, total: 56.4 ms
Wall time: 46.5 ms


In [10]:
zs = dask.persist(*zs)  # trigger computation in the background
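The shape of what `dask.persist` returns can be checked on a small scale without a cluster. The sketch below is an assumption-laden stand-in: it uses a hypothetical `square` task and the local `"synchronous"` scheduler, where `persist` blocks until the work is done instead of running in the background as it does with a distributed client.

```python
import dask


@dask.delayed
def square(x):  # hypothetical stand-in for the notebook's tasks
    return x * x


lazy = [square(i) for i in range(8)]

# Passing several collections returns a tuple with one persisted object each.
persisted = dask.persist(*lazy, scheduler="synchronous")

# The persisted objects are still Delayed, so further lazy work can be built on them.
total = dask.delayed(sum)(list(persisted))
answer = total.compute(scheduler="synchronous")

print(type(persisted).__name__, len(persisted))
print(answer)
```

Only the final, reduced value is pulled back with `compute`, which mirrors the pattern used in the rest of this notebook.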

`zs` is now a tuple of `Delayed` objects whose results are held on the cluster as futures. We can then build new lazy Dask computations on top of them (in the `while` loop here) and call `compute` once we've reduced the calculation to a single number.

First, we'll ask for ten workers to try to get more jobs running at once.

In [14]:
client.cluster.scale(10)  # ask for ten workers (one thread each, since cores=1)

In [15]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        lazy = add(L[i], L[i + 1])  # add neighbors
        new_L.append(lazy)
    L = new_L                       # swap old list for new

dask.compute(L)

([65536],)
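The result can be checked by hand. Each `z` is `inc(i) + dec(inc(i))`, that is `(i + 1) + i = 2i + 1`, and the sum of the first 256 odd numbers is 256² = 65536. The plain-Python sketch below repeats the same neighbor-pair reduction without Dask. The helper name `pairwise_sum` is hypothetical, and like the loop above it assumes the list length is a power of two, since an odd-length level would index past the end.

```python
def pairwise_sum(values):
    """Add neighbors repeatedly until a single value remains.

    Assumes len(values) is a power of two, as in the notebook's loop.
    """
    level = list(values)
    rounds = 0
    while len(level) > 1:
        level = [level[i] + level[i + 1] for i in range(0, len(level), 2)]
        rounds += 1
    return level[0], rounds


# Same values the delayed graph produces: (i + 1) + i for i in 0..255.
values = [(i + 1) + i for i in range(256)]
total, rounds = pairwise_sum(values)

print(total, rounds)  # 65536 8
```

The reduction takes 8 rounds (log2 of 256), and the additions within each round are independent of one another, which is what lets Dask spread them across workers.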

At the end, it's good to shut down the cluster so that the scheduler and workers do not keep running after we are done.

In [17]:
# shut down the scheduler and workers (here, the HTCondorCluster)
client.shutdown()