In [None]:
# Notebook-wide imports. NOTE: no coffea warning filter is actually configured in this cell.
from __future__ import annotations

import json
import time
from pathlib import Path

import awkward as ak
import dask
import dask_awkward as dak
import parse
from atlas_schema.methods import behavior as as_behavior
from atlas_schema.schema import NtupleSchema
from coffea import processor
from coffea.analysis_tools import PackedSelection
from coffea.dataset_tools import apply_to_fileset
from dask.diagnostics import ProgressBar
from dask.distributed import Client
from dask_jobqueue.htcondor import HTCondorCluster
from dask.distributed import LocalCluster
from matplotlib import pyplot as plt
import hist.dask as had

# Compiled `parse` template for ntuple file names. The template is written one
# dot-separated field per line; the pieces concatenate to a single format string
# exposing the named fields username, dsid, process, campaign and version.
fname_pattern = parse.compile(
    "user.{username:w}"
    ".{dsid:d}"
    ".{process:S}"
    ".{campaign:w}"
    ".v{version:.1f}_ANALYSIS.root"
)

# Matplotlib colour assigned to each process name, listed as (process, colour)
# pairs; insertion order is preserved when the mapping is built.
colors_dict = dict(
    [
        ("Znunu", "b"),
        ("Wenu", "g"),
        ("Wmunu", "r"),
        ("Wtaunu_L", "c"),
        ("Wtaunu_H", "m"),
        ("Znunugamma", "y"),
        ("Wmunugamma", "k"),
        ("Wenugamma", "brown"),
        ("Wtaunugamma", "pink"),
        ("N2_100_N1_97_WB_signal", "rosybrown"),
        ("Fake/Nonprompt", "lime"),
    ]
)  # spare colours not yet assigned: 'slategrey', 'blueviolet', 'crimson'


In [None]:
class MyProcessor(processor.ProcessorABC):
    """Fill first-photon pT histograms split into ABCD regions and truth category.

    For every chunk of events the processor applies an event-level selection
    (with the MET cut inverted to ``met < 250e3`` so the signal region stays
    blinded), evaluates identification / isolation / truth flags on the first
    photon of each event and fills one histogram category per combination:

    ========  ========  =========
    region    tight ID  tight iso
    ========  ========  =========
    ``A``     yes       no
    ``B``     no        no
    ``C``     yes       yes
    ``D``     no        yes
    ========  ========  =========

    each suffixed with ``_true`` or ``_fake`` depending on ``truthType``.
    """

    # Region name -> (require tight ID, require tight isolation).
    ABCD_REGIONS = {
        "A": (True, False),
        "B": (False, False),
        "C": (True, True),
        "D": (False, True),
    }

    # Bits of ``ph.isEM`` that must all be unset for a photon to pass the
    # preselection.  NOTE(review): presumably a loose-ID bit pattern — confirm.
    PH_ISEM_VETO_MASK = 0x45FC01

    def __init__(self):
        # Intentionally stateless: the histogram is created inside ``process``
        # so that every dataset/chunk gets its own object instead of sharing
        # (and accumulating into) a single instance-level histogram.
        pass

    @classmethod
    def _new_ph_pt_hist(cls):
        """Return an empty (ABCD category x photon pT) dask histogram."""
        categories = [
            f"{region}_{truth}"
            for truth in ("true", "fake")
            for region in cls.ABCD_REGIONS
        ]
        return (
            had.Hist.new.StrCat(categories, name="ABCD")
            .Regular(100, 0.0, 100.0, name="pt", label="$pt_{\\gamma}$ [GeV]")
            .Int64()
        )

    @staticmethod
    def _first_photon_flag(per_photon_mask):
        """Collapse a per-photon boolean mask to a per-event one.

        Takes the value for the first photon in each event; events without any
        photon yield ``False``.
        """
        return ak.fill_none(ak.firsts(per_photon_mask), False)

    def process(self, events):
        """Select events and fill the ABCD photon-pT histogram.

        Parameters
        ----------
        events
            Lazy event array produced with ``NtupleSchema``; must expose the
            ``met``, ``el``, ``mu``, ``jet``, ``ph`` and ``in`` collections and
            ``metadata["dataset"]``.

        Returns
        -------
        dict
            ``{dataset: {"entries": <number of events>, "ph_pt": <histogram>}}``
            with lazy (dask) values.
        """
        ## TODO: remove this temporary fix when https://github.com/scikit-hep/vector/issues/498 is resolved
        met_dict = {field: events.met[field] for field in events.met.fields}
        met_dict["pt"] = dak.zeros_like(events.met.met)
        met_dict["eta"] = dak.zeros_like(events.met.met)
        events["met"] = dak.zip(met_dict, with_name="MissingET", behavior=as_behavior)

        dataset = events.metadata["dataset"]

        print(f"processing {len(events)} events for {dataset}")
        # TODO: event weights (xs * genFiltEff * lum / n_events, all taken from
        # events.metadata) are not applied yet; the histogram holds raw counts.

        h_ph_pt = self._new_ph_pt_hist()

        leptons = ak.concatenate((events.el, events.mu), axis=1)

        # here are some selection cuts for something that looks like the signal region.
        # the only thing that's different is the MET requirement, which I inverted to be
        # met<250 instead of met>250, to make sure we don't accidentally unblind the SR
        # and to give us some more stats while we study MC samples.
        selections = {
            "met": events.met.met < 250 * 1.0e3,
            "lepton_veto": ak.num(leptons, axis=1) == 0,
            "leading_jet_pt": ak.firsts(events.jet.pt) > 100 * 1.0e3,
            # use |delta phi|: the minimum of a signed angular difference would
            # reject any event with a jet at negative delta phi.
            "min_dphi_jet_met": ak.min(abs(events.met.delta_phi(events.jet)), axis=1)
            > 0.4,
            "bjet_veto": ak.sum(events.jet.btag_select, axis=1) == 0,
            "vgamma_overlap": events["in"]["vgamma_overlap_7"],
        }

        selection = PackedSelection()
        selection.add_multiple(selections)

        # Name every cut explicitly so the result does not depend on how
        # ``all()`` treats an empty argument list.
        SR = selection.all(*selections)

        # photon object preselection, evaluated per photon ...
        abs_eta = abs(events.ph.eta)
        per_photon_preselection = (
            (events.ph.pt > 10000)
            & (events.ph.select_baseline == 1)
            & ((events.ph.isEM & self.PH_ISEM_VETO_MASK) == 0)
            & ((abs_eta < 1.37) | ((abs_eta > 1.52) & (abs_eta < 2.37)))
            & (events.ph.select_or_dR02Ph == 1)
        )

        # ... then reduced to per-event flags of the first photon, so that every
        # mask below has one entry per event and can be combined with ``SR``.
        # NOTE(review): "first" is the leading photon only if the collection is
        # pT-ordered — confirm against the ntuple production.
        photon_preselection = self._first_photon_flag(per_photon_preselection)
        ph_tight = self._first_photon_flag(events.ph.select_tightID == 1)
        ph_iso = self._first_photon_flag(events.ph.select_tightIso == 1)
        ph_truth = self._first_photon_flag(
            (events.ph.truthType != 0) & (events.ph.truthType != 16)
        )

        # First-photon pT divided by 1e3 to match the histogram's GeV axis; the
        # 0.0 placeholder for photon-less events is never filled because
        # ``photon_preselection`` is False there.
        lead_ph_pt = ak.fill_none(ak.firsts(events.ph.pt), 0.0) / 1.0e3

        baseline = SR & photon_preselection
        for truth_label, truth_mask in (("true", ph_truth), ("fake", ~ph_truth)):
            for region, (want_tight, want_iso) in self.ABCD_REGIONS.items():
                region_mask = (
                    baseline
                    & (ph_tight if want_tight else ~ph_tight)
                    & (ph_iso if want_iso else ~ph_iso)
                    & truth_mask
                )
                h_ph_pt.fill(
                    ABCD=f"{region}_{truth_label}", pt=lead_ph_pt[region_mask]
                )

        return {
            dataset: {
                "entries": ak.num(events, axis=0),
                "ph_pt": h_ph_pt,
            }
        }

    def postprocess(self, accumulator):
        """Return the accumulated output unchanged (no post-processing needed)."""
        return accumulator

