In [132]:
import awkward as ak
import numpy as np
import pandas as pd
import json
import os
import shutil
import pathlib
from typing import List, Optional

from coffea import processor
from coffea.nanoevents.methods import candidate, vector
from coffea.analysis_tools import Weights, PackedSelection

import warnings
warnings.filterwarnings("ignore", message="Found duplicate branch ")

import pyarrow as pa
import pyarrow.parquet as pq
import pickle as pkl

In [133]:
# # from coffea casa
# import uproot
# f = uproot.open('root://xcache//store/mc/RunIISummer19UL17NanoAODv2/GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8/NANOAODSIM/106X_mc2017_realistic_v8-v1/250000/13D6BBD5-89E3-8647-AED6-FB5DFAAF4C8C.root:Events')
# f.num_entries   ### checks number of events

In [134]:
def pad_val(
    arr: ak.Array,
    value: float,
    target: Optional[int] = None,
    axis: int = 0,
    to_numpy: bool = False,
    clip: bool = True,
):
    """
    Replace missing entries of an awkward array with ``value``, optionally padding first.

    Args:
        arr: awkward array to fill.
        value: value written wherever the (padded) array holds ``None``.
        target: length to pad ``arr`` to along ``axis`` before filling;
            ``None`` (the default) skips the padding step entirely.
        axis: axis along which the padding is applied.
        to_numpy: if True, return ``ret.to_numpy()`` instead of the awkward array.
        clip: forwarded to ``ak.pad_none``.

    Returns:
        The filled array, as an awkward array or (if ``to_numpy``) a numpy array.
    """
    # Compare against None explicitly: a plain truthiness test would silently
    # treat ``target=0`` as "no target" and skip the padding.
    if target is not None:
        arr = ak.pad_none(arr, target, axis=axis, clip=clip)
    ret = ak.fill_none(arr, value, axis=None)
    return ret.to_numpy() if to_numpy else ret


from collections import defaultdict
def dsum(*dicts):
    """
    Add up the values of several dictionaries key by key.

    Every key seen in any input appears once in the result; its value is the
    running sum (starting from 0) of the values stored under that key.
    """
    totals = {}
    for mapping in dicts:
        for key, amount in mapping.items():
            # first occurrence of a key starts its running sum at zero
            if key not in totals:
                totals[key] = 0
            totals[key] += amount
    return totals

In [135]:
def getParticles(genparticles,lowid=22,highid=25,flags=['fromHardProcess', 'isLastCopy']):
    """
    Select generator particles by |pdgId| window and status flags.

    Keeps the entries of ``genparticles`` whose absolute pdgId lies in
    ``[lowid, highid]`` (both ends included) and for which
    ``genparticles.hasFlags(flags)`` is true.
    """
    abs_pdg = abs(genparticles.pdgId)
    in_id_window = (abs_pdg >= lowid) & (abs_pdg <= highid)
    passes_flags = genparticles.hasFlags(flags)
    return genparticles[in_id_window & passes_flags]

