# Dask in HEP analyses

### Warning: In this talk we will assume some familiarity with the uproot+awkward way of dealing with data.

Dask provides an interface for specifying/locating input data and then describing manipulations on that data are organized into a task graph. This task graph can then be executed on local compute or on a cluster. Dask Array and Dask Dataframe deal well with rectangular data. They provide a scalable interface to describe manipulations of data that may not fit into
system memory by mapping transformations onto partitions of the data that fit in memory.

<img src="https://docs.dask.org/en/stable/_images/dask-overview.svg" width="400" style="margin-right: 20px;"><img src="https://docs.dask.org/en/latest/_images/dask-array.svg" width="400">

But in physics we're dealing with [jagged or ragged arrays](https://en.wikipedia.org/wiki/Jagged_array). A ragged array is something like this:

```
[[1, 2, 3],
 [4],
 [],
 [5, 6]]
---------------------
type: 4 * var * int64
```

In the pythonic HEP ecosystem, we deal with those kinds of arrays using [awkward](https://github.com/scikit-hep/awkward). Awkward Arrays are general tree-like data structures, like JSON, but contiguous in memory and operated upon with compiled, vectorized code like NumPy. For more information, please visit the [awkward array docs](https://awkward-array.org/doc/main/index.html) and/or see [previous talks from Jim Pivarski](https://github.com/jpivarski-talks/).

### How jagged arrays and histogramming are integrated with dask: awkward array 2.0, dask_awkward, dask_histogram, and coffea

<img src="img/coffea-upgrade.png" width="600">

Awkward array 2.0 features an improved and streamlined backend with. The backend is using only C and python without any C++ metadata handling. `ak.virtual` delayed computations are replaced by [dask-awkward](https://github.com/dask-contrib/dask-awkward).

`dask_awkward` and `dask_histogram` bring delayed, distributed computation to `awkward array 2.0` based analyses and libraries. [Uproot](https://github.com/scikit-hep/uproot5) now provides lazy reading of data via [uproot.dask](https://uproot.readthedocs.io/en/latest/uproot._dask.dask.html).

#### Our partitions are split on the event axis since each event is independent and we never run computations that combine more than 1 events.

All that provides access to dask at all layers of analysis which yields improved parallelism and better factorization away from compute infrastructure.
Coffea, and `NanoEvents` in particular which was almost entirely based on `ak.virtual`, now leverage all this infrastructure.

Please see [Lindsey's CHEP 2023 talk](https://indico.jlab.org/event/459/contributions/11533/attachments/9496/13762/CoffeaCHEP_LindseyGray_09052023.pdf) for more info on those upgrades and also see the most recent talks from [Lindsey](https://indico.cern.ch/event/1383972/contributions/5825304/) and [Jim](https://github.com/jpivarski-talks/2023-07-24-tac-hep-tutorial/blob/main/horizontal.ipynb).


In [None]:
import dask
from dask.threaded import get
from dask.local import get_sync
from dask.optimization import cull, inline, inline_functions, fuse


def print_and_return(string):
    print(string)
    return string


def format_str(count, val, nwords):
    return f"word list has {count} occurrences of " f"{val}, out of {nwords} words"


dsk = {
    "words": "apple orange apple pear orange pear pear",
    "nwords": (len, (str.split, "words")),
    "val1": "orange",
    "val2": "apple",
    "val3": "pear",
    "count1": (str.count, "words", "val1"),
    "count2": (str.count, "words", "val2"),
    "count3": (str.count, "words", "val3"),
    "format1": (format_str, "count1", "val1", "nwords"),
    "format2": (format_str, "count2", "val2", "nwords"),
    "format3": (format_str, "count3", "val3", "nwords"),
    "print1": (print_and_return, "format1"),
    "print2": (print_and_return, "format2"),
    "print3": (print_and_return, "format3"),
}

In [None]:
dask.base.visualize_dsk(dsk, verbose=True)

In [None]:
outputs = ["print1", "print2"]
dsk1, dependencies = cull(dsk, outputs)  # remove unnecessary tasks from the graph

results = get_sync(dsk1, outputs)

dask.base.visualize_dsk(dsk1, verbose=True)

In [None]:
dsk2 = inline(dsk1, dependencies=dependencies)
results = get_sync(dsk2, outputs)

dask.base.visualize_dsk(dsk2, verbose=True)

In [None]:
dsk3 = inline_functions(dsk2, outputs, [len, str.split], dependencies=dependencies)
results = get_sync(dsk3, outputs)

dask.base.visualize_dsk(dsk3, verbose=True)

In [None]:
dsk4, dependencies = fuse(dsk3)
results = get_sync(dsk4, outputs)

dask.base.visualize_dsk(dsk4, verbose=True)

In [None]:
def optimize(dsk, keys):
    dsk1, deps = cull(dsk, keys)
    dsk2 = inline(dsk1, dependencies=deps)
    dsk3 = inline_functions(dsk2, keys, [len, str.split], dependencies=deps)
    dsk4, deps = fuse(dsk3)
    return dsk4, deps


def optimize_and_get(dsk, keys):
    dsk4, deps = fuse(dsk, keys)
    return get(dsk4, keys)


optimize_and_get(dsk, outputs)

In [None]:
dask.base.visualize_dsk(dsk, verbose=True, color="order")

In [None]:
dsk5, _ = optimize(dsk, outputs)

dask.base.visualize_dsk(dsk5, verbose=True, color="order")

In [None]:
import time

import awkward as ak
import hist
import matplotlib.pyplot as plt
import numpy as np
import numba
from coffea.dataset_tools import apply_to_fileset
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema, BaseSchema
from coffea.nanoevents.methods import candidate
from coffea import processor

import dask
import dask_awkward as dak
import hist.dask as hda

# The opendata files are non-standard NanoAOD, so some optional data columns are missing
NanoAODSchema.warn_missing_crossrefs = False

# The warning emitted below indicates steps_per_file is for initial data exploration
# and test. When running at scale there are better ways to specify processing chunks
# of files.
events, report = NanoEventsFactory.from_root(
    {
        "https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root": "Events"
    },
    metadata={"dataset": "Test"},
    uproot_options={"allow_read_errors_with_report": True},
).events()

# Query 1

Plot the <i>E</i><sub>T</sub><sup>miss</sup> of all events.

In [None]:
q1_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]")
    .Double()
    .fill(events.MET.pt)
)

dask.compute(q1_hist, report)[0].plot1d(flow="none", yerr=False)
dak.necessary_columns(q1_hist)

# Query 2
Plot the <i>p</i><sub>T</sub> of all jets.

In [None]:
q2_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="ptj", label="Jet $p_{T}$ [GeV]")
    .Double()
    .fill(ak.flatten(events.Jet.pt))
)


