In [1]:
import awkward as ak
import numpy as np
import uproot
import vector
import hist

from coffea import processor
from coffea import nanoevents
from coffea.nanoevents.methods import candidate
from coffea.nanoevents   import NanoEventsFactory, NanoAODSchema, BaseSchema
from coffea.analysis_tools import Weights, PackedSelection

In [2]:
# fname = 'root://xcache//store/mc/RunIISummer20UL18NanoAODv2/GluGluToContinToZZTo4mu_TuneCP5_13TeV-mcfm701-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/140000/AAC14383-3B22-954C-A8B0-82E863CFF361.root'
# fname = 'root://xcache//store/mc/RunIISummer20UL18NanoAODv2/GluGluToContinToZZTo4mu_TuneCP5_13TeV-mcfm701-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/130000/B45C59D2-181F-624F-BEC7-222B47BBDC37.root'
# fname = 'root://xcache//store/mc/RunIISummer20UL18NanoAODv2/GluGluToContinToZZTo4mu_TuneCP5_13TeV-mcfm701-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/270000/C3E185C7-36E0-D74F-8416-F2B802353E0B.root'
fname = 'root://xcache//store/mc/RunIISummer20UL18NanoAODv2/DYJetsToTauTau_M-50_AtLeastOneEorMuDecay_massWgtFix_TuneCP5_13TeV-powhegMiNNLO-pythia8-photos/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/230000/E29435DD-552A-5444-8AC5-2363451FB112.root'
# fname = 'root://xcache//store/mc/RunIISummer20UL18NanoAODv2/ST_tW_top_5f_inclusiveDecays_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/280000/288232BC-D0BE-DB43-8E30-E9A02D9FF8DD.root'
fname = 'root://xcache//store/mc/RunIISummer20UL18NanoAODv9/DYJetsToLL_0J_TuneCP5_13TeV-amcatnloFXFX-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/280000/171FDF40-2CCA-F746-9E13-6205ADE9D362.root'
events = NanoEventsFactory.from_root(
    fname,
    schemaclass=NanoAODSchema.v6,
    metadata={"dataset": "ZZ2l2nu"},
).events()

In [3]:
# fname = 'root://xcache//store/mc/RunIISummer20UL18NanoAODv2/GluGluToContinToZZTo4mu_TuneCP5_13TeV-mcfm701-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/140000/AAC14383-3B22-954C-A8B0-82E863CFF361.root'
events = NanoEventsFactory.from_root(
    fname,
    schemaclass=NanoAODSchema.v7,
    metadata={"dataset": "ZZ2l2nu", "is_data": False},
).events()

In [4]:
class coffea_sumw(processor.ProcessorABC):
    def __init__(self):
        super().__init__()

    def process(self, event: processor.LazyDataFrame):
        dataset_name = event.metadata['dataset']
        is_mc = event.metadata.get("is_data")

        
        if is_mc is None:
            is_mc = 'data' not in dataset_name.lower()

        sumw = 1.0
        if is_mc:
            sumw = ak.sum(event.genEventSumw)
        else:
            sumw = -1.0

        return {dataset_name: sumw}

    def postprocess(self, accumulator):
        return accumulator

In [23]:
class fast_analysis(processor.ProcessorABC):
    def __init__(self):
        super().__init__()
        

    def process(self, event):
        a_npv = hist.axis.Regular(50, 0, 50, name="npv")
        h_npv = hist.Hist(a_npv, storage="weight", label="Counts")
        
        print(" --> ", event.fields)
        npv = event.PV.npvs
        print(npv)
        
        h_npv.fill(npv=npv,)
        return {'npv': h_npv}

    def postprocess(self, accumulator):
        pass

In [24]:
from dask.distributed import Client

client = Client("tls://yacine-2ehaddad-40cern-2ech.dask.cmsaf-prod.flatiron.hollandhpc.org:8786")
client

0,1
Connection method: Direct,
Dashboard: /user/yacine.haddad@cern.ch/proxy/8787/status,

0,1
Comm: tls://192.168.55.65:8786,Workers: 2
Dashboard: /user/yacine.haddad@cern.ch/proxy/8787/status,Total threads: 4
Started: 13 minutes ago,Total memory: 12.72 GiB

