# Coffea Processors
Coffea relies mainly on [uproot](https://github.com/scikit-hep/uproot) to provide access to ROOT files for analysis.
As a usual analysis will involve processing tens to thousands of files, totalling gigabytes to terabytes of data, there is a certain amount of work to be done to build a parallelized framework to process the data in a reasonable amount of time. 

In coffea up to 0.7 (SemVer), a `coffea.processor` module was provided to encapsulate the core functionality of the analysis, which could be run locally or distributed via a number of Execturors. This allowed users to worry just about the actual analysis code and not about how to implement efficient parallelization, assuming that the parallization is a trivial map-reduce operation (e.g. filling histograms and adding them together).

In coffa 2024 (CalVer), integration with `dask` is deeper (via `dask_awkward` and `uproot.dask`), and whether an analysis is to be executed on local or distributed resources, a TaskGraph encapsulating the analysis is created. We will demonstrate how to use callable code to build these TGs 

(Sidenote: with some adaptations for the new version of scikit-hep/coffea, a SemVer coffea processor module's `process` function can serve as the callable function)


Let's start by writing a simple processor class that reads some CMS open data and plots a dimuon mass spectrum.
We'll start by copying the [ProcessorABC](https://coffeateam.github.io/coffea/api/coffea.processor.ProcessorABC.html#coffea.processor.ProcessorABC) skeleton and filling in some details:

 * Remove `flag`, as we won't use it
 * Adding a new histogram for $m_{\mu \mu}$
 * Building a [Candidate](https://coffeateam.github.io/coffea/api/coffea.nanoevents.methods.candidate.PtEtaPhiMCandidate.html#coffea.nanoevents.methods.candidate.PtEtaPhiMCandidate) record for muons, since we will read it with `BaseSchema` interpretation (the files used here could be read with `NanoAODSchema` but we want to show how to build vector objects from other TTree formats) 
 * Calculating the dimuon invariant mass

In [None]:
import awkward as ak
from coffea import processor
from coffea.nanoevents.methods import candidate
import hist
import dask
from hist.dask import Hist

class MyProcessor(processor.ProcessorABC):
    def __init__(self):
        pass

    def process(self, events):
        dataset = events.metadata['dataset']
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
            },
            with_name="PtEtaPhiMCandidate",
            behavior=candidate.behavior,
        )

        h_mass = (
            Hist.new
            .StrCat(["opposite", "same"], name="sign")
            .Log(1000, 0.2, 200., name="mass", label="$m_{\mu\mu}$ [GeV]")
            .Int64()
        )

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) == 0)
        # add first and second muon in every event together
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="opposite", mass=dimuon.mass)

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) != 0)
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="same", mass=dimuon.mass)

        return {
            "entries": ak.num(events, axis=0),
            "mass": h_mass,
        }
    
    def postprocess(self, accumulator):
        pass

If we were to just use bare uproot to execute this processor, we could do that with the following example, which:

 * Opens a CMS open data file
 * Creates a NanoEvents object using `BaseSchema` (roughly equivalent to the output of `uproot.lazy`)
 * Creates a `MyProcessor` instance
 * Runs the `process()` function, which returns our accumulators


In [None]:
import uproot
from coffea.nanoevents import NanoEventsFactory, BaseSchema

filename = "root://xcache//store/user/ncsmith/opendata_mirror/Run2012B_DoubleMuParked.root"
#file = uproot.open(filename)
events = NanoEventsFactory.from_root(
    {filename: "Events"},
    entry_stop=10000,
    metadata={"dataset": "DoubleMuon"},
    schemaclass=BaseSchema,
    delayed=True,
).events()
p = MyProcessor()
taskgraph = p.process(events)
taskgraph

In [None]:
dask.visualize(taskgraph['mass'], optimize_graph=False)

In [None]:
dask.visualize(taskgraph['mass'], optimize_graph=True)

In [None]:
dask.compute(taskgraph)

In [None]:
out, *_ = dask.compute(taskgraph)

In [None]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
out["mass"].plot1d(ax=ax)
ax.set_xscale("log")
ax.legend(title="Dimuon charge")

