# Distribute computation of RDataFrames in a SLURM cluster

In this part, we work through a more realistic and useful example. \
Given a ```TTree``` with 150 branches stored across 500 ROOT files, we want to fill a histogram for every branch, spreading the computation over the nodes of a SLURM cluster. \
Workflow:
* set up a Dask scheduler with 10 workers, each one using 4 cores and 10 GB of RAM
* submit the function ```get_results``` to the scheduler once for each ROOT file
* merge the results obtained from each ROOT file by using ```dask.distributed.as_completed``` and ```ROOT.TH1.Add```

We will also follow the execution of the submitted tasks on the Dask Dashboard.

N.B.: the same result could also be achieved by feeding a single ```RDataFrame``` with a ```TChain``` containing all 500 files.

In [None]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, as_completed
import ROOT

In [None]:
def get_results(tree_name, root_file, n_threads=0):
    """Fill one histogram per column of a TTree stored in a single ROOT file.

    This function is submitted to the Dask workers, so ROOT is imported
    inside it rather than relying on the notebook's global import.

    Parameters
    ----------
    tree_name : str
        Path of the TTree inside ``root_file``.
    root_file : str
        Path of the ROOT file to read.
    n_threads : int, optional
        Size of ROOT's implicit-multithreading pool. ``0`` (the default, same
        as calling ``EnableImplicitMT()`` with no argument) lets ROOT choose.
        Pass the worker's core share to avoid oversubscribing the cores that
        SLURM allocated. NOTE(review): ROOT ignores the call when IMT is
        already enabled in the process, so presumably only the first task
        run by a worker process decides the pool size — confirm for the
        ROOT version in use.

    Returns
    -------
    list
        The filled histograms, in the order given by
        ``RDataFrame.GetColumnNames()``.
    """
    import ROOT

    ROOT.EnableImplicitMT(n_threads)
    rdf = ROOT.RDataFrame(tree_name, root_file)
    # Book every histogram before asking for any value, so a single event
    # loop over the file fills all of them.
    ptrs = [rdf.Histo1D(name) for name in rdf.GetColumnNames()]
    return [ptr.GetValue() for ptr in ptrs]

In [None]:
# Just some variables to be able to conveniently loop over the file names

base_dir = '/work/gallim/root_files/tnp_original/19102020_data_UL17/'
# Placeholders: run letter, dataset-version suffix, file index.
file_tmplt = 'output_SingleElectron_alesauva-UL2017-10_6_4-v0-Run2017{}-09Aug2019_UL2017{}_USER_{}.root'
tree_name = 'tagAndProbeDumper/trees/Data_13TeV_All'
# (run letter, dataset-version suffix) pairs that fill the first two
# placeholders of file_tmplt.
runs_id = [
    ('B', '-v1-8940b7b9416f1cbf6fbb86981f4883ea'),
    ('C', '-v1-c086301171e46d9c80ca640d553ab2cd'),
    ('D', '-v1-c086301171e46d9c80ca640d553ab2cd'),
    ('E', '-v1-c086301171e46d9c80ca640d553ab2cd'),
    ('F', '_rsb-v2-c086301171e46d9c80ca640d553ab2cd'),
]
n_files = 50  # all would be 500

In [None]:
# One Dask worker process per SLURM job, owning all 4 cores and 10 GB.
# Without processes=1, dask-jobqueue splits each job into several smaller
# worker processes by default, and scale(10) would no longer mean
# "10 workers with 4 cores and 10 GB each".
cluster = SLURMCluster(cores=4, processes=1, memory='10G', log_directory='slurm_logs')
client = Client(cluster)
# With one worker per job, asking for 10 workers submits 10 SLURM jobs.
cluster.scale(10)

In [None]:
# Display the client as the cell's output. Its rich repr is expected to show
# the scheduler/worker summary and the Dask Dashboard link — confirm for the
# installed dask version.
client

In [None]:
%%time

import os.path

futures = {}
for ri in runs_id:
    for n in range(n_files):
        file_name = file_tmplt.format(ri[0], ri[1], n)
        if os.path.exists(base_dir + file_name):
            futures[file_name] = client.submit(get_results, tree_name, base_dir + file_name)

final_histos = []
for future_histos, histos in as_completed(futures.values(), with_results=True):
    if not final_histos:
        for histo in histos:
            final_histos.append(histo)
    else:
        for final_histo, histo in zip(final_histos, histos):
            final_histo.Add(histo)
    del future_histos

In [None]:
# Shut down the Dask client before the SLURM cluster it is connected to,
# then unbind both names so the objects can be garbage-collected.
client.close()
cluster.close()
del client, cluster

### Displaying the histograms (ROOT Jupyter trick)

In [None]:
def _draw_in_own_canvas(hist):
    """Draw ``hist`` on a new canvas named after it and return that canvas."""
    # The freshly built canvas is the current pad, so Draw() targets it.
    new_canvas = ROOT.TCanvas(hist.GetName(), hist.GetName())
    hist.Draw()
    return new_canvas


# Keep a reference to every canvas so they stay alive until displayed below.
canvases = [_draw_in_own_canvas(hist) for hist in final_histos]

# Drawing each canvas renders it in the notebook output.
for pad in canvases:
    pad.Draw()