def match_HWWlepqq(genparticles,candidatefj):
    """
    Match a candidate fat jet to the generator-level H -> WW decay.

    Returns a 4-tuple:
      - matchedH: result of ``candidatefj.nearest(higgs, axis=1, threshold=0.8)``
      - iswlepton: whether the mass of the matched lepton's distinct parent
        equals the mass of the heavier Higgs child
      - iswstarlepton: same comparison against the lighter Higgs child
      - higgs: the selected Higgs bosons whose children all have |pdgId| == 24

    The prong count (``hWWlepqq_nprongs``), the flavor code (``hWWlepqq_flavor``)
    and the match code (``hWWlepqq_matched``) are computed below but are not
    part of the return value.
    """
    W_PDGID = 24
    HIGGS_PDGID = 25
    # lowid = highid = 25 (highid keeps its default), default flags
    higgs = getParticles(genparticles,HIGGS_PDGID)
    # keep only Higgs bosons for which every child is a W (|pdgId| == 24)
    is_hWW = ak.all(abs(higgs.children.pdgId)==W_PDGID,axis=2)

    higgs = higgs[is_hWW]
    # lighter child is taken as the W*, heavier child as the W
    higgs_wstar = higgs.children[ak.argmin(higgs.children.mass,axis=2,keepdims=True)]
    higgs_w = higgs.children[ak.argmax(higgs.children.mass,axis=2,keepdims=True)]
    
    prompt_electron = getParticles(genparticles,11,11,['isPrompt','isLastCopy'])
    prompt_muon = getParticles(genparticles,13,13,['isPrompt', 'isLastCopy'])
    prompt_tau = getParticles(genparticles,15,15,['isPrompt', 'isLastCopy'])
    # |pdgId| in [0, 5], then restricted to those whose distinct parent is a W
    prompt_q = getParticles(genparticles,0,5,['fromHardProcess', 'isLastCopy'])
    prompt_q = prompt_q[abs(prompt_q.distinctParent.pdgId) == W_PDGID]
    
    dr_fj_quarks = candidatefj.delta_r(prompt_q)
    dr_fj_electrons = candidatefj.delta_r(prompt_electron)
    dr_fj_muons = candidatefj.delta_r(prompt_muon)
    dr_fj_taus = candidatefj.delta_r(prompt_tau)
    dr_daughters = ak.concatenate([dr_fj_quarks,dr_fj_electrons,dr_fj_muons,dr_fj_taus],axis=1)
    # number of selected quarks/leptons within dR < 0.8 of the jet (not returned)
    hWWlepqq_nprongs = ak.sum(dr_daughters<0.8,axis=1)
    
    n_electrons = ak.sum(prompt_electron.pt>0,axis=1)
    n_muons = ak.sum(prompt_muon.pt>0,axis=1)
    n_taus = ak.sum(prompt_tau.pt>0,axis=1)
    n_quarks = ak.sum(prompt_q.pt>0,axis=1)

    # 4(elenuqq),6(munuqq),8(taunuqq)
    # (not returned)
    hWWlepqq_flavor = (n_quarks==2)*1 + (n_electrons==1)*3 + (n_muons==1)*5 + (n_taus==1)*7
    
    matchedH = candidatefj.nearest(higgs, axis=1, threshold=0.8)
    matchedW = candidatefj.nearest(higgs_w, axis=1, threshold=0.8)
    matchedWstar = candidatefj.nearest(higgs_wstar, axis=1, threshold=0.8) 

    # 1 (H only), 4(W), 6(W star), 9(H, W and Wstar)
    # (not returned)
    hWWlepqq_matched = (
        (ak.sum(matchedH.pt > 0, axis=1)==1) * 1 
        + (ak.sum(ak.flatten(matchedW.pt > 0, axis=2), axis=1)==1) * 3 
        + (ak.sum(ak.flatten(matchedWstar.pt > 0, axis=2), axis=1)==1) * 5
    )
    
    # leptons matched
    # only electrons and muons are considered here; taus are left out
    dr_leptons = ak.concatenate([dr_fj_electrons,dr_fj_muons], axis=1)
    matched_leptons = dr_leptons < 0.8
    
    # same concatenation order as dr_leptons, so the mask lines up entry by entry
    leptons = ak.concatenate([prompt_electron, prompt_muon], axis=1)
    leptons = leptons[matched_leptons]
    
    # leptons coming from W or W*
    # the lepton's parent is identified by comparing masses for exact equality
    leptons_mass = ak.firsts(leptons.distinctParent.mass)
    higgs_w_mass = ak.firsts(ak.flatten(higgs_w.mass))[ak.firsts(leptons.pt > 0)]
    higgs_wstar_mass = ak.firsts(ak.flatten(higgs_wstar.mass))[ak.firsts(leptons.pt > 0)]

    iswlepton = (leptons_mass == higgs_w_mass)
    iswstarlepton = (leptons_mass == higgs_wstar_mass)
    
    # let's return only:
    # - matchedH (the higgs boson that is matched to the jet)
    # - (iswlepton,iswstarlepton)
    # - higgs (the selected H -> WW bosons)
    return matchedH, iswlepton, iswstarlepton, higgs

