In [1]:
from dask_jobqueue import SLURMCluster
from distributed import Client, LocalCluster
import uproot
from joblib import parallel_backend, Parallel, delayed

In [2]:
# Start a local Dask cluster with two workers, one thread each.
cluster = LocalCluster(n_workers=2, threads_per_worker=1)

In [3]:
# Request two workers. The cluster was already created with n_workers=2,
# so this asks for the same worker count it started with.
cluster.scale(2)

In [4]:
# Connect a distributed Client to the cluster created above.
client = Client(cluster)

In [7]:
# Display the client summary (scheduler address, dashboard link, workers).
client

0,1
Client  Scheduler: tcp://127.0.0.1:38501  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 134.92 GB


In [9]:
# Define function which is fed to each joblib/Dask worker

def read_branches(file_name, tree_name, branches_query):
    """Read the branches of one tree whose names match a filter.

    Intended to be run as a single joblib/Dask task, so it opens the file
    itself instead of receiving an already-open handle.

    Parameters
    ----------
    file_name
        Path of the file, passed unchanged to ``uproot.open``.
    tree_name
        Key of the tree inside the opened file.
    branches_query
        Branch-name filter, forwarded as ``filter_name`` to ``tree.arrays``.

    Returns
    -------
    The result of ``tree.arrays(filter_name=branches_query)``.
    """
    # Opening the file does NOT read the branches (yet). The context manager
    # closes the file once the arrays have been read, so long-lived workers do
    # not accumulate open file handles.
    # NOTE(review): relies on ``arrays`` returning fully materialised data
    # before the file is closed -- confirm for the uproot version in use.
    with uproot.open(file_name) as file:
        tree = file[tree_name]
        # This reads the branches
        return tree.arrays(filter_name=branches_query)

In [49]:
%%timeit
# Code incapsulated in parallel_backend is scheduled and waited for
# As can be seen in the dashboard, this is correctly spread over the cluster

file_name = 'flashgg_investigate/test_nanoaod_99_Skim.root'
tree_name = 'Events'
queries = ['Electron*', 'Photon*']
    
with parallel_backend('dask', scheduler_host=cluster):
    electrons, photons = Parallel()(delayed(read_branches)(file_name, tree_name, q) for q in queries)

1.68 s ± 106 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [50]:
%%timeit

file = uproot.open(file_name)
tree = file[tree_name]
branches = tree.arrays(filter_name=queries)

2.19 s ± 399 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
