In [1]:
import psutil
import pytest
import os
import glob

from coffea import hist
from coffea.analysis_objects import JaggedCandidateArray
import coffea.processor as processor

from dask.distributed import Client, LocalCluster
from dask_jobqueue import HTCondorCluster

In [2]:
fileset = {
     'Jets': {'files': ['root://eospublic.cern.ch//eos/root-eos/benchmark/Run2012B_SingleMu.root'],
              'treename': 'Events'
             }
}

In [3]:
class METProcessor(processor.ProcessorABC):
    def __init__(self):
        self._columns = ['MET_pt']
        dataset = hist.Cat("dataset", "")
        MET_axis = hist.Bin("MET", "MET [GeV]", 50, 0, 100)
        self._accumulator = processor.dict_accumulator({
            'MET': hist.Hist("Counts", dataset, MET_axis),
            'cutflow': processor.defaultdict_accumulator(int)
        })

    @property
    def accumulator(self):
        return self._accumulator

    @property
    def columns(self):
        return self._columns

    def process(self, df):
        output = self.accumulator.identity()
        dataset = df['dataset']
        MET = df['MET_pt']
        output['cutflow']['all events'] += MET.size
        output['cutflow']['number of chunks'] += 1
        output['MET'].fill(dataset=dataset, MET=MET.flatten())
        return output

    def postprocess(self, accumulator):
        return accumulator

In [4]:
client = Client(processes=True, dashboard_address=None)
cachestrategy = 'dask-worker'
exe_args = {
        'client': client,
        'nano': True,
        'cachestrategy': cachestrategy,
        'savemetrics': True,
        'worker_affinity': True if cachestrategy is not None else False,
    }

output = processor.run_uproot_job(fileset,
                                treename = 'Events',
                                processor_instance = METProcessor(),
                                executor = processor.dask_executor,
                                executor_args = exe_args
                                )

# output = processor.run_uproot_job(fileset,
#                                  treename='Events',
#                                  processor_instance=METProcessor(),
#                                  executor=processor.futures_executor,
#                                  executor_args={'workers':4},
#                                  chunksize = 5000000)

[                                        ] | 0% Completed |  4.9s9s

ValueError: no column named 'dataset'

In [None]:
hist.plot1d(output['MET'], overlay='dataset', fill_opts={'edgecolor': (0,0,0,0.3), 'alpha': 0.8})

In [None]:
for key, value in output['cutflow'].items():
    print(key, value)