Examples demonstrating the use of Dask with xarray and pandas
- Use NPL2024a kernel on NCAR Jupyterhub

In [1]:
# Libraries for reading and working with multidimensional arrays
import numpy as np
import xarray as xr
import datetime
import pandas as pd
# dask
from dask_jobqueue import PBSCluster
import dask.dataframe as dd
# Libraries to assist with animation and visualisations
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML
# other
import os, glob
from pathlib import Path
import time
import datetime



# Set up the dask cluster

In [2]:
# connect to dask distributed cluster
n_jobs = 8 # specify number of jobs
# !!!IMPORTANT!!! -> you need to change the account parameter below to an account you belong to
cluster = PBSCluster(account='UPSU0052', 
                     queue='casper', 
                     memory='16GB', 
                     cores=1, 
                     processes=1, 
                     walltime='02:00:00')
cluster.scale(jobs=n_jobs) # scale up by n_jobs
client = cluster.get_client()
print(cluster.job_script()) # show the submitted job script

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q casper
#PBS -A UPSU0052
#PBS -l select=1:ncpus=1:mem=15GB
#PBS -l walltime=02:00:00

/glade/u/apps/opt/conda/envs/npl-2024a/bin/python -m distributed.cli.dask_worker tcp://128.117.208.112:42035 --nthreads 1 --memory-limit 14.90GiB --name dummy-name --nanny --death-timeout 60



In [3]:
# check that dask-workers are in queue or running (may take a couple minutes)
!qstat -u joko # change to your usename

                                                            Req'd  Req'd   Elap
Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
1595182.casper* joko     htc      STDIN       59457   1   1   16gb 12:00 R 00:39


In [4]:
# launch client dashboard to visualize dask diagnostics 
# click on "Launch dashboard in JupyterLab" 
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/joko/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/joko/proxy/8787/status,Workers: 1
Total threads: 1,Total memory: 14.90 GiB

0,1
Comm: tcp://128.117.208.112:42035,Workers: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/joko/proxy/8787/status,Total threads: 1
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://128.117.208.75:37993,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/joko/proxy/33997/status,Memory: 14.90 GiB
Nanny: tcp://128.117.208.75:43487,
Local directory: /glade/derecho/scratch/joko/tmp/dask-scratch-space/worker-y9wj258k,Local directory: /glade/derecho/scratch/joko/tmp/dask-scratch-space/worker-y9wj258k


# Xarray example
Load a collection of netcdf CM1 output files and do basic calculations

In [None]:
# Load bunch of netcdf files as single dataset
data_folder = '/glade/derecho/scratch/joko/cm1-iceball-nucleation-exp/200' # sample experiment: CM1 simulation with bulk microphysics
save_folder = '/glade/work/joko/nucleation-test/output' # where to save modified dataset (if you want)
Path(save_folder).mkdir(parents=True, exist_ok=True) # make folder if it doesn't exist
bulk_vars = ['qi2', 'qi4'] # subset of variables to look at 
file_path = f'{data_folder}/cm1out_0*.nc' # lets us select all nc files with cm1out- prefix in data_folder
ds = xr.open_mfdataset(file_path, combine='nested', concat_dim=['time'], preprocess=lambda ds: ds[bulk_vars]) # open collection netcdf files

If you look at the data variables in the ds, you will see that the data is chunked automatically by dask under the hood. You can see the visualization of the chunk by pressing the 'Show/Hide data repr' (three little stacked disks symbol) next to a data variable (e.g., qi2). These chunks are sent to the N workers you spawned earlier and they are processed in parallel when you execute some calculations.

In [None]:
ds

In [None]:
# define some calculations (e.g., calculate the mean, sum, etc.)
ds_mean = ds.mean(dim=['xh', 'yh']) # calculate mean vertical profile
ds_mean = ds_mean.assign(qi = ds_mean['qi2'] + ds_mean['qi4']) # sum/assign total mass mixing ratio

In [None]:
ds_mean

At this point, the calculations haven't actually been executed. You will need to explicitly call compute() for dask to execute the computations across the distibuted cluster.

In [None]:
# execute the calculations
ds_mean = ds_mean.compute()

In [None]:
# look at the new dataset (now you will see numeric values in the dataset)
ds_mean

In [None]:
# Example viz: create animation of vertical profile changing over the simulation time period