In [136]:
class HwwProcessor(processor.ProcessorABC):
    """
    Coffea processor that skims events for the ``ele``, ``mu`` and ``had`` channels.

    For each chunk of events, ``process`` builds lepton, jet, fat-jet and MET
    quantities, records a cutflow per channel, writes the variables listed in
    ``self._skimvars`` for the events passing all of a channel's selections to a
    parquet file, and returns the cutflows together with the sum of generator weights.
    """

    def __init__(self, year="2017", yearmod="", channels=["ele", "mu", "had"], output_location="./", folder_name=''):
        """
        Args:
            year: "2016", "2017" or "2018"; selects the trigger paths and MET filters.
            yearmod: suffix appended to ``year`` to select the b-tag working points
                (the table has keys "2016preVFP", "2016postVFP", "2017", "2018").
            channels: channels to process; each must be a key of ``self._skimvars``.
            output_location: parquet files are written only when this is not None.
                The files always go under ``./outfiles``; the value is not used as a path.
            folder_name: string appended to ``./outfiles/<channel>`` when building
                the parquet output directory.

        Raises:
            KeyError: if ``year`` or ``year + yearmod`` is not in the lookup tables.
        """
        self._year = year
        self._yearmod = yearmod
        # copy so the instance never aliases the shared default list or the caller's list
        self._channels = list(channels)
        self._output_location = output_location
        self.folder_name = folder_name

        # backing store for the ``accumulator`` property (was never initialised,
        # so reading the property raised AttributeError)
        self._accumulator = {}

        # define variables to save for each channel
        self._skimvars = {
            'ele': [
                "lepton_pt",
                "lep_isolation",
                "met",
                "ht",
                "mt_lep_met",
                "dr_jet_candlep",
            ],
            'mu': [
                "lepton_pt",
                "lep_isolation",
                "met",
                "ht",
                "mt_lep_met",
                "dr_jet_candlep",
                # NOTE(review): process() has no producer for "mu_mvaId", so this
                # entry is skipped when the output is filled — confirm whether it
                # should be saved.
                "mu_mvaId"
            ],
            'had': [
                "leadingfj_pt",
                "leadingfj_msoftdrop",
                "secondfj_pt",
                "secondfj_msoftdrop",
                "met",
                "ht",
                "bjets_ophem_leadingfj"
            ],
        }

        # trigger paths
        self._HLTs = {
            2016: {
                'ele': [
                    "Ele27_WPTight_Gsf",
                    "Ele115_CaloIdVT_GsfTrkIdT",
                    "Photon175",
                    # "Ele50_CaloIdVT_GsfTrkIdT_PFJet165", # extra
                    # "Ele15_IsoVVVL_PFHT600", # VVL
                ],
                'mu': [
                    "Mu50",
                    "TkMu50",
                    "IsoMu24",
                    "IsoTkMu24",
                    # "Mu55",
                    # "Mu15_IsoVVVL_PFHT600" # VVL
                ],
                'had': [
                    "PFHT800",
                    "PFHT900",
                    "AK8PFJet360_TrimMass30",
                    "AK8PFHT700_TrimR0p1PT0p03Mass50",
                    "PFHT650_WideJetMJJ950DEtaJJ1p5",
                    "PFHT650_WideJetMJJ900DEtaJJ1p5",
                    "PFJet450",
                ],
            },
            2017: {
                'ele': [
                    "Ele35_WPTight_Gsf",
                    "Ele115_CaloIdVT_GsfTrkIdT",
                    "Photon200",
                    # "Ele50_CaloIdVT_GsfTrkIdT_PFJet165", # extra
                    # "Ele15_IsoVVVL_PFHT600", # VVL
                ],
                'mu': [
                    "Mu50",
                    "IsoMu27",
                    "OldMu100",
                    "TkMu100",
                    # "Mu15_IsoVVVL_PFHT600", # VVL
                ],
                'had': [
                    "PFHT1050",
                    "AK8PFJet400_TrimMass30",
                    "AK8PFJet420_TrimMass30",
                    "AK8PFHT800_TrimMass50",
                    "PFJet500",
                    "AK8PFJet500",
                ],
            },
            2018: {
                'ele': [
                    "Ele32_WPTight_Gsf",
                    "Ele115_CaloIdVT_GsfTrkIdT",
                    "Photon200",
                    # "Ele50_CaloIdVT_GsfTrkIdT_PFJet165", # extra
                    # "Ele15_IsoVVVL_PFHT600", # VVL
                ],
                'mu': [
                    "Mu50",
                    "IsoMu24",
                    "OldMu100",
                    "TkMu100",
                    # "Mu15_IsoVVVL_PFHT600", # VVL
                ],
                'had': [
                    "PFHT1050",
                    "AK8PFJet400_TrimMass30",
                    "AK8PFJet420_TrimMass30",
                    "AK8PFHT800_TrimMass50",
                    "PFJet500",
                    "AK8PFJet500",
                ],
            }
        }[int(self._year)]

        # https://twiki.cern.ch/twiki/bin/view/CMS/MissingETOptionalFiltersRun2
        self._metfilters = {
            2016: [
                "goodVertices",
                "globalSuperTightHalo2016Filter",
                "HBHENoiseFilter",
                "HBHENoiseIsoFilter",
                "EcalDeadCellTriggerPrimitiveFilter",
                "BadPFMuonFilter",
                "eeBadScFilter",
            ],
            2017: [
                "goodVertices",
                "globalSuperTightHalo2016Filter",
                "HBHENoiseFilter",
                "HBHENoiseIsoFilter",
                "EcalDeadCellTriggerPrimitiveFilter",
                "BadPFMuonFilter",
                # "BadChargedCandidateFilter",
                "eeBadScFilter",
                "ecalBadCalibFilter",
            ],
            2018:  [
                "goodVertices",
                "globalSuperTightHalo2016Filter",
                "HBHENoiseFilter",
                "HBHENoiseIsoFilter",
                "EcalDeadCellTriggerPrimitiveFilter",
                "BadPFMuonFilter",
                # "BadChargedCandidateFilter",
                "eeBadScFilter",
                "ecalBadCalibFilter",
            ],
        }[int(self._year)]

        # https://twiki.cern.ch/twiki/bin/viewauth/CMS/BtagRecommendation
        self._btagWPs = {
            '2016preVFP': {
                'loose': 0.0508,
                'medium': 0.2598,
                'tight': 0.6502,
            },
            '2016postVFP': {
                'loose': 0.0480,
                'medium': 0.2489,
                'tight': 0.6377,
            },
            '2017': {
                'loose': 0.0532,
                'medium': 0.3040,
                'tight': 0.7476,
            },
            '2018': {
                'loose': 0.0490,
                'medium': 0.2783,
                'tight': 0.7100,
            },
        }[year + yearmod]

        # per-chunk state, reset at the start of every process() call
        self.selections = {}
        self.cutflows = {}

    @property
    def accumulator(self):
        """Return the (empty) accumulator dictionary created in ``__init__``."""
        return self._accumulator

    def save_dfs_parquet(self, fname, dfs_dict, ch, folder_name):
        """
        Write one channel's dataframe to ``./outfiles/<ch><folder_name>/parquet/<fname>.parquet``.

        Nothing is written when ``self._output_location`` is None. The target
        directory must already exist.
        """
        if self._output_location is not None:
            table = pa.Table.from_pandas(dfs_dict)
            pq.write_table(table, './outfiles/' + ch + folder_name + '/parquet/' + fname + '.parquet')

    def ak_to_pandas(self, output_collection: ak.Array) -> pd.DataFrame:
        """Build a DataFrame with one column per field of ``output_collection``."""
        output = pd.DataFrame()
        for field in ak.fields(output_collection):
            output[field] = ak.to_numpy(output_collection[field])
        return output

    def add_selection(self, name: str, sel: np.ndarray, channel: list = None):
        """
        Adds selection to PackedSelection object and the cutflow dictionary.

        Args:
            name: name of the selection.
            sel: per-event boolean mask.
            channel: channels the selection applies to; all configured channels
                when None or empty. Channels that were not requested in the
                constructor are skipped, so a processor built with a subset of
                channels does not fail on selections meant for the others.
        """
        channels = channel if channel else self._channels
        for ch in channels:
            if ch not in self._channels:
                continue
            self.selections[ch].add(name, sel)
            # cutflow entry = number of events passing every selection added so far
            self.cutflows[ch][name] = np.sum(self.selections[ch].all(*self.selections[ch].names))

    def process(self, events: ak.Array):
        """Returns skimmed events which pass preselection cuts and with the branches listed in self._skimvars"""
        dataset = events.metadata['dataset']
        isMC = hasattr(events, "genWeight")
        sumgenweight = ak.sum(events.genWeight) if isMC else 0
        nevents = len(events)

        # empty selections and cutflows
        self.selections = {}
        self.cutflows = {}
        for ch in self._channels:
            self.selections[ch] = PackedSelection()
            self.cutflows[ch] = {}
            self.cutflows[ch]["all"] = nevents

        # trigger
        for ch in self._channels:
            if ch == "had" and isMC:
                trigger = np.ones(nevents, dtype='bool')
            else:
                # apply trigger to both data and MC (except for hadronic channel)
                trigger = np.zeros(nevents, dtype='bool')
                for t in self._HLTs[ch]:
                    # paths missing from this file are ignored rather than failing
                    if t in events.HLT.fields:
                        trigger = trigger | events.HLT[t]
            self.add_selection("trigger", trigger, [ch])
            del trigger

        # metfilters
        metfilters = np.ones(nevents, dtype='bool')
        for mf in self._metfilters:
            if mf in events.Flag.fields:
                metfilters = metfilters & events.Flag[mf]
        self.add_selection("metfilters", metfilters)

        # define muon objects
        loose_muons = (
            (((events.Muon.pt > 30) & (events.Muon.pfRelIso04_all < 0.25)) |
             (events.Muon.pt > 55))
            & (np.abs(events.Muon.eta) < 2.4)
            & (events.Muon.looseId)
        )
        n_loose_muons = ak.sum(loose_muons, axis=1)

        good_muons = (
            (events.Muon.pt > 28)
            & (np.abs(events.Muon.eta) < 2.4)
            & (np.abs(events.Muon.dz) < 0.1)
            & (np.abs(events.Muon.dxy) < 0.05)
            & (events.Muon.sip3d <= 4.0)
            & events.Muon.mediumId
        )
        n_good_muons = ak.sum(good_muons, axis=1)

        # define electron objects
        loose_electrons = (
            (((events.Electron.pt > 38) & (events.Electron.pfRelIso03_all < 0.25)) |
             (events.Electron.pt > 120))
            & ((np.abs(events.Electron.eta) < 1.44) | (np.abs(events.Electron.eta) > 1.57))
            & (events.Electron.cutBased >= events.Electron.LOOSE)
        )
        n_loose_electrons = ak.sum(loose_electrons, axis=1)

        good_electrons = (
            (events.Electron.pt > 38)
            & ((np.abs(events.Electron.eta) < 1.44) | (np.abs(events.Electron.eta) > 1.57))
            & (np.abs(events.Electron.dz) < 0.1)
            & (np.abs(events.Electron.dxy) < 0.05)
            & (events.Electron.sip3d <= 4.0)
            & (events.Electron.mvaFall17V2noIso_WP90)
        )
        n_good_electrons = ak.sum(good_electrons, axis=1)

        # leading lepton
        goodleptons = ak.concatenate([events.Muon[good_muons], events.Electron[good_electrons]], axis=1)
        goodleptons = goodleptons[ak.argsort(goodleptons.pt, ascending=False)]
        candidatelep = ak.firsts(goodleptons)

        # candidate leptons
        candidatelep_p4 = ak.zip(
            {
                "pt": candidatelep.pt,
                "eta": candidatelep.eta,
                "phi": candidatelep.phi,
                "mass": candidatelep.mass,
                "charge": candidatelep.charge,
            },
            with_name="PtEtaPhiMCandidate",
            behavior=candidate.behavior,
        )

        # relative isolation
        lep_reliso = candidatelep.pfRelIso04_all if hasattr(candidatelep, "pfRelIso04_all") else candidatelep.pfRelIso03_all
        # mini isolation (currently unused below; the selections read the field directly)
        mu_miso = candidatelep.miniPFRelIso_all
        # MVA-ID (currently unused below)
        mu_mvaId = candidatelep.mvaId if hasattr(candidatelep, "mvaId") else np.zeros(nevents)

        # JETS
        goodjets = events.Jet[
            (events.Jet.pt > 30)
            & (abs(events.Jet.eta) < 2.5)
            & events.Jet.isTight
        ]
        ht = ak.sum(goodjets.pt, axis=1)

        # FATJETS
        fatjets = events.FatJet
        fatjets["qcdrho"] = 2 * np.log(fatjets.msoftdrop / fatjets.pt)

        good_fatjets = (
            (fatjets.pt > 200)
            & (abs(fatjets.eta) < 2.5)
            & fatjets.isTight
            # & fatjets.puId==7   #### TODO field not found
        )
        n_fatjets = ak.sum(good_fatjets, axis=1)

        good_fatjets = fatjets[good_fatjets]
        good_fatjets = good_fatjets[ak.argsort(good_fatjets.pt, ascending=False)]
        leadingfj = ak.firsts(good_fatjets)
        secondfj = ak.pad_none(good_fatjets, 2, axis=1)[:, 1]

        # fat jet closest in dR to the candidate lepton
        candidatefj_lep = ak.firsts(good_fatjets[ak.argmin(good_fatjets.delta_r(candidatelep_p4), axis=1, keepdims=True)])
        # lepton and fatjet mass
        lep_fj_m = (candidatefj_lep - candidatelep_p4).mass

        dphi_jet_lepfj = abs(goodjets.delta_phi(candidatefj_lep))  # ele and mu
        dphi_jet_leadingfj = abs(goodjets.delta_phi(leadingfj))  # had

        bjets_ophem_lepfj = ak.max(goodjets[dphi_jet_lepfj > np.pi / 2].btagDeepFlavB, axis=1)  # in event, pick highest b score in opposite direction from signal
        bjets_ophem_leadingfj = ak.max(goodjets[dphi_jet_leadingfj > np.pi / 2].btagDeepFlavB, axis=1)

        # deltaR
        dr_jet_candlep = candidatefj_lep.delta_r(candidatelep_p4)

        # MET
        met = events.MET
        mt_lep_met = np.sqrt(
            2. * candidatelep_p4.pt * met.pt * (ak.ones_like(met.pt) - np.cos(candidatelep_p4.delta_phi(met)))
        )

        # event selections
        # NOTE: the order of the add_selection calls defines the order of the
        # cutflow entries for each channel, so it is kept as is.
        self.add_selection(
            name='leptonKin',
            sel=(candidatelep.pt > 30),
            channel=['mu']
        )
        self.add_selection(
            name='oneLepton',
            sel=(n_good_muons == 1) & (n_good_electrons == 0) & (n_loose_electrons == 0),
            channel=['mu']
        )
        self.add_selection('leptonIsolation', sel=(
            ((candidatelep.pt > 30)
             & (candidatelep.pt < 55)
             & (lep_reliso < 0.25)
             )
            | ((candidatelep.pt >= 55)
               & (candidatelep.miniPFRelIso_all < 0.2))
        ), channel=['mu'])
        self.add_selection('leptonInJet', sel=(dr_jet_candlep < 0.8), channel=['mu', 'ele'])
        self.add_selection('ht', sel=(ht > 200), channel=['mu', 'ele'])
        self.add_selection('mt', sel=(mt_lep_met < 100), channel=['mu', 'ele'])
        # self.add_selection(
        #     name='bjet_tag',
        #     sel=(bjets_ophem_lepfj > self._btagWPs["medium"]),
        #     channel=['mu', 'ele']
        # )
        # selections for electrons
        self.add_selection(
            name='leptonKin',
            sel=(candidatelep.pt > 40),
            channel=['ele']
        )
        self.add_selection(
            name='oneLepton',
            sel=(n_good_muons == 0) & (n_loose_muons == 0) & (n_good_electrons == 1),
            channel=['ele']
        )
        self.add_selection('leptonIsolation', sel=(
            ((candidatelep.pt > 30)
             & (candidatelep.pt < 120)
             & (lep_reliso < 0.3)
             )
            | ((candidatelep.pt >= 120)
               & (candidatelep.miniPFRelIso_all < 0.2))
        ), channel=['ele'])

        # had selection
        self.add_selection(
            name='oneFatjet',
            sel=(n_fatjets >= 1) & (n_good_muons == 0) & (n_loose_muons == 0) & (n_good_electrons == 0) & (n_loose_electrons == 0),
            channel=['had']
        )
        self.add_selection(
            name='leadingJet',
            sel=leadingfj.pt > 450,
            channel=['had']
        )
        self.add_selection(
            name='softdrop',
            sel=leadingfj.msoftdrop > 30,
            channel=['had']
        )
        self.add_selection(
            name='qcdrho',
            sel=(leadingfj.qcdrho > -7) & (leadingfj.qcdrho < -2.0),
            channel=['had']
        )
        # self.add_selection(
        #     name='bjet_tag',
        #     sel=(bjets_ophem_leadingfj > self._btagWPs["medium"]),
        #     channel=['had']
        # )

        # every skim variable this processor knows how to produce; names in
        # self._skimvars that are not listed here are skipped
        skim_arrays = {
            "lepton_pt": candidatelep.pt,
            "dr_jet_candlep": dr_jet_candlep,
            "mt_lep_met": mt_lep_met,
            "ht": ht,
            "met": met.pt,
            "lep_isolation": lep_reliso,
            "lepfj_m": lep_fj_m,
            "candidatefj_lep_pt": candidatefj_lep.pt,
            "leadingfj_pt": leadingfj.pt,
            "leadingfj_msoftdrop": leadingfj.msoftdrop,
            "secondfj_pt": secondfj.pt,
            "secondfj_msoftdrop": secondfj.msoftdrop,
            "bjets_ophem_lepfj": bjets_ophem_lepfj,
            "bjets_ophem_leadingfj": bjets_ophem_leadingfj,
        }

        # initialize pandas dataframe
        output = {}
        for ch in self._channels:
            # missing values are replaced by -1 so every column is a plain numeric array
            out = {
                var: pad_val(skim_arrays[var], -1)
                for var in self._skimvars[ch]
                if var in skim_arrays
            }

            # apply selections (mask computed once per channel)
            passed = self.selections[ch].all(*self.selections[ch].names)
            if np.sum(passed) > 0:
                selected = {key: value[passed] for (key, value) in out.items()}
            else:
                # no event survives: an empty dataframe is written for this chunk
                selected = {}

            # convert arrays to pandas
            output[ch] = self.ak_to_pandas(selected)

        # now save pandas dataframes
        fname = events.behavior["__events_factory__"]._partition_key.replace("/", "_")
        fname = 'condor_' + fname
        for ch in self._channels:
            # creates ./outfiles/<ch> as well; exist_ok avoids failing when another
            # chunk has already created the directory
            os.makedirs('./outfiles/' + ch + self.folder_name + '/parquet', exist_ok=True)

            self.save_dfs_parquet(fname, output[ch], ch, self.folder_name)

        # return dictionary with cutflows
        return {
            dataset: {'mc': isMC,
                      self._year: {'sumgenweight': sumgenweight,
                                   'cutflows': self.cutflows}
                      }
        }

    def postprocess(self, accumulator):
        """Return the accumulated output unchanged."""
        return accumulator