0,1
Comm: tls://red-c7125.unl.edu:46813,Total threads: 2
Dashboard: /user/yacine.haddad@cern.ch/proxy/44697/status,Memory: 5.72 GiB
Nanny: tls://172.19.0.2:34189,
Local directory: /var/lib/condor/execute/dir_3867843/dask-worker-space/worker-wfgqs4_p,Local directory: /var/lib/condor/execute/dir_3867843/dask-worker-space/worker-wfgqs4_p
Tasks executing: 0,Tasks in memory: 1
Tasks ready: 0,Tasks in flight: 0
CPU usage: 6.0%,Last seen: Just now
Memory usage: 421.02 MiB,Spilled bytes: 0 B
Read bytes: 330.32084634972307 B,Write bytes: 1.11 kiB

0,1
Comm: tls://yacine-2ehaddad-40cern-2ech.dask-worker.cmsaf-prod.flatiron.hollandhpc.org:8788,Total threads: 2
Dashboard: /user/yacine.haddad@cern.ch/proxy/40521/status,Memory: 7.00 GiB
Nanny: tls://192.168.55.65:38843,
Local directory: /home/cms-jovyan/dask-worker-space/worker-k5l_pryo,Local directory: /home/cms-jovyan/dask-worker-space/worker-k5l_pryo
Tasks executing: 0,Tasks in memory: 2
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 535.41 MiB,Spilled bytes: 0 B
Read bytes: 16.03 kiB,Write bytes: 21.35 kiB


In [25]:
base_runner = processor.Runner(
    executor = processor.DaskExecutor(client=client),
    schema = BaseSchema,
)

In [26]:
base_runner(
    {"hello": {"files":[fname]}}, 
    treename="Runs",
    processor_instance=coffea_sumw(),
)

[########################################] | 100% Completed |  0.1s

{'hello': 11314864802.080797}

In [27]:
# from qawa.process.zz2l2nu_vbs import zzinc_processor

In [28]:
pp = fast_analysis()

In [29]:
pp.process(events)

 -->  ['GenMET', 'Generator', 'fixedGridRhoFastjetCentral', 'OtherPV', 'IsoTrack', 'fixedGridRhoFastjetAll', 'genWeight', 'L1PreFiringWeight', 'DeepMETResolutionTune', 'SV', 'ChsMET', 'LHEPart', 'SubJet', 'SoftActivityJetNjets5', 'event', 'TkMET', 'GenVtx', 'GenVisTau', 'HLT', 'luminosityBlock', 'PSWeight', 'SubGenJetAK8', 'SoftActivityJetNjets2', 'GenDressedLepton', 'fixedGridRhoFastjetCentralCalo', 'fixedGridRhoFastjetCentralChargedPileUp', 'HLTriggerFirstPath', 'Pileup', 'SoftActivityJetHT10', 'genTtbarId', 'L1', 'GenJet', 'SoftActivityJetHT5', 'PV', 'LHEWeight', 'LHEScaleWeight', 'LHEReweightingWeight', 'run', 'LowPtElectron', 'FsrPhoton', 'SoftActivityJet', 'Muon', 'GenIsolatedPhoton', 'btagWeight', 'TrigObj', 'GenPart', 'HLTriggerFinalPath', 'DeepMETResponseTune', 'Flag', 'Electron', 'L1simulation', 'Photon', 'fixedGridRhoFastjetCentralNeutral', 'MET', 'SoftActivityJetHT2', 'LHE', 'Tau', 'CaloMET', 'L1Reco', 'GenJetAK8', 'SoftActivityJetHT', 'LHEPdfWeight', 'HTXS', 'SoftActivityJ

{'npv': Hist(Regular(50, 0, 50, name='npv', label='npv'), storage=Weight()) # Sum: WeightedSum(value=1.92475e+06, variance=1.92475e+06) (WeightedSum(value=1.93703e+06, variance=1.93703e+06) with flow)}

In [37]:
ana_runner = processor.Runner(
    #executor = processor.DaskExecutor(client=client),
    executor = processor.FuturesExecutor(compression=None, workers=4),
    schema = NanoAODSchema.v7,
    chunksize=100_000,
)

ana_runner(
    {"hello": {"files":[fname]}}, 
    treename="Events",
    processor_instance=pp,
)

Output()

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 1348, in automatic_retries
    raise e
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 1333, in automatic_retries
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 1586, in _work_function
    elif issubclass(schema, schemas.BaseSchema):
TypeError: issubclass() arg 1 must be a class
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 782, in _processwith
    merged = _watcher(FH, self, reducer, pool)
  File "/opt/conda/lib/python3.8/si

TypeError: issubclass() arg 1 must be a class