# ROOT training: advanced RDataFrame topics

## Structuring RDF code

In [1]:
import ROOT

# Group RDF transformations together in functions
def apply_filters(df):
    """Apply the shared selection cuts to ``df`` and return the resulting node.

    The cuts are chained in this order: first ``b < 0.5``, then ``a > 0``.
    ``df`` only needs to expose a ``Filter`` method that returns another
    filterable node.
    """
    after_b_cut = df.Filter("b < 0.5")
    return after_b_cut.Filter("a > 0")

# Open the example dataset and pass it straight through the shared selection
df = apply_filters(ROOT.RDataFrame("dataset", "data/example_file.root"))

Welcome to JupyROOT 6.27/01


In [2]:
%%cpp

// Implement small C++ helper functions to encapsulate more complex computations
/// Return the entries of `muon_pt` whose corresponding `muon_eta` entry satisfies |eta| < 2.5.
ROOT::RVecD select_pt(const ROOT::RVecD &muon_pt, const ROOT::RVecD &muon_eta) {
    // Element-wise comparison, used below as a selection mask on muon_pt
    const auto eta_mask = abs(muon_eta) < 2.5;
    return muon_pt[eta_mask];
}

In [3]:
# We can then call the C++ helper functions from Define, Filter, etc. as usual:
df.Define(
    "v3",                     # name given to the new column
    "select_pt(vec1, vec2)",  # expression string that calls the C++ helper
);  # the result is not stored; the trailing ';' keeps the notebook from printing it

## Running multiple concurrent event loops with `RunGraphs`

In [4]:
# Switch on implicit multi-threading before building the dataframes below
ROOT.EnableImplicitMT()

# Imagine these are 2 very different input datasets, with different transformations applied to them
df = ROOT.RDataFrame("dataset", "data/example_file.root")
df2 = ROOT.RDataFrame("dataset", "data/example_file.root")

# Book one histogram on each dataframe
h1 = df.Histo1D("a")
h2 = df2.Histo1D("b")

# If we just accessed the results, the event loops of the separate RDataFrames
# would run one after the other:
# h1.Draw() # first event loop would run here
# h2.Draw() # second event loop would run here

# RunGraphs instead triggers all event loops required to produce the results passed as argument
ROOT.RDF.RunGraphs([h1, h2])


## Expressing systematic variations

Use `Vary` to attach varied values to one or more existing columns.

Varied columns can be used in `Define`s, `Filter`s, as histogram values or weights, and anywhere else a column is accepted.<br>
Variations automatically propagate to selections, derived quantities and results.

Finally, you can retrieve the varied versions of a result with `ROOT.RDF.Experimental.VariationsFor(nominal_result)`.

In [5]:
# Attach a "down" and "up" variation to column `a`
df.Vary(
    "a",                           # column that receives the variations
    "ROOT::RVecD{a*0.9, a*1.1}",   # expression yielding the varied values
    variationTags=["down", "up"],  # tags naming the variations
);  # the result is not stored; the trailing ';' keeps the notebook from printing it

## Vary: usage example

In [6]:
# Book the nominal histogram; the variations of `a` are attached at the start of the chain
nominal_h = (
    df.Vary("a", "ROOT::RVecD{a*0.9, a*1.1}", variationTags=["down", "up"])
    .Filter("b < 0.5 && a > 0.1")
    .Define("loga", "log(a)")
    .Histo1D("loga")
)
# Ask for the nominal result together with its varied versions
all_hs = ROOT.RDF.Experimental.VariationsFor(nominal_h)
print("Keys: ", all_hs.GetKeys())

Keys:  { "nominal", "a:down", "a:up" }


In [7]:
%jsroot on
c = ROOT.TCanvas()
line_colors = (ROOT.kBlue, ROOT.kRed, ROOT.kGreen)
# Overlay one histogram per key, pairing keys and colors in order
for k, color in zip(all_hs.GetKeys(), line_colors):
    hist = all_hs[k]
    hist.Draw("HIST SAME")
    hist.SetLineColor(color)
c.Draw()

## Distributed execution of RDataFrame applications

Rather than executing RDataFrame event loops on local cores, you can request that computations are scheduled using Dask or Spark. For example:

In [8]:
import ROOT
from dask.distributed import LocalCluster, Client

# Point RDataFrame calls to the Dask-based execution engine
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame

# Create a local dask cluster (this could also be an HTCondorCluster, SLURMCluster, SSHCluster, ...)
# and a client connected to it. Both are used as context managers so that the client
# connection and the worker processes are shut down when the block exits, instead of
# being left running for the lifetime of the kernel.
with LocalCluster(n_workers=4, threads_per_worker=1, processes=True) as cluster:
    with Client(cluster) as client:
        # Construct the RDataFrame passing the special `daskclient` argument
        dist_df = RDataFrame("dataset", "data/example_file.root", daskclient=client)

        # The rest of the interface stays the same
        h = dist_df.Filter("a > 10").Histo1D(("name", "title", 10, 0, 10), "a")

        # Access the result while the cluster is still up: this is the point
        # where the value of `h` is actually needed
        print(h.GetEntries())

491.0


2022-09-12 08:54:28,509 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-b0t691pb', purging
2022-09-12 08:54:28,509 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-6a2zfduq', purging
2022-09-12 08:54:28,509 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-_losz42k', purging
2022-09-12 08:54:28,509 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-t7idgf0z', purging