In [149]:
import json
datasets = {"GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8": "HWW",
           } 

sample = "GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8"

# use all_files False if you want to test
all_files = False

# the JSON maps dataset name -> list of file paths; it is the same for every
# dataset, so read and parse it once instead of once per loop iteration
with open("../data/fileset_2017_UL_NANO.json", 'r') as f:
    files_per_dataset = json.load(f)

fileset = {}
for dataset_name,dataset in datasets.items():
    print(dataset_name)
    files = files_per_dataset[dataset_name]

    # files = [files[0], files[1]]

    # need to define the fileset but call them with xcache
    if all_files:
        fileset[dataset_name] = ["root://xcache/" + path for path in files]
    else:
        # test mode: run on a single file (the second entry of the list)
        fileset[dataset_name] = ["root://xcache/" + files[1]]

GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8


In [150]:
# import uproot
# sample = "GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8"
# for i in range(len(fileset[sample])):
#     f = uproot.open(fileset[sample][i])
#     num = f['Events'].num_entries   ### checks number of events per file        
#     print('number of events in file', i, 'is', num)

number of events in file 0 is 98800


In [151]:
import uproot
uproot.open.defaults['xrootd_handler'] = uproot.source.xrootd.MultithreadedXRootDSource

from coffea.processor import IterativeExecutor,Runner,DaskExecutor
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema

