In [None]:
%matplotlib inline
import uproot
from coffea.nanoevents import NanoEventsFactory, BaseSchema
import hist
import dask
import awkward as ak
import hist.dask as hda
import dask_awkward as dak
from coffea.processor import ProcessorABC
from coffea.nanoevents.methods import candidate
from coffea.dataset_tools import (
    apply_to_fileset,
    max_chunks,
    preprocess,
)
from dask.distributed import Client
from dask_jobqueue import HTCondorCluster

class Tau2MuMuMuProcessor(ProcessorABC):
    """Histogram the dimuon invariant mass, split by charge-sign category.

    Events with exactly two muons are selected; the pair is labelled
    "opposite" when the muon charges sum to zero and "same" otherwise.
    """

    def __init__(self):
        ...

    def process(self, events):
        """Build the dimuon mass histogram for one chunk of events.

        Parameters
        ----------
        events
            Event collection exposing the flat branches ``Muon_pt``,
            ``Muon_eta``, ``Muon_phi``, ``Muon_mass`` and ``Muon_charge``,
            and a ``metadata`` mapping with a ``'dataset'`` key.

        Returns
        -------
        dict
            ``{dataset: {"entries": <number of events>, "mass": <histogram>}}``.
        """
        dataset = events.metadata['dataset']
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
            },
            with_name="PtEtaPhiMCandidate",
            behavior=candidate.behavior,
        )

        h_mass = (
            hda.Hist.new
            .StrCat(["opposite", "same"], name="sign")
            # Raw string: "\m" is not a valid escape sequence in a normal
            # string literal; the runtime value of the label is unchanged.
            .Log(1000, 0.2, 200., name="mass", label=r"$m_{\mu\mu}$ [GeV]")
            .Int64()
        )

        # Both sign categories share the "exactly two muons" requirement and
        # the per-event charge sum, so build them once.
        exactly_two = ak.num(muons) == 2
        net_charge = ak.sum(muons.charge, axis=1)

        # Opposite-sign pairs: the two charges cancel.
        pairs = muons[exactly_two & (net_charge == 0)]
        # add first and second muon in every event together
        dimuon = pairs[:, 0] + pairs[:, 1]
        h_mass.fill(sign="opposite", mass=dimuon.mass)

        # Same-sign pairs: the two charges do not cancel.
        pairs = muons[exactly_two & (net_charge != 0)]
        dimuon = pairs[:, 0] + pairs[:, 1]
        h_mass.fill(sign="same", mass=dimuon.mass)

        return {
            dataset: {
                "entries": ak.num(events, axis=0),
                "mass": h_mass,
            }
        }

    def postprocess(self, accumulator):
        """Return the accumulator unchanged; no post-processing is applied."""
        # Returning the accumulator (instead of None) keeps the results
        # available to any caller that uses the return value of postprocess.
        return accumulator

# Single input file used for this test run.
filename = '/pnfs/knu.ac.kr/data/cms/store/user/seyang/Tau2MuMuMu/data/Run2017C/DoubleMuonLowMass/MINIAOD/09Aug2019_UL2017-v1/20000/A1F86ED8-BCA7-DF40-9B8A-AE0A0156DACF.root'

# Open the "Events" tree with the flat BaseSchema and tag it with the dataset
# name that Tau2MuMuMuProcessor.process reads from events.metadata.
factory = NanoEventsFactory.from_root(
    {filename: "Events"},
    steps_per_file=2_000,
    metadata={"dataset": "DoubleMuon"},
    schemaclass=BaseSchema,
)
events = factory.events()
processor = Tau2MuMuMuProcessor()
# Builds the result collections only; the actual work is submitted through
# client.compute below.
out = processor.process(events)

cluster = HTCondorCluster(cores=2, memory="1GB", disk="1GB")
# A freshly created cluster has no workers; without an explicit scale()/adapt()
# call no HTCondor job is submitted and the future below stays pending.
cluster.scale(jobs=1)
client = Client(cluster)

# client.compute returns immediately with a future for the result.
computed = client.compute(out)
print(computed)

# The bare expressions below are notebook cells kept for interactive
# inspection of the cluster and the future.
computed

cluster.status

print(cluster.job_script())

cluster.status

cluster.job_name

computed.status