# Test Coffea with a DASK executor

This will test Coffea to see if we can figure out how to use it with our code, but running with a local DASK executor.
This doesn't make sense for this query - there is only one file, but it will test the pipeline!

First are the includes from coffea. This is based on the [example written by Ben](https://github.com/CoffeaTeam/coffea/blob/master/binder/servicex/ATLAS/LocalExample.ipynb).

In [1]:
from servicex import ServiceXDataset
from coffea.processor.servicex import DataSource, Analysis
from coffea.processor.servicex import DaskExecutor 

import matplotlib.pyplot as plt

from coffea import hist, processor
from IPython.display import display, update_display, HTML

And imports connected with running servicex.

In [2]:
from func_adl import ObjectStream
from func_adl_servicex import ServiceXSourceUpROOT
from hist import Hist
import mplhep as mpl
import awkward as ak

from utils import files

Methods copied to help us get all leptons from the source files

In [3]:
def apply_event_cuts (source: ObjectStream) -> ObjectStream:
    '''Event level cuts for the analysis. Keep from sending data that we aren't going to need at all in the end.
    '''
    return (source
        .Where(lambda e: e.trigE or e.trigM))

def good_leptons(source: ObjectStream) -> ObjectStream:
    '''Select out all good leptons from each event. Return their pt, eta, phi, and E, and other
    things needed downstream.

    Because uproot doesn't tie toegher the objects, we can't do any cuts at this point.
    '''
    return source.Select(lambda e:
        {
            'lep_pt': e.lep_pt,
            'lep_eta': e.lep_eta,
            'lep_phi': e.lep_phi,
            'lep_energy': e.lep_E,
            'lep_charge': e.lep_charge,
            'lep_ptcone30': e.lep_ptcone30,
            'lep_etcone20': e.lep_etcone20,
            'lep_type': e.lep_type,
            'lep_trackd0pvunbiased': e.lep_trackd0pvunbiased,
            'lep_tracksigd0pvunbiased': e.lep_tracksigd0pvunbiased,
            'lep_z0': e.lep_z0,
        })

Create the `func_adl` cuts to get the data. The dataset we use here doesn't matter, as long as it "looks" like all the datasets we are going to be processing.

In [4]:
ds = ServiceXSourceUpROOT('cernopendata://dummy',  files['ggH125_ZZ4lep']['treename'], backend_name='dev_uproot')
ds.return_qastle = True
leptons = good_leptons(apply_event_cuts(ds))

The analysis code that will apply the 4 lepton cuts and make the 4 lepton mass plot.

In [5]:
class ATLAS_Higgs_4L(Analysis):
    @staticmethod
    def process(events):
        import awkward as ak
        from collections import defaultdict

        sumw = defaultdict(float)
        mass_hist = hist.Hist(
            "Events",
            hist.Cat("dataset", "Dataset"),
            hist.Bin("mass", "$Z_{ee}$ [GeV]", 60, 60, 120),
        )

        dataset = events.metadata['dataset']
        leptons = events.lep

        # We need to look at 4 lepton events only.
        cut = (ak.num(leptons) == 4)

        # Form the invar mass, plot.
        # diele = electrons[cut][:, 0] + electrons[cut][:, 1]
        # diele.mass
        dilepton = leptons[:,0] + leptons[:,1]
        mass_4l = leptons.mass

        # Fill the histogram
        sumw[dataset] += len(events)
        print(len(events))
        mass_hist.fill(
            dataset=dataset,
            mass=ak.flatten(mass_4l),
        )
        
        return {
            "sumw": sumw,
            "mass": mass_hist
        }

Create the data source that we will be running against.

In [6]:
def make_ds(name: str, query: ObjectStream):
    '''Create a ServiceX Datasource for a particular ATLAS Open data file
    '''
    datasets = [ServiceXDataset(files[name]['files'], backend_name='dev_uproot')]
    return DataSource(query=query, metadata={'dataset': name}, datasets=datasets)

And run!

In [7]:
analysis = ATLAS_Higgs_4L()
# TODO: It would be good if datatype was determined automagically (there is enough info)
executor = DaskExecutor('localhost:8786')
datasource = make_ds('ggH125_ZZ4lep', leptons)

async def run_updates_stream(accumulator_stream):
  global first

  count = 0
  async for coffea_info in accumulator_stream:
    count += 1
    print(count, coffea_info)
  return coffea_info

# Why do I need run_updates_stream, why not just await on execute (which fails with async gen can't).
# Perhaps something from aiostream can help here?
result = await run_updates_stream(executor.execute(analysis, datasource))

[root://eospublic.ce...:   0%|          | 0/9000000000.0 [00:00]

Exception: Tried sending message after closing.  Status: closed
Message: {'op': 'update-graph-hlg', 'hlg': {'layers': [{'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {'run_coffea_processor-5027d76ce086b8afd788b5bb47625a7e': {'function': b'\x80\x04\x95?\x00\x00\x00\x00\x00\x00\x00\x8c"coffea.processor.servicex.executor\x94\x8c\x14run_coffea_processor\x94\x93\x94.', 'args': b'\x80\x04]\x94.', 'kwargs': b'\x80\x04\x95\xb0\x06\x00\x00\x00\x00\x00\x00}\x94(\x8c\nevents_url\x94X\xcf\x01\x00\x00http://localhost:9000/fbb77aed-153c-4cc4-a749-3a82ed00e017/root%3A%3A%3Aeospublic.cern.ch%3A%3Aeos%3Aopendata%3Aatlas%3AOutreachDatasets%3A2020-01-22%3A4lep%3AMC%3Amc_345060.ggH125_ZZ4lep.4lep.root.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=miniouser%2F20210930%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20210930T130216Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=c6a3151f39d095e21ba01d15f3ea36cd4f6a7b2755e8c89393ddb6a0867ecd64\x94\x8c\ttree_name\x94N\x8c\tdata_type\x94\x8c\x07parquet\x94\x8c\tmeta_data\x94}\x94\x8c\x07dataset\x94\x8ct[root://eospublic.cern.ch//eos/opendata/atlas/OutreachDatasets/2020-01-22/4lep/MC/mc_345060.ggH125_ZZ4lep.4lep.root]\x94s\x8c\x04proc\x94\x8c\x17cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c\nLambdaType\x94\x85\x94R\x94(h\r\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x00K\nK\x0bKCC\xc2d\x01d\x00l\x00}\x01d\x01d\x02l\x01m\x02}\x02\x01\x00|\x02t\x03\x83\x01}\x03t\x04\xa0\x05d\x03t\x04\xa0\x06d\x04d\x05\xa1\x02t\x04\xa0\x07d\x06d\x07d\x08d\x08d\t\xa1\x05\xa1\x03}\x04|\x00j\x08d\x04\x19\x00}\x05|\x00j\t}\x06|\x01\xa0\n|\x06\xa1\x01d\nk\x02}\x07|\x06d\x00d\x00\x85\x02d\x01f\x02\x19\x00|\x06d\x00d\x00\x85\x02d\x0bf\x02\x19\x00\x17\x00}\x08|\x06j\x0b}\t|\x03|\x05\x05\x00\x19\x00t\x0c|\x00\x83\x017\x00\x03\x00<\x00t\rt\x0c|\x00\x83\x01\x83\x01\x01\x00|\x04j\x0e|\x05|\x01\xa0\x0f|\t\xa1\x01d\x0c\x8d\x02\x01\x00|\x03|\x04d\r\x9c\x02S\x00\x94(NK\x00\x8c\x0bdefaultdict\x94\x85\x94\x8c\x06Events\x94h\x08\x8c\x07Dataset\x94\x8c\x04mass\x94\x8c\x0e$Z_{ee}$ [GeV]\x94K<KxK\x04K\x01h\x08h\x19\x86\x94\x8c\x04sumw\x94h\x19\x86\x94t\x94(\x8c\x07awkward\x94\x8c\x0bcollections\x94h\x15\x8c\x05float\x94\x8c\x04hist\x94\x8c\x04Hist\x94\x8c\x03Cat\x94\x8c\x03Bin\x94\x8c\x08metadata\x94\x8c\x03lep\x94\x8c\x03num\x94h\x19\x8c\x03len\x94\x8c\x05print\x94\x8c\x04fill\x94\x8c\x07flatten\x94t\x94(\x8c\x06events\x94\x8c\x02ak\x94h\x15h\x1c\x8c\tmass_hist\x94h\x08\x8c\x07leptons\x94\x8c\x03cut\x94\x8c\x08dilepton\x94\x8c\x07mass_4l\x94t\x94\x8c@C:\\Users\\gordo\\AppData\\Local\\Temp/ipykernel_454932/1137201104.py\x94\x8c\x07process\x94K\x02C,\x00\x02\x08\x01\x0c\x02\x08\x01\x04\x01\x02\x01\n\x01\x10\xfd\x04\x06\n\x01\x06\x03\x0e\x05 \x01\x06\x03\x14\x01\x0c\x01\x04\x01\x02\x01\x08\xfe\x06\x06\x02\x01\x02\xfe\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94uNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h@}\x94}\x94(h=h7\x8c\x0c__qualname__\x94\x8c\x16ATLAS_Higgs_4L.process\x94\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h>\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94h"h\x0b\x8c\tsubimport\x94\x93\x94\x8c\x0bcoffea.hist\x94\x85\x94R\x94su\x86\x94\x86R0u.'}}, 'dependencies': {'run_coffea_processor-5027d76ce086b8afd788b5bb47625a7e': set()}}, 'annotations': {}}]}, 'keys': ['run_coffea_processor-5027d76ce086b8afd788b5bb47625a7e'], 'priority': {'run_coffea_processor-5027d76ce086b8afd788b5bb47625a7e': 0}, 'submitting_task': None, 'fifo_timeout': '100 ms', 'actors': False, 'code': '    def run_async_analysis(\n        self,\n        file_url: str,\n        tree_name: Optional[str],\n        data_type: str,\n        meta_data: Dict[str, str],\n        process_func: Callable,\n    ):\n        """Create a dask future for a dask task to run the analysis."""\n        data_result = self.dask.submit(\n            run_coffea_processor,\n            events_url=file_url,\n            tree_name=tree_name,\n            data_type=data_type,\n            meta_data=meta_data,\n            proc=process_func,\n        )\n\n        return data_result\n'}

In [None]:
hist.plot1d(result['mass'])