One could expand on this code to run over several chunks of the file, setting `entry_start` and `entry_stop` as appropriate. Then, several datasets could be processed by iterating over several files. However, the processor [Runner](https://coffeateam.github.io/coffea/api/coffea.processor.Runner.html) can help with this! One lists the datasets and corresponding files, the processor they want to run, and which executor they want to use. Available executors derive from `ExecutorBase` and are listed [here](https://coffeateam.github.io/coffea/modules/coffea.processor.html#classes). Since these files are very large, we limit to just reading the first few chunks of events from each dataset with `maxchunks`.

In [None]:
initial_fileset = {
    "DoubleMuon": {
        "files": {
            "root://xcache//store/user/ncsmith/opendata_mirror/Run2012B_DoubleMuParked.root": "Events",
            "root://xcache//store/user/ncsmith/opendata_mirror/Run2012C_DoubleMuParked.root": "Events",
        },
        "metadata": {
            "is_mc": True,
        },
    },
    "ZZ to 4mu": {
        "files": {
            "root://xcache//store/user/ncsmith/opendata_mirror/ZZTo4mu.root": "Events",
        },
        "metadata": {
            "is_mc": False,
        }
    }
}

# Preprocessing
We'll see in a separate notebook how to construct such an initial fileset using dataset discovery tools. For now, we'll take the above `initial_fileset` and preprocess it.

In [None]:
from coffea.dataset_tools import apply_to_fileset, max_chunks, max_files, preprocess

In [None]:
preprocessed_available, preprocessed_total = preprocess(
        initial_fileset,
        step_size=100_000,
        align_clusters=None,
        skip_bad_files=True,
        recalculate_steps=False,
        files_per_batch=1,
        file_exceptions=(OSError,),
        save_form=True,
        uproot_options={},
        step_size_safety_factor=0.5,
    )
    #with gzip.open(f"{output_file}_available.json.gz", "wt") as file:
    #    print(f"Saved available fileset chunks to {output_file}_available.json.gz")

# Preprocessed fileset
Lets have a look at the contents of the preprocessed_available part of the fileset

In [None]:
preprocessed_available

In [None]:
iterative_run = processor.Runner(
    executor = processor.IterativeExecutor(compression=None),
    schema=BaseSchema,
    maxchunks=4,
)

out = iterative_run(
    fileset,
    treename="Events",
    processor_instance=MyProcessor(),
)
out

# Slicing chunks and files
Given this preprocessed fileset, we can test our processor on just a few chunks of a handful of files. To do this, we use the max_files and max_chunks functions from the dataset tools

In [None]:
test_preprocessed_files = max_files(preprocessed_available, 1)
test_preprocessed = max_chunks(test_preprocessed_files, 3)

In [None]:
test_preprocessed

In [None]:
small_tg, small_rep = apply_to_fileset(data_manipulation=MyProcessor(),
                            fileset=test_preprocessed,
                            schemaclass=BaseSchema,
                            uproot_options={"allow_read_errors_with_report": (OSError, KeyError)},
                           )

In [None]:
dask.visualize(small_tg, optimize_graph=True)

In [None]:
small_computed, small_rep_computed = dask.compute(small_tg, small_rep)

In [None]:
small_rep_computed['DoubleMuon']

In [None]:
small_computed

Now, if we want to use more than a single core on our machine, we simply change [IterativeExecutor](https://coffeateam.github.io/coffea/api/coffea.processor.IterativeExecutor.html) for [FuturesExecutor](https://coffeateam.github.io/coffea/api/coffea.processor.FuturesExecutor.html), which uses the python [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) standard library. We can then set the most interesting argument to the `FuturesExecutor`: the number of cores to use (2):

In [None]:
full_tg, rep = apply_to_fileset(data_manipulation=MyProcessor(),
                            fileset=preprocessed_available,
                            schemaclass=BaseSchema,
                            uproot_options={"allow_read_errors_with_report": (OSError, KeyError)},
                           )

In [None]:
out, rep = dask.compute(full_tg, rep)

In [None]:
out

Hopefully this ran faster than the previous cell, but that may depend on how many cores are available on the machine you are running this notebook and your connection to `eospublic.cern.ch`. At least the output will be prettier now:

In [None]:
fig, ax = plt.subplots(figsize=(10, 6))
out["DoubleMuon"]["mass"].plot1d(ax=ax)
ax.set_xscale("log")
ax.legend(title="Dimuon charge")

## Getting fancy
Let's flesh out this analysis into a 4-muon analysis, searching for diboson events:

In [None]:
from collections import defaultdict
import numba


@numba.njit
def find_4lep(events_leptons, builder):
    """Search for valid 4-lepton combinations from an array of events * leptons {charge, ...}
    
    A valid candidate has two pairs of leptons that each have balanced charge
    Outputs an array of events * candidates {indices 0..3} corresponding to all valid
    permutations of all valid combinations of unique leptons in each event
    (omitting permutations of the pairs)
    """
    for leptons in events_leptons:
        builder.begin_list()
        nlep = len(leptons)
        for i0 in range(nlep):
            for i1 in range(i0 + 1, nlep):
                if leptons[i0].charge + leptons[i1].charge != 0:
                    continue
                for i2 in range(nlep):
                    for i3 in range(i2 + 1, nlep):
                        if len({i0, i1, i2, i3}) < 4:
                            continue
                        if leptons[i2].charge + leptons[i3].charge != 0:
                            continue
                        builder.begin_tuple(4)
                        builder.index(0).integer(i0)
                        builder.index(1).integer(i1)
                        builder.index(2).integer(i2)
                        builder.index(3).integer(i3)
                        builder.end_tuple()
        builder.end_list()

    return builder


class FancyDimuonProcessor(processor.ProcessorABC):
    def __init__(self):
        pass
    
    def process(self, events):
        dataset_axis = hist.axis.StrCategory([], growth=True, name="dataset", label="Primary dataset")
        mass_axis = hist.axis.Regular(300, 0, 300, name="mass", label=r"$m_{\mu\mu}$ [GeV]")
        pt_axis = hist.axis.Regular(300, 0, 300, name="pt", label=r"$p_{T,\mu}$ [GeV]")
        
        h_nMuons = Hist(
            dataset_axis,
            hist.axis.IntCategory(range(6), name="nMuons", label="Number of good muons"),
            storage="weight", label="Counts",
        )
        h_m4mu = Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_mZ1 = Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_mZ2 = Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_ptZ1mu1 = Hist(dataset_axis, pt_axis, storage="weight", label="Counts")
        h_ptZ1mu2 = Hist(dataset_axis, pt_axis, storage="weight", label="Counts")
                
        #cutflow = defaultdict(int)
        
        dataset = events.metadata['dataset']
        muons = ak.zip({
            "pt": events.Muon_pt,
            "eta": events.Muon_eta,
            "phi": events.Muon_phi,
            "mass": events.Muon_mass,
            "charge": events.Muon_charge,
            "softId": events.Muon_softId,
            "isolation": events.Muon_pfRelIso03_all,
        }, with_name="PtEtaPhiMCandidate", behavior=candidate.behavior)
        
        # make sure they are sorted by transverse momentum
        muons = muons[ak.argsort(muons.pt, axis=1)]
        
        #cutflow['all events'] += len(muons)
        
        # impose some quality and minimum pt cuts on the muons
        muons = muons[
            muons.softId
            & (muons.pt > 5)
            & (muons.isolation < 0.2)
        ]
        #cutflow['at least 4 good muons'] += ak.sum(ak.num(muons) >= 4)
        h_nMuons.fill(dataset=dataset, nMuons=ak.num(muons))
        
        # reduce first axis: skip events without enough muons
        muons = muons[ak.num(muons) >= 4]
        
        # find all candidates with helper function
        fourmuon = find_4lep(muons, ak.ArrayBuilder()).snapshot()
        if ak.all(ak.num(fourmuon) == 0):
            # skip processing as it is an EmptyArray
            return {
                'nMuons': h_nMuons,
                'cutflow': {dataset: cutflow},
            }
        fourmuon = [muons[fourmuon[idx]] for idx in "0123"]
        fourmuon = ak.zip({
            "z1": ak.zip({
                "lep1": fourmuon[0],
                "lep2": fourmuon[1],
                "p4": fourmuon[0] + fourmuon[1],
            }),
            "z2": ak.zip({
                "lep1": fourmuon[2],
                "lep2": fourmuon[3],
                "p4": fourmuon[2] + fourmuon[3],
            }),
        })
        
        #cutflow['at least one candidate'] += ak.sum(ak.num(fourmuon) > 0)
         
        # require minimum dimuon mass
        fourmuon = fourmuon[(fourmuon.z1.p4.mass > 60.) & (fourmuon.z2.p4.mass > 20.)]
        #cutflow['minimum dimuon mass'] += ak.sum(ak.num(fourmuon) > 0)
        
        # choose permutation with z1 mass closest to nominal Z boson mass
        bestz1 = ak.singletons(ak.argmin(abs(fourmuon.z1.p4.mass - 91.1876), axis=1))
        fourmuon = ak.flatten(fourmuon[bestz1])
        
        h_m4mu.fill(
            dataset=dataset,
            mass=(fourmuon.z1.p4 + fourmuon.z2.p4).mass,
        )
        h_mZ1.fill(
            dataset=dataset, 
            mass=fourmuon.z1.p4.mass,
        )
        h_mZ2.fill(
            dataset=dataset, 
            mass=fourmuon.z2.p4.mass,
        )
        h_ptZ1mu1.fill(
            dataset=dataset,
            pt=fourmuon.z1.lep1.pt,
        )
        h_ptZ1mu2.fill(
            dataset=dataset,
            pt=fourmuon.z1.lep2.pt,
        )
        return {
            'nMuons': h_nMuons,
            'mass': h_m4mu,
            'mass_z1': h_mZ1,
            'mass_z2': h_mZ2,
            'pt_z1_mu1': h_ptZ1mu1,
            'pt_z1_mu2': h_ptZ1mu2,
            #'cutflow': {dataset: cutflow},
        }

    def postprocess(self, accumulator):
        pass

In [None]:
import time

tstart = time.time()


fancy_tg, fancy_rep = apply_to_fileset(data_manipulation=FancyDimuonProcessor(),
                                       fileset=preprocessed_available,
                                       schemaclass=BaseSchema,
                                       uproot_options={"allow_read_errors_with_report": (OSError, KeyError)},
                                      )
output, realized_rep = dask.compute(fancy_tg, fancy_rep)

elapsed = time.time() - tstart
print(output)

In [None]:
nevt = output['cutflow']['ZZ to 4mu']['all events'] + output['cutflow']['DoubleMuon']['all events']
print("Events/s:", nevt / elapsed)

What follows is just us looking at the output, you can execute it if you wish

In [None]:
# scale ZZ simulation to expected yield
lumi = 11.6  # 1/fb
zzxs = 7200 * 0.0336**2  # approximate 8 TeV ZZ(4mu)
nzz = output['cutflow']['ZZ to 4mu']['all events']

scaled = {}
for name, h in output.items():
    if isinstance(h, hist.Hist):
        scaled[name] = h.copy()
        scaled[name].view()[0, :] *= lumi * zzxs / nzz

In [None]:
fig, ax = plt.subplots()
scaled['nMuons'].plot1d(ax=ax, overlay='dataset')
ax.set_yscale('log')
ax.set_ylim(1, None)

In [None]:
fig, ax = plt.subplots()

scaled['mass'][:, ::hist.rebin(4)].plot1d(ax=ax, overlay='dataset');

In [None]:
fig, ax = plt.subplots()

scaled['mass_z1'].plot1d(ax=ax, overlay='dataset');

In [None]:
fig, ax = plt.subplots()

scaled['mass_z2'].plot1d(ax=ax, overlay='dataset')
ax.set_xlim(2, 300)
# ax.set_xscale('log')

In [None]:
fig, ax = plt.subplots()

scaled['pt_z1_mu1'].plot1d(ax=ax, overlay='dataset');

In [None]:
fig, ax = plt.subplots()

scaled['pt_z1_mu2'].plot1d(ax=ax, overlay='dataset');