q2_hist.compute().plot1d(flow="none", yerr=False)
dak.necessary_columns(q2_hist)

# Query 3
Plot the <i>p</i><sub>T</sub> of jets with |<i>η</i>| < 1.

In [None]:
q3_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="ptj", label="Jet $p_{T}$ [GeV]")
    .Double()
    .fill(ak.flatten(events.Jet[abs(events.Jet.eta) < 1].pt))
)

q3_hist.compute().plot1d(flow="none", yerr=False)
dak.necessary_columns(q3_hist)

# Query 4
Plot the <i>E</i><sub>T</sub><sup>miss</sup> of events that have at least two jets with <i>p</i><sub>T</sub> > 40 GeV.

In [None]:
has2jets = ak.sum(events.Jet.pt > 40, axis=1) >= 2
q4_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]")
    .Double()
    .fill(events[has2jets].MET.pt)
)

q4_hist.compute().plot1d(flow="none", yerr=False)
dak.necessary_columns(q4_hist)

# Query 5
Plot the <i>E</i><sub>T</sub><sup>miss</sup> of events that have an
opposite-charge muon pair with an invariant mass between 60 and 120 GeV.

In [None]:
mupair = ak.combinations(events.Muon, 2, fields=["mu1", "mu2"])
pairmass = (mupair.mu1 + mupair.mu2).mass
goodevent = ak.any(
    (pairmass > 60) & (pairmass < 120) & (mupair.mu1.charge == -mupair.mu2.charge),
    axis=1,
)
q5_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]")
    .Double()
    .fill(events[goodevent].MET.pt)
)


q5_hist.compute().plot1d(flow="none", yerr=False)
dak.necessary_columns(q5_hist)

# Query 6
For events with at least three jets, plot the <i>p</i><sub>T</sub> of the trijet four-momentum that has the invariant mass closest to 172.5 GeV in each event and plot the maximum <i>b</i>-tagging discriminant value among the jets in this trijet.

