# CMS Open Data $t\bar{t}$ with ROOT RDataFrame

This notebook demonstrates a physics analysis of the $t\bar{t}$ cross-section measurement with [2015 CMS Open Data](https://cms.cern/news/first-cms-open-data-lhc-run-2-released).

It is developed using ROOT RDataFrame as the analysis interface. It implements the equivalent version of the [v1.0.0](https://agc.readthedocs.io/en/latest/versionsdescription.html) of the reference implementation, including showing how to specify the input dataset, the various steps involved in processing data and the output histograms. 

# Environment setup

In [None]:
import os
from pathlib import Path
from time import time
from typing import Optional

import ROOT
from distributed import Client, LocalCluster, SSHCluster, get_worker
from plotting import save_plots
from utils import (
    AGCInput,
    AGCResult,
    postprocess_results,
    retrieve_inputs,
    save_histos,
)

## Analysis configuration

The next cell contains the parameters that can be tuned to configure various aspects of the notebook execution. Changing the default value of any variable in the cell modifies the corresponding option. The options are described in the following table:

| Setting                | Meaning                                                                                                                                                                                                                                                                                                 |
|------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| DATA_CACHE             | Use the specified directory as a local data cache: required input datasets will be downloaded here and the analysis will read this local copy of the data.                                                                                                                                              |
| HOSTS                  | A comma-separated list of worker node hostnames. Only required if --scheduler=dask-ssh, ignored otherwise.                                                                                                                                                                                              |
| NCORES                 | Number of cores to use. In case of distributed execution this is the amount of cores per node.                                                                                                                                                                                                          |
| N_MAX_FILES_PER_SAMPLE | How many files per sample will be processed. If left unchanged, all files for all samples                                                                                                                                                                                                                       |
| NPARTITIONS            | Number of data partitions. Only used in case of distributed execution. By default it follows RDataFrame defaults.                                                                                                                                                                                       |
| OUTPUT                 | Name of the file where analysis results will be stored. If it already exists, contents are overwritten.                                                                                                                                                                                                 |
| REMOTE_DATA_PREFIX     | The original data is stored at 'https://xrootd-local.unl.edu:1094//store/user/AGC'. Changing the prefix is replaced with the argument to this option when accessing remote data. For example for the version of the input datasets stored on EOS use `'root://eoscms.cern.ch//eos/cms/store/test/agc'`. |
| SCHEDULER              | The scheduler for RDataFrame parallelization. Will honor the --ncores flag. The default is `mt`, i.e. multi-thread execution. If `dask-ssh`, a list of worker node hostnames to connect to should be provided via the --nodes option.                                                                   |
| VERBOSE                | Turn on verbose execution logs.                                                                                                                                                                                                                                                                         |

In [None]:
DATA_CACHE: Optional[str] = None

HOSTS: Optional[str] = None

NCORES: int = len(os.sched_getaffinity(0))

N_MAX_FILES_PER_SAMPLE: Optional[int] = None

NPARTITIONS: Optional[int] = None

OUTPUT: str = "histograms.root"

REMOTE_DATA_PREFIX: Optional[str] = None

SCHEDULER: str = "mt"

VERBOSE: bool = False

## Configuration of cluster connection via Dask

This notebook can be executed both on the local machine but also seamlessly distributed over multiple nodes via Dask. The next utility function creates a Dask `Client` object based on the `SCHEDULER` option chosen.

In [None]:
def create_dask_client(scheduler: str, ncores: int, hosts: str) -> Client:
    """Create a Dask distributed client."""
    if scheduler == "dask-local":
        lc = LocalCluster(n_workers=ncores, threads_per_worker=1, processes=True)
        return Client(lc)

    if scheduler == "dask-ssh":
        workers = hosts.split(",")
        print(f"Using worker nodes: {workers=}")
        # The creation of the SSHCluster object might need to be further configured to fit specific use cases.
        # For example, in some clusters the "local_directory" key must be supplied in the worker_options dictionary.
        sshc = SSHCluster(
            workers,
            connect_options={"known_hosts": None},
            worker_options={"nprocs": ncores, "nthreads": 1, "memory_limit": "32GB"},
        )
        return Client(sshc)

    raise ValueError(
        f"Unexpected scheduling mode '{scheduler}'. Valid modes are ['dask-local', 'dask-ssh']."
    )

## Loading C++ helper functions

The notebook will use a shared library obtained by compiling a C++ source file with optimizations before starting the analysis. The next helper function does exactly that, using the correct path to the C++ file: either from the same directory of the notebook or from the path on the distributed worker node filesystem.

In [None]:
def load_cpp():
    """Load C++ helper functions. Works for both local and distributed execution."""
    try:
        # when using distributed RDataFrame 'helpers.cpp' is copied to the local_directory
        # of every worker (via `distribute_unique_paths`)
        localdir = get_worker().local_directory
        cpp_source = Path(localdir) / "helpers.cpp"
    except ValueError:
        # must be local execution
        cpp_source = "helpers.cpp"

    ROOT.gSystem.CompileMacro(str(cpp_source), "kO")

# Defining the analysis steps

In [None]:
# Using https://atlas-groupdata.web.cern.ch/atlas-groupdata/dev/AnalysisTop/TopDataPreparation/XSection-MC15-13TeV.data
# as a reference. Values are in pb.
XSEC_INFO = {
    "ttbar": 396.87 + 332.97,  # nonallhad + allhad, keep same x-sec for all
    "single_top_s_chan": 2.0268 + 1.2676,
    "single_top_t_chan": (36.993 + 22.175) / 0.252,  # scale from lepton filter to inclusive
    "single_top_tW": 37.936 + 37.906,
    "wjets": 61457 * 0.252,  # e/mu+nu final states
}

In [None]:
def make_rdf(
    files: list[str], client: Optional[Client], npartitions: Optional[int]
) -> ROOT.RDataFrame:
    """Construct and return a dataframe or, if a dask client is present, a distributed dataframe."""
    if client is not None:
        d = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(
            "Events", files, daskclient=client, npartitions=npartitions
        )
        # Make sure the C++ helpers are uploaded to the distributed workers
        d._headnode.backend.distribute_unique_paths(
            [
                "helpers.cpp",
            ]
        )
        return d

    return ROOT.RDataFrame("Events", files)

In [None]:
def define_trijet_mass(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
    """Add the trijet_mass observable to the dataframe after applying the appropriate selections."""

    # First, select events with at least 2 b-tagged jets
    df = df.Filter("Sum(Jet_btagCSVV2_ptcut > 0.5) > 1")

    # Build four-momentum vectors for each jet
    df = (
        df.Define(
            "Jet_p4",
            """
        ROOT::VecOps::Construct<ROOT::Math::PxPyPzMVector>(
            ROOT::VecOps::Construct<ROOT::Math::PtEtaPhiMVector>(
                Jet_pt_ptcut, Jet_eta[Jet_pt_mask], Jet_phi[Jet_pt_mask], Jet_mass[Jet_pt_mask]
            )
        )
        """,
        )
    )

    # Build trijet combinations
    df = df.Define("Trijet_idx", "Combinations(Jet_pt_ptcut, 3)")

    # Trijet_btag is a helpful array mask indicating whether or not the maximum btag value in Trijet is larger than the 0.5 threshold
    df = df.Define(
        "Trijet_btag",
        """
            auto J1_btagCSVV2 = Take(Jet_btagCSVV2_ptcut, Trijet_idx[0]);
            auto J2_btagCSVV2 = Take(Jet_btagCSVV2_ptcut, Trijet_idx[1]);
            auto J3_btagCSVV2 = Take(Jet_btagCSVV2_ptcut, Trijet_idx[2]);
            return J1_btagCSVV2 > 0.5 || J2_btagCSVV2 > 0.5 || J3_btagCSVV2 > 0.5;
            """
    )

    # Assign four-momentums to each trijet combination
    df = df.Define(
        'Trijet_p4',
        '''
        auto J1 = Take(Jet_p4, Trijet_idx[0]);
        auto J2 = Take(Jet_p4, Trijet_idx[1]);
        auto J3 = Take(Jet_p4, Trijet_idx[2]);
        return (J1+J2+J3)[Trijet_btag];
        '''
    )

    # Get trijet transverse momentum values from four-momentum vectors
    df = df.Define(
        "Trijet_pt",
        "return Map(Trijet_p4, [](const ROOT::Math::PxPyPzMVector &v) { return v.Pt(); })",
    )

    # Evaluate mass of trijet with maximum pt and btag higher than threshold
    df = df.Define(
        "Trijet_mass", "Trijet_p4[ArgMax(Trijet_pt)].M()"
    )

    return df

In [None]:
def book_histos(
    df: ROOT.RDataFrame,
    process: str,
    variation: str,
    nevents: int,
) -> list[AGCResult]:
    """Return the RDataFrame results pertaining to the desired process and variation."""
    # Calculate normalization for MC
    x_sec = XSEC_INFO[process]
    lumi = 3378  # /pb
    xsec_weight = x_sec * lumi / nevents
    df = df.Define("Weights", str(xsec_weight))  # default weights

    if variation == "nominal":
        # Jet_pt variations definition
        # pt_scale_up() and pt_res_up(jet_pt) return scaling factors applying to jet_pt
        # pt_scale_up() - jet energy scaly systematic
        # pt_res_up(jet_pt) - jet resolution systematic
        df = df.Vary(
            "Jet_pt",
            "ROOT::RVec<ROOT::RVecF>{Jet_pt*pt_scale_up(), Jet_pt*jet_pt_resolution(Jet_pt.size())}",
            ["pt_scale_up", "pt_res_up"],
        )

        if process == "wjets":
            # Flat weight variation definition
            df = df.Vary(
                "Weights",
                "Weights*flat_variation()",
                [f"scale_var_{direction}" for direction in ["up", "down"]],
            )

    # Event selection - the core part of the algorithm applied for both regions
    # Selecting events containing at least one lepton and four jets with pT > 25 GeV
    # Applying requirement at least one of them must be b-tagged jet (see details in the specification)
    df = (
        df.Define("Electron_pt_mask", "Electron_pt>25")
        .Define("Muon_pt_mask", "Muon_pt>25")
        .Define("Jet_pt_mask", "Jet_pt>25")
        .Filter("Sum(Electron_pt_mask) + Sum(Muon_pt_mask) == 1")
        .Filter("Sum(Jet_pt_mask) >= 4")
    )

    # create columns for "good" jet pt and btag values as these columns are used several times
    df = (
        df.Define("Jet_pt_ptcut", "Jet_pt[Jet_pt_mask]")
          .Define("Jet_btagCSVV2_ptcut", "Jet_btagCSVV2[Jet_pt_mask]")
    )

    # b-tagging variations for nominal samples
    if variation == "nominal":
        df = df.Vary(
            "Weights",
            "ROOT::RVecD{Weights*btag_weight_variation(Jet_pt_ptcut)}",
            [
                f"{weight_name}_{direction}"
                for weight_name in [f"btag_var_{i}" for i in range(4)]
                for direction in ["up", "down"]
            ],
        )

    # Define HT observable for the 4j1b region
    # Only one b-tagged region required
    # The observable is the total transvesre momentum
    # fmt: off
    df4j1b = df.Filter("Sum(Jet_btagCSVV2_ptcut > 0.5) == 1")\
               .Define("HT", "Sum(Jet_pt_ptcut)")
    # fmt: on

    # Define trijet_mass observable for the 4j2b region (this one is more complicated)
    df4j2b = define_trijet_mass(df)

    # Select the right VariationsFor function depending on RDF or DistRDF
    if type(df).__module__ == "DistRDF.Proxy":
        variationsfor_func = ROOT.RDF.Experimental.Distributed.VariationsFor
    else:
        variationsfor_func = ROOT.RDF.Experimental.VariationsFor

    # Book histograms and, if needed, their systematic variations
    results = []
    for df, observable, region in zip([df4j1b, df4j2b], ["HT", "Trijet_mass"], ["4j1b", "4j2b"]):
        histo_model = ROOT.RDF.TH1DModel(
            name=f"{region}_{process}_{variation}", title=process, nbinsx=25, xlow=50, xup=550
        )
        nominal_histo = df.Histo1D(histo_model, observable, "Weights")

        if variation == "nominal":
            varied_histos = variationsfor_func(nominal_histo)
            results.append(AGCResult(varied_histos, region, process, variation, nominal_histo))
        else:
            results.append(AGCResult(nominal_histo, region, process, variation, nominal_histo))
        print(f"Booked histogram {histo_model.fName}")

    # Return the booked results
    # Note that no event loop has run yet at this point (RDataFrame is lazy)
    return results

# Execute the analysis and store the output histograms

In [None]:
program_start = time()

# Do not add histograms to TDirectories automatically: we'll do it ourselves as needed.
ROOT.TH1.AddDirectory(False)
# Disable interactive graphics: avoids canvases flashing on screen before we save them to file
ROOT.gROOT.SetBatch(True)

if VERBOSE:
    # Set higher RDF verbosity for the rest of the program.
    # To only change the verbosity in a given scope, use ROOT.Experimental.RLogScopedVerbosity.
    ROOT.Detail.RDF.RDFLogChannel.SetVerbosity(ROOT.Experimental.ELogLevel.kInfo)

if SCHEDULER == "mt":
    # Setup for local, multi-thread RDataFrame
    ROOT.EnableImplicitMT(NCORES)
    print(f"Number of threads: {ROOT.GetThreadPoolSize()}")
    client = None
    load_cpp()
    run_graphs = ROOT.RDF.RunGraphs
else:
    # Setup for distributed RDataFrame
    client = create_dask_client(SCHEDULER, NCORES, HOSTS)
    ROOT.RDF.Experimental.Distributed.initialize(load_cpp)
    run_graphs = ROOT.RDF.Experimental.Distributed.RunGraphs

# Book RDataFrame results
inputs: list[AGCInput] = retrieve_inputs(
    N_MAX_FILES_PER_SAMPLE, REMOTE_DATA_PREFIX, DATA_CACHE
)
results: list[AGCResult] = []
for input in inputs:
    df = make_rdf(input.paths, client, NPARTITIONS)
    results += book_histos(df, input.process, input.variation, input.nevents)
print(f"Building the computation graphs took {time() - program_start:.2f} seconds")

# Run the event loops for all processes and variations here
run_graphs_start = time()
run_graphs([r.nominal_histo for r in results])
print(f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")
if client is not None:
    client.close()

results = postprocess_results(results)
save_histos([r.histo for r in results], output_fname=OUTPUT)
print(f"Result histograms saved in file {OUTPUT}")

# Visualize the histograms

## Create the plotting canvas

In [None]:
width = 2160
height = 2160
c = ROOT.TCanvas("c", "c", width, height)
ROOT.gStyle.SetPalette(ROOT.kRainBow)

## Region 1 stack

In [None]:
hlist = [r.histo for r in results if r.region == "4j1b" and r.variation == "nominal"]
hlist = [h.Clone().Rebin(2) for h in hlist]
hs = ROOT.THStack("j4b1", ">=4 jets, 1 b-tag; H_{T} [GeV]")
for h in hlist:
    hs.Add(h)
hs.Draw("hist pfc plc")
c.Draw()
x = hs.GetXaxis()
x.SetRangeUser(120, x.GetXmax())
x.SetTitleOffset(1.5)
x.CenterTitle()
c.BuildLegend(0.65, 0.7, 0.9, 0.9)
c.SaveAs("reg1.png")
c.Draw()

## Region 2 stack

In [None]:
hlist = [r.histo for r in results if r.region == "4j2b" and r.variation == "nominal"]
hs = ROOT.THStack("j4b1", ">=4 jets, 2 b-tag; H_{T} [GeV]")
for h in hlist:
    hs.Add(h)
hs.Draw("hist pfc plc")
c.Draw()
x = hs.GetXaxis()
x.SetTitleOffset(1.5)
x.CenterTitle()
c.BuildLegend(0.65, 0.7, 0.9, 0.9)
c.SaveAs("reg2.png")
c.Draw()

## b-tag variations

In [None]:
btag_variations = [
    "nominal",
    "btag_var_0_up",
    "btag_var_1_up",
    "btag_var_2_up",
    "btag_var_3_up",
]


def btag_filter(r):
    return r.region == "4j1b" and r.process == "ttbar" and r.variation in btag_variations


hlist = [r.histo for r in results if btag_filter(r)]
hlist = [h.Clone().Rebin(2) for h in hlist]
hs = ROOT.THStack("j4b1btag", "btag-variations ; H_{T} [GeV]")
for h, name in zip(hlist, btag_variations):
    h.SetLineWidth(4)
    h.SetTitle(name)
    hs.Add(h)
hs.Draw("hist nostack plc")
c.Draw()
x = hs.GetXaxis()
x.SetRangeUser(120, x.GetXmax())
x.SetTitleOffset(1.5)
x.CenterTitle()
c.BuildLegend(0.65, 0.7, 0.9, 0.9)
c.SaveAs("btag.png")
c.Draw()

## Jet energy variations

In [None]:
jet_variations = ["nominal", "pt_scale_up", "pt_res_up"]


def jet_filter(r):
    return r.region == "4j2b" and r.process == "ttbar" and r.variation in jet_variations


hlist = [r.histo for r in results if jet_filter(r)]
hs = ROOT.THStack("4j2bjet", "Jet energy variations ; m_{bjj} [GeV]")
for h, name in zip(hlist, jet_variations):
    h.SetFillColor(0)
    h.SetLineWidth(4)
    h.SetTitle(name)
    hs.Add(h)
hs.Draw("hist nostack plc")
c.Draw()
x = hs.GetXaxis()
x.SetRangeUser(0, 600)
x.SetTitleOffset(1.5)
x.CenterTitle()
c.BuildLegend(0.65, 0.7, 0.9, 0.9)
c.SaveAs("jet.png")
c.Draw()