# initialize plot
fig, ax = plt.subplots(figsize=(3, 4))
line, = ax.plot(ds_mean.qi[0, :], ds_mean.zh)
x_max, y_max = 9e-5, 14
ax.set_xlabel('qi [kg/kg]', fontsize=12)
ax.set_ylabel('z [km]', fontsize=12)
ax.set_ylim(0, y_max)
ax.set_xlim(0, x_max)
fig.set_tight_layout(True)
time_array = ds_mean.time.values

# create function that will be called by animation object
frames = 37
def animate(i):
    line.set_xdata(ds_mean.qi[i, :])
    timestamp = time_array[i]
    timestamp_seconds = timestamp / np.timedelta64(1, 's')
    timestamp_str = str(datetime.timedelta(seconds = timestamp_seconds))
    title = f'qi, mass mixing ratio: {timestamp_str} [H:MM:SS]'
    ax.set_title(title, fontsize=8)
    return line,
    
# create animation object & save
ani = animation.FuncAnimation(fig, animate, frames, interval=150)

# # if you want to save the figure
# save_folder = '/glade/u/home/joko/cm1/analysis/bulk_sdm_comparison/output/figures' # folder to save
# save_filename = 'bulk_old_irho6_qi_z_profile_ts.mp4' # name of file to save
# save_path = os.path.join(save_folder, save_filename)
# ani.save(save_path)

# display w/ Javascript
HTML(ani.to_jshtml())

# Pandas example
Load a collection of CM1 SDM trajectory files and do basic calculations

In [None]:
# Read 10 files from single timestep (05400) 
data_folder = '/glade/derecho/scratch/klamb/superdroplets/outsdm_iceball_nowind_rhod_dist_min200_time_var_sgs_1024_poly_trj/SDM_trajs'
file_path = f'{data_folder}/SD_output_ASCII_05400*' # files with 05400 timestamp prefix
list_paths = glob.glob(file_path)
list_paths.sort()
n_files = 10 # just use first 10 files
colnames =['x[m]', 'y[m]', 'z[m]', 'vz[m]','radius(droplet)[m]',
           'mass_of_aerosol_in_droplet/ice(1:01)[g]', 'radius_eq(ice)[m]',
           'radius_pol(ice)[m]', 'density(droplet/ice)[kg/m3]', 'rhod [kg/m3]',
           'multiplicity[-]','status[-]','index','rime_mass[kg]',
           'num_of_monomers[-]','rk_deact']
# df = dd.read_csv(list_paths[:n_files])
df = dd.read_csv(list_paths[:n_files], sep = '\s+', skiprows=1, 
                 header=None, delim_whitespace=False, 
                 names=colnames).set_index('rk_deact')
df

Dask has paritioned the dataset into 10 parts - one for each file.

In [None]:
df.visualize()

first basic computation: len() will trigger a load + computation

In [None]:
# load and count number of rows 
len(df)

Do a more non-trivial computation

In [None]:
df.columns

In [None]:
# find max deposition density in df
rhod_max = df['rhod [kg/m3]'].mean()
rhod_max.compute()

In [None]:
# calculate mass [kg] of each particle 
vol = (4/3) * np.pi * (df['radius_eq(ice)[m]'])**3
mass = vol * df['density(droplet/ice)[kg/m3]']
mass = mass.compute()
mass[:10] # print first values

# Simple speed test: dask vs. regular pandas

In this example, we compare dask vs. regular (serial) pandas for a simple mean calculation on a column. Here we show that dask may not always be beneficial to use. Generally, if the data is small and fits easily on memory, regular pandas may actually be faster! We demonstrate this here by doing the same calculation, except in Case (1) we use 10 files and in Case (2) we use 576 files. 

## Case (1): 10 files

In [12]:
# Get list of files from single timestep (05400) --> should be 576 files in total
data_folder = '/glade/derecho/scratch/klamb/superdroplets/outsdm_iceball_nowind_rhod_dist_min200_time_var_sgs_1024_poly_trj/SDM_trajs'
file_path = f'{data_folder}/SD_output_ASCII_05400*' # files with 05400 timestamp prefix
list_paths = glob.glob(file_path)
list_paths.sort()
print(len(list_paths))
n_files = 10
colnames =['x[m]', 'y[m]', 'z[m]', 'vz[m]','radius(droplet)[m]',
           'mass_of_aerosol_in_droplet/ice(1:01)[g]', 'radius_eq(ice)[m]',
           'radius_pol(ice)[m]', 'density(droplet/ice)[kg/m3]', 'rhod [kg/m3]',
           'multiplicity[-]','status[-]','index','rime_mass[kg]',
           'num_of_monomers[-]','rk_deact']