In [None]:
jets = ak.zip(
    {k: getattr(events.Jet, k) for k in ["x", "y", "z", "t", "btagDeepB"]},
    with_name="LorentzVector",
    behavior=events.Jet.behavior,
)
trijet = ak.combinations(jets, 3, fields=["j1", "j2", "j3"])
trijet["p4"] = trijet.j1 + trijet.j2 + trijet.j3
trijet = ak.flatten(
    trijet[ak.singletons(ak.argmin(abs(trijet.p4.mass - 172.5), axis=1))]
)
maxBtag = np.maximum(
    trijet.j1.btagDeepB,
    np.maximum(
        trijet.j2.btagDeepB,
        trijet.j3.btagDeepB,
    ),
)
q6_hists = {
    "trijetpt": hda.Hist.new.Reg(100, 0, 200, name="pt3j", label="Trijet $p_{T}$ [GeV]")
    .Double()
    .fill(trijet.p4.pt),
    "maxbtag": hda.Hist.new.Reg(
        100, 0, 1, name="btagDeepB", label="Max jet b-tag score"
    )
    .Double()
    .fill(maxBtag),
}

out = dask.compute(q6_hists, report)[0]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
out["trijetpt"].plot1d(ax=ax1, flow="none", yerr=False)
out["maxbtag"].plot1d(ax=ax2, flow="none", yerr=False)
dak.necessary_columns(q6_hists)

# Query 7
Plot the scalar sum in each event of the <i>p</i><sub>T</sub> of jets with <i>p</i><sub>T</sub> > 30 GeV that are not within 0.4 in Δ<i>R</i> of any light lepton with <i>p</i><sub>T</sub> > 10 GeV.

In [None]:
cleanjets = events.Jet[
    ak.all(events.Jet.metric_table(events.Muon[events.Muon.pt > 10]) >= 0.4, axis=2)
    & ak.all(
        events.Jet.metric_table(events.Electron[events.Electron.pt > 10]) >= 0.4,
        axis=2,
    )
    & (events.Jet.pt > 30)
]
q7_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="sumjetpt", label="Jet $\sum p_{T}$ [GeV]")
    .Double()
    .fill(ak.sum(cleanjets.pt, axis=1))
)

q7_hist.compute().plot1d(flow="none", yerr=False)
dak.necessary_columns(q7_hist)

# Query 8
For events with at least three light leptons and a same-flavor opposite-charge light lepton pair, find such a pair that has the invariant mass closest to 91.2 GeV in each event and plot the transverse mass of the system consisting of the missing tranverse momentum and the highest-<i>p</i><sub>T</sub> light lepton not in this pair.

In [None]:
events["Electron", "pdgId"] = -11 * events.Electron.charge
events["Muon", "pdgId"] = -13 * events.Muon.charge
events["leptons"] = ak.concatenate(
    [events.Electron, events.Muon],
    axis=1,
)
events = events[ak.num(events.leptons) >= 3]
pair = ak.argcombinations(events.leptons, 2, fields=["l1", "l2"])
pair = pair[(events.leptons[pair.l1].pdgId == -events.leptons[pair.l2].pdgId)]

pair = pair[
    ak.singletons(
        ak.argmin(
            abs((events.leptons[pair.l1] + events.leptons[pair.l2]).mass - 91.2),
            axis=1,
        )
    )
]

events = events[ak.num(pair) > 0]
pair = pair[ak.num(pair) > 0][:, 0]

l3 = ak.local_index(events.leptons)
l3 = l3[(l3 != pair.l1) & (l3 != pair.l2)]
l3 = l3[ak.argmax(events.leptons[l3].pt, axis=1, keepdims=True)]
l3 = events.leptons[l3][:, 0]

mt = np.sqrt(2 * l3.pt * events.MET.pt * (1 - np.cos(events.MET.delta_phi(l3))))
q8_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="mt", label="$\ell$-MET transverse mass [GeV]")
    .Double()
    .fill(mt)
)

q8_hist.compute().plot1d(flow="none", yerr=False)
dak.necessary_columns(q8_hist)

# Advanced fancy analysis
Fancy 4-muon analysis, searching for diboson events.

In [None]:
fileset = {
    "ZZto4mu": {
        "files": {
            "data/ZZTo4mu.root": "Events",
        }
    },
    "SMHiggsToZZTo4L": {
        "files": {
            "data/SMHiggsToZZTo4L.root": "Events",
        }
    },
}

In [None]:
@numba.njit
def find_4lep_kernel(events_leptons, builder):
    """Search for valid 4-lepton combinations from an array of events * leptons {charge, ...}

    A valid candidate has two pairs of leptons that each have balanced charge
    Outputs an array of events * candidates {indices 0..3} corresponding to all valid
    permutations of all valid combinations of unique leptons in each event
    (omitting permutations of the pairs)
    """
    for leptons in events_leptons:
        builder.begin_list()
        nlep = len(leptons)
        for i0 in range(nlep):
            for i1 in range(i0 + 1, nlep):
                if leptons[i0].charge + leptons[i1].charge != 0:
                    continue
                for i2 in range(nlep):
                    for i3 in range(i2 + 1, nlep):
                        if len({i0, i1, i2, i3}) < 4:
                            continue
                        if leptons[i2].charge + leptons[i3].charge != 0:
                            continue
                        builder.begin_tuple(4)
                        builder.index(0).integer(i0)
                        builder.index(1).integer(i1)
                        builder.index(2).integer(i2)
                        builder.index(3).integer(i3)
                        builder.end_tuple()
        builder.end_list()

    return builder


