In [1]:
import awkward as ak
import dask
from distributed import performance_report
import hist
from hist.dask import Hist
import json
from coffea import processor
from coffea.nanoevents import BaseSchema, NanoAODSchema 
from coffea.dataset_tools import apply_to_dataset, apply_to_fileset, preprocess, rucio_utils
from coffea.dataset_tools import max_chunks, max_files, slice_chunks, slice_files
from coffea.analysis_tools import Weights, PackedSelection
import corrections
import matplotlib.pyplot as plt

  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [2]:
class MyZPeak(processor.ProcessorABC):
    def process(self, events):
        dataset = events.metadata['dataset']
        isRealData = "genWeight" not in events.fields
        #sumw = 0. if isRealData else ak.sum(events.genWeight, axis=0)
        weights = Weights(None)
        ps = PackedSelection()
        
        if isRealData:
            weights.add("nominal", ak.ones_like(events.event))
            ps.add("lumimask", corrections.lumimask(events.run, events.luminosityBlock))
        else:
            weights.add("genWeight", events.genWeight)
            # for symmetry, we can use a trivially-true array for "lumimask" in MC
            ps.add("lumimask", ak.ones_like(events.event, dtype=bool))
    
        events["goodmuons"] = events.Muon[
            (events.Muon.pt >= 20.)
            & events.Muon.tightId
        ]
        ps.add("ossf", (ak.num(events.goodmuons) == 2) & (ak.sum(events.goodmuons.charge, axis=1) == 0))
        
        # https://twiki.cern.ch/twiki/bin/view/CMS/MuonHLT2018
        ps.add("trigger", events.HLT.Mu17_TrkIsoVVL_Mu8_TrkIsoVVL_DZ_Mass3p8)
        
        # add first and second muon p4 in every event together
        sel_events = events[ps.require(lumimask=True, ossf=True, trigger=True)]
        sel_zcand = sel_events.goodmuons[:, 0] + sel_events.goodmuons[:, 1]
        sel_weights = weights.weight()[ps.all("lumimask", "ossf", "trigger")] 

        cutflow = {"lumimask": ak.sum(ps.require(lumimask=True), axis=0),
                   "ossf": ak.sum(ps.require(lumimask=True, ossf=True), axis=0),
                   "trigger": ak.sum(ps.require(lumimask=True, ossf=True, trigger=True), axis=0),
                  } 

        return {
                "sumw": weights.weightStatistics,
                "cutflow": cutflow,
                "mass": (
                    Hist.new
                    .Reg(120, 0., 120., label="$m_{\mu\mu}$ [GeV]")
                    .Weight()
                    .fill(sel_zcand.mass, weight=sel_weights)
                )
            }

    def postprocess(self, accumulator):
        return accumulator

In [3]:
from dask.distributed import Client

client = Client("tls://localhost:8786")
client

0,1
Connection method: Direct,
Dashboard: /user/ruvarashe.nyaban@bulldogs.aamu.edu/proxy/8787/status,

0,1
Comm: tls://192.168.121.180:8786,Workers: 0
Dashboard: /user/ruvarashe.nyaban@bulldogs.aamu.edu/proxy/8787/status,Total threads: 0
Started: 1 minute ago,Total memory: 0 B


In [None]:
import shutil
shutil.make_archive("corrections", "zip", base_dir="corrections")

In [None]:
client.upload_file("corrections.zip")

In [None]:
with open("fileset.json", "rt") as file:
    initial_fileset = json.load(file)

In [None]:
preprocessed_available, preprocessed_total = preprocess(
        initial_fileset,
        step_size=50_000,
        align_clusters=None,
        skip_bad_files=True,
        recalculate_steps=False,
        files_per_batch=1,
        file_exceptions=(OSError,),
        save_form=True,
        uproot_options={},
        step_size_safety_factor=0.5,
    )

In [None]:
import gzip, pickle, json
output_file = "scaleout_fileset"
with gzip.open(f"{output_file}_available.json.gz", "wt") as file:
    json.dump(preprocessed_available, file, indent=2)
    print(f"Saved available fileset chunks to {output_file}_available.json.gz")
with gzip.open(f"{output_file}_all.json.gz", "wt") as file:
    json.dump(preprocessed_total, file, indent=2)
    print(f"Saved complete fileset chunks to {output_file}_all.json.gz")

In [None]:
test_preprocessed_files = max_files(preprocessed_available, 30)
test_preprocessed = max_chunks(test_preprocessed_files, 30)

In [None]:
#small_tg, small_rep = apply_to_fileset(data_manipulation=MyZPeak(),
                            #fileset=test_preprocessed,
tg, rep = apply_to_fileset(data_manipulation=MyZPeak(),    
                            fileset=test_preprocessed,
                            schemaclass=NanoAODSchema,
                            uproot_options={"allow_read_errors_with_report": (OSError, KeyError)},
                           )

## Distributed performance_report
The dask distributed performance_report will save an html page of our dask computations,
preserving a record of the tasks, performance profiling, and so on

In [None]:
performance_report_name = "scaleout_report.html"
with performance_report(performance_report_name, stacklevel=1, mode=None, storage_options=None):
    result, report = dask.compute(tg, rep)

### Computing the luminosity scaling
The coffea lumi_tools subpackage contains tools for calculating the lumi processed
We'll take a simpler approach, calculating the fraction of the dataset we computed
and scaling the known-luminosity of the 2018A run by this fraction

In [None]:
# Compute h
def total_data(events):
    isRealData = "genWeight" not in events.fields
    if isRealData:
        events = events[
            corrections.lumimask(events.run, events.luminosityBlock)
        ]
        return ak.num(events, axis=0)
    else:
        return -1

In [None]:
# This number should be calculated on the preprocessed_total,
# but some of those files may be inaccessible. We rely upon most being readable
dfd, _ = apply_to_fileset(data_manipulation=total_data,
                                     fileset=preprocessed_available,
                                     schemaclass=NanoAODSchema,
                                     uproot_options={"allow_read_errors_with_report": (OSError, KeyError)},
                                    )
data_fraction_den = dfd["DoubleMuon2018A"].compute()

In [None]:

data_fraction_num = result["DoubleMuon2018A"]["cutflow"]["lumimask"]
data_fraction = data_fraction_num / data_fraction_den
print(data_fraction)

In [None]:
data = result["DoubleMuon2018A"]["mass"]

lumi = 14.0
xsweight = lumi * 1e3 * 6225.42 * data_fraction / result["ZJets2018"]["sumw"]["genWeight"]["sumw"]
sim = result["ZJets2018"]["mass"] * xsweight

In [None]:
fig, ax = plt.subplots()
sim.plot(ax=ax, histtype="fill", label="Z+jets")
data.plot(ax=ax, histtype="errorbar", color="k", label="Data")
ax.set_xlim(60, 120)
ax.legend()