In [None]:
# Run MyProcessor over the configured fileset on a dask cluster and collect the
# concrete results in `computed`.
start_time = time.time()

my_processor = MyProcessor()

# load in a bunch of datasets
# (swap in "af_v2.json" to run over the full list instead of the one-file test set)
dataset_runnable = json.loads(Path("af_v2_onefile.json").read_text())

# Toggle between an HTCondor cluster (all datasets) and a local cluster
# (only the dataset named by `datasettag`).
can_submit_to_condor = False
datasettag = "Znunugamma"

if can_submit_to_condor:
    # To facilitate usage with HTCondor
    cluster = HTCondorCluster(
        log_directory=Path().cwd() / ".condor_logs" / "cutflows_v2",
        cores=4,
        memory="4GB",
        disk="2GB",
    )
    cluster.scale(jobs=100)

    # if we're running over all samples, ensure that here
    dataset_to_run = dataset_runnable
else:
    cluster = LocalCluster()
    dataset_to_run = {datasettag: dataset_runnable[datasettag]}

client = Client(cluster)
try:
    print("Applying to fileset")
    out = apply_to_fileset(
        my_processor,
        dataset_to_run,
        schemaclass=NtupleSchema,
    )

    print("Beginning of dask.compute()")

    # Scope the progress bar to this computation instead of registering it
    # globally, so re-running the cell does not stack up callbacks.
    # NOTE(review): dask.diagnostics callbacks target the local schedulers; with
    # a distributed Client the dashboard is the reliable progress view — confirm.
    with ProgressBar():
        (computed,) = dask.compute(out)
finally:
    # Always release the scheduler and workers, even if the computation failed;
    # otherwise every re-run of this cell leaves another cluster behind.
    # `computed` holds concrete results and stays usable after shutdown.
    client.close()
    cluster.close()
end_time = time.time()

print("Execution time: ", end_time - start_time)
print("Finished dask.compute")
print(computed)

In [None]:
# Plot the photon pT histogram of the dataset selected by `datasettag`
# (one overlaid curve per ABCD/truth category), on a log y-scale.
fig, ax = plt.subplots()
computed[datasettag][datasettag]["ph_pt"].plot1d(ax=ax)
ax.set_yscale("log")
ax.set_ylabel("Entries")
# Label the legend with the dataset that is actually plotted rather than a
# hard-coded process name.
ax.legend(title=f"Photon $p_T$ for {datasettag}");