def find_4lep(events_leptons):
    if ak.backend(events_leptons) == "typetracer":
        # here we fake the output of find_4lep_kernel since
        # operating on length-zero data returns the wrong layout!
        ak.typetracer.touch_data(
            events_leptons.charge
        )  # force touching of the necessary data
        # Use x = ak.typetracer.length_zero_if_typetracer(events_leptons.charge) or x = ak.typetracer.length_one_if_typetracer(events_leptons.charge)
        # if you want a a length-zero/one NumPy-backed array to be returned for your computations
        return ak.Array(
            ak.Array([[(0, 0, 0, 0)]]).layout.to_typetracer(forget_length=True)
        )
    return find_4lep_kernel(events_leptons, ak.ArrayBuilder()).snapshot()


class FancyDimuonProcessor(processor.ProcessorABC):
    # The processor can also have an __init__ method

    def process(self, events):
        dataset_axis = hist.axis.StrCategory(
            [], growth=True, name="dataset", label="Primary dataset"
        )
        mass_axis = hist.axis.Regular(
            300, 0, 300, name="mass", label=r"$m_{\mu\mu}$ [GeV]"
        )
        pt_axis = hist.axis.Regular(300, 0, 300, name="pt", label=r"$p_{T,\mu}$ [GeV]")

        h_nMuons = hda.Hist(
            dataset_axis,
            hda.hist.hist.axis.IntCategory(
                range(6), name="nMuons", label="Number of good muons"
            ),
            storage="weight",
            label="Counts",
        )
        h_m4mu = hda.hist.Hist(
            dataset_axis, mass_axis, storage="weight", label="Counts"
        )
        h_mZ1 = hda.hist.Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_mZ2 = hda.hist.Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_ptZ1mu1 = hda.hist.Hist(
            dataset_axis, pt_axis, storage="weight", label="Counts"
        )
        h_ptZ1mu2 = hda.hist.Hist(
            dataset_axis, pt_axis, storage="weight", label="Counts"
        )

        cutflow = dict()

        dataset = events.metadata["dataset"]
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
                "isolation": events.Muon_pfRelIso03_all,
            },
            with_name="PtEtaPhiMCandidate",
            behavior=candidate.behavior,
        )

        # make sure they are sorted by transverse momentum
        muons = muons[ak.argsort(muons.pt, axis=1)]

        cutflow["all events"] = ak.num(muons, axis=0)

        # impose some quality and minimum pt cuts on the muons
        muons = muons[(muons.pt > 5) & (muons.isolation < 0.2)]
        cutflow["at least 4 good muons"] = ak.sum(ak.num(muons) >= 4)
        h_nMuons.fill(dataset=dataset, nMuons=ak.num(muons))

        # reduce first axis: skip events without enough muons
        muons = muons[ak.num(muons) >= 4]

        # find all candidates with helper function
        fourmuon = dak.map_partitions(find_4lep, muons)
        fourmuon = [muons[fourmuon[idx]] for idx in "0123"]

        fourmuon = ak.zip(
            {
                "z1": ak.zip(
                    {
                        "lep1": fourmuon[0],
                        "lep2": fourmuon[1],
                        "p4": fourmuon[0] + fourmuon[1],
                    }
                ),
                "z2": ak.zip(
                    {
                        "lep1": fourmuon[2],
                        "lep2": fourmuon[3],
                        "p4": fourmuon[2] + fourmuon[3],
                    }
                ),
            }
        )

        cutflow["at least one candidate"] = ak.sum(ak.num(fourmuon) > 0)

        # require minimum dimuon mass
        fourmuon = fourmuon[(fourmuon.z1.p4.mass > 60.0) & (fourmuon.z2.p4.mass > 20.0)]
        cutflow["minimum dimuon mass"] = ak.sum(ak.num(fourmuon) > 0)

        # choose permutation with z1 mass closest to nominal Z boson mass
        bestz1 = ak.singletons(ak.argmin(abs(fourmuon.z1.p4.mass - 91.1876), axis=1))
        fourmuon = ak.flatten(fourmuon[bestz1])

        h_m4mu.fill(
            dataset=dataset,
            mass=(fourmuon.z1.p4 + fourmuon.z2.p4).mass,
        )
        h_mZ1.fill(
            dataset=dataset,
            mass=fourmuon.z1.p4.mass,
        )
        h_mZ2.fill(
            dataset=dataset,
            mass=fourmuon.z2.p4.mass,
        )
        h_ptZ1mu1.fill(
            dataset=dataset,
            pt=fourmuon.z1.lep1.pt,
        )
        h_ptZ1mu2.fill(
            dataset=dataset,
            pt=fourmuon.z1.lep2.pt,
        )
        return {
            "nMuons": h_nMuons,
            "mass": h_m4mu,
            "mass_z1": h_mZ1,
            "mass_z2": h_mZ2,
            "pt_z1_mu1": h_ptZ1mu1,
            "pt_z1_mu2": h_ptZ1mu2,
            "cutflow": {dataset: cutflow},
        }

    def postprocess(self, accumulator):
        pass

