# Coffea Processors
Coffea relies mainly on [uproot](https://github.com/scikit-hep/uproot) to provide access to ROOT files for analysis.
Because a typical analysis processes tens to thousands of files, totalling gigabytes to terabytes of data, some work is needed to build a parallelized framework that processes the data in a reasonable amount of time.

In coffea up to 0.7 (SemVer), the `coffea.processor` module encapsulated the core functionality of an analysis, which could be run locally or distributed via a number of executors. This let users focus on the actual analysis code rather than on implementing efficient parallelization, assuming that the parallelization is a trivial map-reduce operation (e.g. filling histograms and adding them together).
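The map-reduce assumption can be illustrated without coffea at all. The sketch below is plain Python, not coffea code: `process_chunk` is a hypothetical stand-in for a processor, a `collections.Counter` stands in for a histogram, and the chunk contents are made-up numbers.

```python
from collections import Counter
from functools import reduce
from operator import add


def process_chunk(masses):
    """Toy 'processor': count dimuon masses per 10 GeV bin."""
    return Counter(int(m // 10) * 10 for m in masses)


# Each chunk is processed independently (the "map" step) ...
chunks = [[91.2, 90.8, 12.1], [89.9, 3.1], [95.0, 91.5]]
partial_histograms = [process_chunk(c) for c in chunks]

# ... and the partial results are summed (the "reduce" step).
total = reduce(add, partial_histograms, Counter())
print(dict(total))
```

Because adding histograms does not depend on how the data were split, the executor is free to choose the chunking and the order in which chunks are processed.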

In coffea 2024 (CalVer), the integration with `dask` is deeper (via `dask_awkward` and `uproot.dask`): whether an analysis is executed on local or distributed resources, a task graph encapsulating the analysis is created (an eager mode that bypasses the graph is still available when the input data is small enough). We will demonstrate how to use callable code to build these task graphs.

(Sidenote: with some adaptations for the new version of scikit-hep/coffea, the `process` function of a SemVer coffea processor can serve as the callable. We follow this model for convenience while transitioning from SemVer to CalVer coffea.)
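To make the idea of a task graph concrete, here is a toy version in plain Python. It is only loosely modeled on dask's convention of a dict mapping keys to `(function, *args)` tuples, and it is not how coffea or dask actually schedule work; `graph` and `compute` are hypothetical names.

```python
from operator import add, mul

# A toy task graph: each key maps either to a literal value or to a tuple
# (function, *arguments), where string arguments naming other keys are
# dependencies.
graph = {
    "x": 3,
    "y": 4,
    "sum": (add, "x", "y"),
    "scaled": (mul, "sum", 10),
}


def compute(graph, key, cache=None):
    """Evaluate `key`, recursively evaluating its dependencies first."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    node = graph[key]
    if isinstance(node, tuple) and callable(node[0]):
        func, *args = node
        # Arguments that name other keys are evaluated; anything else is a literal.
        values = [
            compute(graph, a, cache) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        result = func(*values)
    else:
        result = node
    cache[key] = result
    return result


# Building `graph` did no work; only compute() executes the tasks.
print(compute(graph, "scaled"))  # → 70
```

The point is the separation of concerns: the graph describes the analysis, and a scheduler (here, a naive recursive evaluator) decides when and where each task runs.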


Let's write a simple processor class that reads some CMS open data and plots a dimuon mass spectrum.
We begin by copying the [ProcessorABC](https://coffeateam.github.io/coffea/api/coffea.processor.ProcessorABC.html#coffea.processor.ProcessorABC) skeleton and filling in some details:

 * Removing `flag`, as we won't use it
 * Adding a new histogram for $m_{\mu \mu}$
 * Building a [Candidate](https://coffeateam.github.io/coffea/api/coffea.nanoevents.methods.candidate.PtEtaPhiMCandidate.html#coffea.nanoevents.methods.candidate.PtEtaPhiMCandidate) record for muons, since we read the file with the `BaseSchema` interpretation (these files could be read with `NanoAODSchema`, but we want to show how to build vector objects from other TTree formats)
 * Calculating the dimuon invariant mass
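Conceptually, adding two `PtEtaPhiMCandidate` records and taking `.mass` means: convert each muon to a Cartesian four-vector ($p_x = p_T\cos\phi$, $p_y = p_T\sin\phi$, $p_z = p_T\sinh\eta$), add the four-vectors, and take $m^2 = E^2 - |\vec p|^2$. The plain-Python sketch below spells that arithmetic out for a single pair; the helper names are hypothetical, and coffea's own implementation is vectorized and may differ in detail.

```python
import math


def four_vector(pt, eta, phi, mass):
    """Convert (pt, eta, phi, mass) to Cartesian (E, px, py, pz)."""
    px = pt * math.cos(phi)
    py = pt * math.sin(phi)
    pz = pt * math.sinh(eta)
    energy = math.sqrt(px**2 + py**2 + pz**2 + mass**2)
    return energy, px, py, pz


def invariant_mass(mu1, mu2):
    """Invariant mass of the sum of two (pt, eta, phi, mass) candidates."""
    e1, px1, py1, pz1 = four_vector(*mu1)
    e2, px2, py2, pz2 = four_vector(*mu2)
    e, px, py, pz = e1 + e2, px1 + px2, py1 + py2, pz1 + pz2
    # Clamp at zero to guard against tiny negative values from rounding.
    return math.sqrt(max(e**2 - px**2 - py**2 - pz**2, 0.0))


# Two back-to-back muons (mass ~0.1057 GeV) with pt = 45.5 GeV each.
m = invariant_mass((45.5, 0.0, 0.0, 0.1057), (45.5, 0.0, math.pi, 0.1057))
print(f"{m:.2f} GeV")  # → 91.00 GeV
```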

In [None]:
import gzip
import json
from pathlib import Path

import awkward as ak
from coffea import processor
from coffea.nanoevents.methods import candidate
from hist import Hist

In [None]:
class MyProcessor(processor.ProcessorABC):
    def __init__(self):
        pass

    def process(self, events):
        dataset = events.metadata["dataset"]
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
            },
            with_name="PtEtaPhiMCandidate",
            behavior=candidate.behavior,
        )

        h_mass = (
            Hist.new.StrCat(["opposite", "same"], name="sign")
            .Log(1000, 0.2, 200.0, name="mass", label=r"$m_{\mu\mu}$ [GeV]")
            .Int64()
        )

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) == 0)
        # add first and second muon in every event together
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="opposite", mass=dimuon.mass)

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) != 0)
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="same", mass=dimuon.mass)

        return {
            dataset: {
                "entries": len(events),
                "mass": h_mass,
            }
        }

    def postprocess(self, accumulator):
        pass
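The two selections in `process` are per-event boolean masks over a jagged array. To see what they compute, here is a plain-Python analogue on made-up charges: a list of lists plays the role of the awkward array, so this mirrors only the logic, not awkward's API. For exactly two muons with charges of ±1, a charge sum of zero means opposite sign and a nonzero sum means same sign.

```python
# Pure-Python stand-in for a jagged array: one list of muon charges per event.
events_charges = [
    [1, -1],     # two muons, opposite sign
    [1, 1],      # two muons, same sign
    [-1],        # one muon: fails the multiplicity cut
    [1, -1, 1],  # three muons: fails the multiplicity cut
]

# Analogue of (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) == 0)
opposite = [len(q) == 2 and sum(q) == 0 for q in events_charges]
# Analogue of (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) != 0)
same = [len(q) == 2 and sum(q) != 0 for q in events_charges]

print(opposite)  # [True, False, False, False]
print(same)      # [False, True, False, False]
```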

To run this processor directly, without any executor, we can use the following example, which:

 * Opens a CMS open data file
 * Creates a NanoEvents object from the first 10,000 entries using `BaseSchema` (roughly equivalent to the output of `uproot.open(...).arrays()`)
 * Creates a `MyProcessor` instance
 * Runs the `process()` function, which returns our accumulators


In [None]:
mumu_data_filename = "root://xcache.af.uchicago.edu:1094//root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"
# Optional: read a local copy of the file instead (machine-specific path)
# mumu_data_filename = "/Users/iason/fun/2021-07-06-pyhep-uproot-awkward-tutorial/data/Run2012B_DoubleMuParked.root"

In [None]:
from coffea.nanoevents import NanoEventsFactory, BaseSchema

events = NanoEventsFactory.from_root(
    {mumu_data_filename: "Events"},
    entry_stop=10000,
    metadata={"dataset": "DoubleMuon"},
    schemaclass=BaseSchema,
).events()
p = MyProcessor()

In [None]:
%%time

out = p.process(events)

In [None]:
plot_dir = Path.cwd() / "plots"
plot_dir.mkdir(exist_ok=True)

In [None]:
import matplotlib.pyplot as plt
import mplhep

mplhep.style.use(mplhep.style.ATLAS)

fig, ax = plt.subplots()
out["DoubleMuon"]["mass"].plot1d(ax=ax)
ax.set_xscale("log")
ax.legend(title="Dimuon charge")

fig.savefig(plot_dir / "dimuon_charge.png")

# Filesets
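Next, we construct a fileset to run over: a dictionary mapping each dataset name to its files (each paired with the name of the TTree to read) and some metadata.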

In [None]:
mumu_simulation_filename = "root://xcache.af.uchicago.edu:1094//root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/ZZTo4mu.root"
# Optional: read a local file instead (machine-specific path; note that it is a different sample, HiggsZZ4mu)
# mumu_simulation_filename = "/Users/iason/fun/2021-07-06-pyhep-uproot-awkward-tutorial/data/HiggsZZ4mu.root"

In [None]:
initial_fileset = {
    "DoubleMuon": {
        "files": {
            mumu_data_filename: "Events",
        },
        "metadata": {
            "is_mc": False,
        },
    },
    "ZZ to 4mu": {
        "files": {
            mumu_simulation_filename: "Events",
        },
        "metadata": {
            "is_mc": True,
        },
    },
}
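Each dataset in the fileset maps file paths to the name of the TTree to read, alongside free-form `metadata` for that dataset. A small helper like the hypothetical `describe` below (plain Python, using placeholder file names rather than the real paths) is a quick way to sanity-check a fileset before handing it to an executor.

```python
# Same nested structure as `initial_fileset`, with placeholder file names.
fileset = {
    "DoubleMuon": {
        "files": {"data.root": "Events"},
        "metadata": {"is_mc": False},
    },
    "ZZ to 4mu": {
        "files": {"zz4mu.root": "Events"},
        "metadata": {"is_mc": True},
    },
}


def describe(fileset):
    """Return (dataset, file, tree name, is_mc) for every file in the fileset."""
    rows = []
    for dataset, spec in fileset.items():
        is_mc = spec.get("metadata", {}).get("is_mc", False)
        for filename, treename in spec["files"].items():
            rows.append((dataset, filename, treename, is_mc))
    return rows


for row in describe(fileset):
    print(row)
```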

# Processing

In [None]:
%%time

iterative_run = processor.Runner(
    executor=processor.IterativeExecutor(compression=None),
    schema=BaseSchema,
    maxchunks=3,
    savemetrics=True,
)

out, metrics = iterative_run(
    initial_fileset,
    processor_instance=MyProcessor(),
)
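The `Runner` splits the input files into chunks of entries and hands each chunk to `process`; `maxchunks=3` caps how many chunks are processed, which keeps this test run short. The sketch below illustrates the idea in plain Python. The function name, the entry counts, and the chunk size are made up for illustration, and coffea's real partitioning logic is not reproduced here.

```python
def chunk_ranges(num_entries, chunksize, maxchunks=None):
    """Split [0, num_entries) into (start, stop) ranges of at most `chunksize`.

    Simplified illustration only; coffea's own partitioning may differ.
    """
    ranges = []
    for start in range(0, num_entries, chunksize):
        if maxchunks is not None and len(ranges) >= maxchunks:
            break
        ranges.append((start, min(start + chunksize, num_entries)))
    return ranges


print(chunk_ranges(250_000, 100_000))
# [(0, 100000), (100000, 200000), (200000, 250000)]
print(chunk_ranges(250_000, 100_000, maxchunks=2))
# [(0, 100000), (100000, 200000)]
```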

In [None]:
out, metrics

In [None]:
mplhep.style.use(mplhep.style.ATLAS)

fig, ax = plt.subplots()
out["DoubleMuon"]["mass"].plot1d(ax=ax)
ax.set_xscale("log")
ax.legend(title="Dimuon charge")

fig.savefig(plot_dir / "dimuon_charge_iterative.png")

Now, if we want to use more than a single core on our machine, we simply swap `IterativeExecutor` for `FuturesExecutor`, which uses Python's standard-library `concurrent.futures` module. The most interesting argument to `FuturesExecutor` is `workers`, the number of cores to use. Since `maxchunks` is not set this time, the full files are processed.
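The general `concurrent.futures` pattern is: submit one task per chunk to a pool, then merge the results as they complete. The sketch below is plain Python, not coffea's implementation; it uses a `ThreadPoolExecutor` so that it runs anywhere, whereas for CPU-bound Python work a `ProcessPoolExecutor` avoids the GIL. `process_chunk` and the chunk contents are hypothetical.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_chunk(masses):
    """Toy per-chunk work: count masses per 10 GeV bin."""
    return Counter(int(m // 10) * 10 for m in masses)


chunks = [[91.2, 90.8], [89.9, 3.1], [95.0, 91.5], [12.1]]

# max_workers plays the role of FuturesExecutor's `workers` argument.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(process_chunk, c) for c in chunks]
    total = Counter()
    # Results arrive in completion order, not submission order; that is fine
    # because adding histograms is commutative and associative.
    for future in as_completed(futures):
        total += future.result()

print(total[90])  # → 4
```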

In [None]:
%%time

futures_run = processor.Runner(
    executor=processor.FuturesExecutor(workers=4, compression=None),
    schema=BaseSchema,
    savemetrics=True,
)

out, metrics = futures_run(
    initial_fileset,
    processor_instance=MyProcessor()
)

In [None]:
out, metrics

In [None]:
mplhep.style.use(mplhep.style.ATLAS)

fig, ax = plt.subplots()
out["DoubleMuon"]["mass"].plot1d(ax=ax)
ax.set_xscale("log")
ax.legend(title="Dimuon charge")

fig.savefig(plot_dir / "dimuon_charge_futures.png")