# Dask in parallel across a Slurm cluster

## Submit CPU jobs

Here we first define the Slurm job profile that we want. Each job will have 4 cores and 20 GB of memory.

In [1]:
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue='tinyq',
    cores=4,
    memory="20 GB",
    job_extra_directives=["-q tinyq"]
)

We can inspect the job script that will be submitted for each job:

In [2]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p tinyq
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=19G
#SBATCH -t 00:30:00
#SBATCH -q tinyq

/home/yzheng/.cache/uv/archive-v0/fi_ii00GnZedQflf_x7Hx/bin/python -m distributed.cli.dask_worker tcp://10.110.81.3:40125 --name dummy-name --nthreads 1 --memory-limit 4.66GiB --nworkers 4 --nanny --death-timeout 60
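
The memory figures in the job script follow from the 20 GB request. Here is a minimal sketch of the arithmetic, assuming that "20 GB" is read as 20 × 10^9 bytes, that the `--mem` value is rounded up to whole GiB, and that the per-worker limit is an even split across the 4 worker processes. The helper names are hypothetical and not part of dask-jobqueue.

```python
import math

GIB = 1024**3  # bytes in one gibibyte


def sbatch_mem_gib(memory_bytes):
    """Whole GiB, rounded up, as it would appear in `--mem=<N>G`."""
    return math.ceil(memory_bytes / GIB)


def per_worker_limit_gib(memory_bytes, n_workers):
    """Even split of the job memory across worker processes, in GiB."""
    return round(memory_bytes / n_workers / GIB, 2)


requested = 20 * 10**9  # "20 GB" interpreted as decimal gigabytes (assumption)

print(sbatch_mem_gib(requested))           # 19   -> matches --mem=19G
print(per_worker_limit_gib(requested, 4))  # 4.66 -> matches --memory-limit 4.66GiB
```

Under these assumptions, both numbers agree with the generated script above.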



Now we connect a client to the cluster. The client is what we use to submit tasks to the cluster's scheduler.

In [3]:
from dask.distributed import Client
client = Client(cluster)

In [4]:
client

Connection method: Cluster object
Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.110.81.3:8787/status

Dashboard: http://10.110.81.3:8787/status
Workers: 0
Total threads: 0
Total memory: 0 B

Comm: tcp://10.110.81.3:40125
Workers: 0
Dashboard: http://10.110.81.3:8787/status
Total threads: 0
Started: Just now
Total memory: 0 B


Notice that for now we have only one job, the interactive session in the interactiveq partition. No Dask worker jobs have been submitted yet.

In [5]:
!squeue -u yzheng

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           8856497 interacti     bash   yzheng  R      12:20      1 d003


We can explicitly request ten jobs:

In [11]:
cluster.scale(jobs=10)  # ask for 10 jobs; a bare scale(10) would request 10 workers instead

In [7]:
# cluster.adapt(maximum_jobs=10)  # or be flexible and let Dask scale up to at most 10 jobs

<distributed.deploy.adaptive.Adaptive at 0x15551dd1b770>

Let's create a simple function and distribute it across machines.

In [17]:
import time
import numpy as np

def long_job(seed):
    rng = np.random.default_rng(seed)
    return rng.random(10)

In [18]:
futures = []
for seed in range(10):
    future = client.submit(long_job, seed)
    futures.append(future)

In [27]:
!squeue -u yzheng

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           8856497 interacti     bash   yzheng  R      14:03      1 d003
           8856572     tinyq dask-wor   yzheng PD       0:00      1 (Priority)
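
When many worker jobs are queued, it can help to summarize the `squeue` output by state. Here is a minimal sketch that parses the text shown above, assuming whitespace-separated columns with no spaces inside any field. The function name `parse_squeue` is hypothetical.

```python
from collections import Counter

# Sample text copied from the squeue output above.
SQUEUE_OUTPUT = """\
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           8856497 interacti     bash   yzheng  R      14:03      1 d003
           8856572     tinyq dask-wor   yzheng PD       0:00      1 (Priority)
"""


def parse_squeue(text):
    """Turn squeue's default table into a list of dicts keyed by column name."""
    lines = [line for line in text.splitlines() if line.strip()]
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


jobs = parse_squeue(SQUEUE_OUTPUT)
states = Counter(job["ST"] for job in jobs)  # R = running, PD = pending
print(states)
```

Here the Dask worker job is still pending (`PD`), so tasks submitted to the client wait until Slurm starts it.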


In [24]:
futures[0]

To get the result back, call `result()`, which blocks until the task has finished:

In [30]:
futures[0].result()

array([0.63696169, 0.26978671, 0.04097352, 0.01652764, 0.81327024,
       0.91275558, 0.60663578, 0.72949656, 0.54362499, 0.93507242])

Now the status of the future is finished:

In [31]:
futures[0]
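
Dask futures follow the interface of Python's standard `concurrent.futures`, so the submit/result pattern can be rehearsed locally without a cluster. The sketch below is a stand-in only: it swaps the Dask client for a `ThreadPoolExecutor` and the NumPy generator for the standard `random` module, so the values differ from the ones above.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def long_job(seed):
    # Stand-in for the NumPy-based long_job above: 10 seeded random floats.
    rng = random.Random(seed)
    return [rng.random() for _ in range(10)]


# The executor plays the role of the Dask client in this local dry run.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(long_job, seed) for seed in range(10)]
    # result() blocks until the corresponding task has finished.
    results = [future.result() for future in futures]

all_done = all(future.done() for future in futures)
print(len(results), all_done)
```

Because each task is seeded, rerunning a task with the same seed reproduces the same numbers, whether it runs locally or on a worker.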

In [32]:
client.shutdown()

## Submit GPU jobs

In [1]:
from dask_jobqueue import SLURMCluster

gpu_cluster = SLURMCluster(
    queue='gpu',
    cores=8,
    memory="20 GB",
    job_extra_directives=["-q gpu", "--gres=gpu:h100_1g.12gb:1"]
)

In [2]:
from dask.distributed import Client

client = Client(gpu_cluster)

In [3]:
client

Connection method: Cluster object
Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.110.81.3:8787/status

Dashboard: http://10.110.81.3:8787/status
Workers: 0
Total threads: 0
Total memory: 0 B

Comm: tcp://10.110.81.3:41483
Workers: 0
Dashboard: http://10.110.81.3:8787/status
Total threads: 0
Started: Just now
Total memory: 0 B


In [8]:
gpu_cluster.scale(jobs=1)  # request one GPU job

In [9]:
import subprocess

def inspect_gpu():
    return subprocess.run("nvidia-smi", shell=True, capture_output=True)

If we run the command locally, it fails: this node has no NVIDIA GPU, so `nvidia-smi` is not available.

In [10]:
print(inspect_gpu().stderr.decode('utf-8'))

/bin/sh: nvidia-smi: command not found
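
Returning the raw `CompletedProcess` means decoding bytes on the client each time. As an alternative, here is a minimal sketch of a variant that returns plain strings and reports a missing command explicitly. The function name `inspect_gpu_text` and the shape of the returned dict are hypothetical choices, not part of the original notebook.

```python
import shutil
import subprocess


def inspect_gpu_text(command="nvidia-smi"):
    """Run `command` and return plain strings instead of a CompletedProcess."""
    if shutil.which(command) is None:
        # The executable is not on PATH on this machine.
        return {"found": False, "returncode": None,
                "output": f"{command}: command not found"}
    proc = subprocess.run([command], capture_output=True, text=True)
    # Prefer stdout; fall back to stderr if the command printed only errors.
    return {"found": True, "returncode": proc.returncode,
            "output": proc.stdout or proc.stderr}


report = inspect_gpu_text()
print(report["output"])
```

The function can be passed to `client.submit` in the same way as `inspect_gpu`, since it only depends on the standard library.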



Let's submit the function to the GPU node:

In [11]:
future = client.submit(inspect_gpu)

The output shows that we got exactly what we requested, an H100 GPU:

In [13]:
print(future.result().stdout.decode('utf-8'))

Tue Mar 18 19:07:24 2025       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.216.01             Driver Version: 535.216.01   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  NVIDIA H100 NVL                On  | 00000000:CA:00.0 Off |                   On |
| N/A   32C    P0              62W / 400W |     76MiB / 95830MiB |     N/A      Default |
|                                         |                      |              Enabled |
+-----------------------------------------+----------------------+----------------------+

+------------------------------------------------------------------

Next, let's run a ResNet:

In [15]:
import timm
import torch

def run_model():
    img = torch.randn(1, 3, 256, 256)
    img = img.to('cuda')
    model = timm.create_model("resnet50")
    model.to('cuda')
    return model(img).detach().cpu().numpy()

This will fail without a GPU:

In [16]:
run_model()

RuntimeError: Found no NVIDIA driver on your system. Please check that you have an NVIDIA GPU and installed a driver from http://www.nvidia.com/Download/index.aspx

Submitting it to a GPU node works:

In [18]:
future = client.submit(run_model)
future.result().shape

(1, 1000)