In [None]:
from dask.distributed import Client

client = Client(n_workers=4)
client

In [None]:
to_compute = apply_to_fileset(
    FancyDimuonProcessor(),
    fileset,
    schemaclass=BaseSchema,
)

In [None]:
dak.necessary_columns(to_compute)

In [None]:
dask.visualize(to_compute, filename="unoptimized.pdf", optimize_graph=False)

In [None]:
dask.visualize(to_compute, filename="optimized.pdf", optimize_graph=True)

In [None]:
tstart = time.time()

(out,) = dask.compute(to_compute)
print(out)

elapsed = time.time() - tstart
print(elapsed)

In [None]:
nevt = (
    out["ZZto4mu"]["cutflow"]["ZZto4mu"]["all events"]
    + out["SMHiggsToZZTo4L"]["cutflow"]["SMHiggsToZZTo4L"]["all events"]
)
print("Events/s:", (nevt / elapsed))

scaled = {}
for (name1, h1), (name2, h2) in zip(
    out["ZZto4mu"].items(), out["SMHiggsToZZTo4L"].items()
):
    if isinstance(h1, hist.Hist) and isinstance(h2, hist.Hist):
        scaled[name1] = h1.copy() + h2.copy()

In [None]:
fig, ax = plt.subplots()
scaled["nMuons"].plot1d(ax=ax, overlay="dataset")
ax.set_yscale("log")
ax.set_ylim(1, None)
plt.show()

In [None]:
fig, ax = plt.subplots()

scaled["mass"][:, :: hist.rebin(4)].plot1d(ax=ax, overlay="dataset")
plt.show()

In [None]:
fig, ax = plt.subplots()

scaled["mass_z1"].plot1d(ax=ax, overlay="dataset")
plt.show()

In [None]:
fig, ax = plt.subplots()

scaled["mass_z2"].plot1d(ax=ax, overlay="dataset")
ax.set_xlim(2, 300)
plt.show()

In [None]:
fig, ax = plt.subplots()

scaled["pt_z1_mu1"].plot1d(ax=ax, overlay="dataset")
plt.show()

In [None]:
fig, ax = plt.subplots()

scaled["pt_z1_mu2"].plot1d(ax=ax, overlay="dataset");

It's not very hard to convert an analysis from `awkward1` + `coffea` to `awkward2` + `dask` + `coffea`:
- Processors are not needed anymore but they are a nice organization tool.
- Coffea executors have been removed and dask takes care of that.
- Accumulators are also not a thing anymore.
- Awkward operations on dask-awkward arrays are automatically dispatched `ak.some_function` -> `dak.some_function`.
- Use `import dask_awkward as dak` if you wanna be more explicit and not run operations on eager arrays by accident.
- For histogramming, use `import hist.dask as hda`. `hist.dask` histograms behave like empty `hist` histograms, and will return a filled `hist.hist.Hist` when `.compute()` is called.
- You'll still need to `import hist` for convenient definition of axes.
- If you want to get a complete array or histogram object in memory on your local machine so that you can manipulate it use `array.compute()`, `ahistogram.compute()`, or `dask.compute({"some": array, "another": histogram})`
- This should often be done at the end of code that is constructing arrays, do not prematurely `.compute()` as it can drastically slow down your analysis code.
- Use dask clusters and clients to scale your analysis.
- Most array operations that are available in `awkward` are available in `dask_awkward`, and if you encounter a problem or missing piece of functionality that you need you should open an issue at the [dask-awkward github page](https://github.com/dask-contrib/dask-awkward).
- Functions that cannot be written in pure `awkward` must be wrapped and used in `dak.map_partitions`. 