dask_executor = False

# remove the outfiles directory and create a new empty one
if os.path.exists('./outfiles'):
    shutil.rmtree('./outfiles')
os.makedirs('./outfiles')

channels = ["ele", "mu", "had"]
job_name = '/' + str(0) + '-' + str(1)

# define executor
if dask_executor:
    # NOTE(review): `client` is not defined in any cell shown here; it must be
    # created (a dask client) before enabling dask_executor.
    executor = DaskExecutor(compression=1, status=True, client=client, treereduction=2)
else:
    executor = IterativeExecutor(compression=1, status=True)

# define the runner (Same as before)
run = Runner(executor=executor,savemetrics=True,chunksize=10000,schema=NanoAODSchema)

# run one dataset at a time and collect every dataset's result, so the pickle
# written below is not limited to the last dataset of the loop
out = {}
for dataset,dataset_files in fileset.items():
    new_fileset = {dataset: dataset_files}
    print(dataset)
    hwwproc = HwwProcessor(year="2017", channels=channels, output_location="./", folder_name = job_name)
    dataset_out,metrics = run(new_fileset,'Events',processor_instance=hwwproc)
    # the processor output is keyed by dataset name, so a plain update merges it
    out.update(dataset_out)

# save pkl file with metadata (job_name starts with '/'); the with-block closes
# the file even if pickling fails
with open('./outfiles' + job_name + '.pkl', "wb") as filehandler:
    pkl.dump(out, filehandler)