576


(a) Using Dask

In [13]:
%%time

df = dd.read_csv(list_paths[:n_files], sep = '\s+', skiprows=1, 
                 header=None, delim_whitespace=False, 
                 names=colnames).set_index('rk_deact')
# find max deposition density in df
rhod_max = df['rhod [kg/m3]'].mean()
rhod_max = rhod_max.compute()
print(f'rhod_max = {rhod_max}')

rhod_max = 326.9066903097588
CPU times: user 259 ms, sys: 24.7 ms, total: 283 ms
Wall time: 724 ms


(b) Serially using pandas

In [14]:
%%time
data = [] # list of df's
for f in list_paths[:n_files]:
    df = pd.read_csv(f, sep = '\s+', skiprows=1, 
                 header=None, delim_whitespace=False, 
                 names=colnames).set_index('rk_deact')
    data.append(df)
df = pd.concat(data)
# find max deposition density in df
rhod_max = df['rhod [kg/m3]'].mean()
print(f'rhod_max = {rhod_max}')

rhod_max = 326.9066903097588
CPU times: user 131 ms, sys: 9.02 ms, total: 140 ms
Wall time: 211 ms


## Case (2): 576 files

In [15]:
# Get list of files from single timestep (05400) --> should be 576 files in total
data_folder = '/glade/derecho/scratch/klamb/superdroplets/outsdm_iceball_nowind_rhod_dist_min200_time_var_sgs_1024_poly_trj/SDM_trajs'
file_path = f'{data_folder}/SD_output_ASCII_05400*' # files with 05400 timestamp prefix
list_paths = glob.glob(file_path)
list_paths.sort()
print(len(list_paths))
n_files = len(list_paths)
colnames =['x[m]', 'y[m]', 'z[m]', 'vz[m]','radius(droplet)[m]',
           'mass_of_aerosol_in_droplet/ice(1:01)[g]', 'radius_eq(ice)[m]',
           'radius_pol(ice)[m]', 'density(droplet/ice)[kg/m3]', 'rhod [kg/m3]',
           'multiplicity[-]','status[-]','index','rime_mass[kg]',
           'num_of_monomers[-]','rk_deact']

576


(a) Using Dask

In [17]:
%%time
n_files = len(list_paths)
colnames =['x[m]', 'y[m]', 'z[m]', 'vz[m]','radius(droplet)[m]',
           'mass_of_aerosol_in_droplet/ice(1:01)[g]', 'radius_eq(ice)[m]',
           'radius_pol(ice)[m]', 'density(droplet/ice)[kg/m3]', 'rhod [kg/m3]',
           'multiplicity[-]','status[-]','index','rime_mass[kg]',
           'num_of_monomers[-]','rk_deact']
df = dd.read_csv(list_paths[:n_files], sep = '\s+', skiprows=1, 
                 header=None, delim_whitespace=False, 
                 names=colnames).set_index('rk_deact')
# find max deposition density in df
rhod_max = df['rhod [kg/m3]'].mean()
rhod_max = rhod_max.compute()
print(f'rhod_max = {rhod_max}')

rhod_max = 324.2411671882189
CPU times: user 6.61 s, sys: 363 ms, total: 6.97 s
Wall time: 12.1 s


(b) Serial using Pandas

In [18]:
%%time
data = [] # list of df's
for f in list_paths[:n_files]:
    df = pd.read_csv(f, sep = '\s+', skiprows=1, 
                 header=None, delim_whitespace=False, 
                 names=colnames).set_index('rk_deact')
    data.append(df)
df = pd.concat(data)
# find max deposition density in df
rhod_max = df['rhod [kg/m3]'].mean()
print(f'rhod_max = {rhod_max}')

rhod_max = 324.2411671882188
CPU times: user 7.39 s, sys: 4.72 s, total: 12.1 s
Wall time: 38.9 s


Summary: (values may vary by machine and # dask workers)
| | Dask Pandas | Serial Pandas |
| --- | --- | --- |
| n_files=10 | 724 ms | 211 ms |
| n_files=576 | 12.1 s | 38.9 s |

The takeaway: Use Dask with caution! You will generally only see the scaling advantage of distributed computing as the data gets larger. I.e. if the data is small and fits on memory, it's probably best to use regular pandas or xarray.