### merge parquet
for ch in channels:
    # read every per-chunk parquet file of the channel and write a single merged file
    data = pd.read_parquet('./outfiles/' + ch + job_name + '/parquet')
    data.to_parquet('./outfiles' + job_name + '_' + ch + '.parquet')

GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8


Processing:   0%|          | 0/10 [00:00<?, ?chunk/s]

  result = getattr(ufunc, method)(
  result = getattr(ufunc, method)(
  result = getattr(ufunc, method)(


In [154]:
# read the merged hadronic-channel parquet file back into a dataframe and show it
import pandas as pd
data = pq.read_table('./outfiles/0-1_had.parquet').to_pandas()
data

Unnamed: 0,leadingfj_pt,leadingfj_msoftdrop,secondfj_pt,secondfj_msoftdrop,met,ht,bjets_ophem_leadingfj
0,468.0,73.6875,388.25,2.519531,49.729961,862.5,0.065857
1,522.0,144.125,361.25,113.8125,50.110233,1004.34375,0.051697
2,473.5,86.5625,319.5,-1.0,24.722195,992.125,0.592773
3,696.0,99.4375,472.0,92.375,54.058056,1786.53125,0.691895
4,518.5,137.25,317.25,9.507812,147.868591,1146.46875,0.029587
5,856.5,253.25,386.0,107.6875,133.083298,1621.46875,0.167603
6,454.75,120.0625,246.625,88.5625,169.198975,820.53125,0.020905
7,457.0,99.6875,360.5,7.398438,82.306458,1045.4375,0.042969
8,692.0,199.0,307.75,5.527344,430.921265,996.5,0.018555
9,450.75,47.125,-1.0,-1.0,336.519623,663.15625,0.0215


In [155]:
# read the pickled cutflow metadata written by the run cell and show it
metadata = pkl.loads(pathlib.Path('./outfiles/0-1.pkl').read_bytes())
metadata

{'GluGluHToWWToLNuQQ_M125_TuneCP5_PSweight_13TeV-powheg2-jhugen727-pythia8': {'mc': 10,
  '2017': {'sumgenweight': 2854455.5,
   'cutflows': {'ele': {'leptonKin': 73,
     'leptonInJet': 103,
     'ht': 103,
     'all': 98800,
     'trigger': 5742,
     'mt': 76,
     'metfilters': 5742,
     'leptonIsolation': 72,
     'oneLepton': 72},
    'mu': {'leptonKin': 8970,
     'leptonInJet': 172,
     'ht': 172,
     'all': 98800,
     'trigger': 10794,
     'leptonIsolation': 8896,
     'metfilters': 10789,
     'mt': 140,
     'oneLepton': 8967},
    'had': {'qcdrho': 18,
     'oneFatjet': 802,
     'all': 98800,
     'trigger': 98800,
     'metfilters': 98760,
     'leadingJet': 37,
     'softdrop': 20}}}}}