In [1]:
from lpcjobqueue import LPCCondorCluster
from distributed import Client
from coffea import processor
import awkward as ak

In [2]:
# Bounds handed to the cluster's adaptive scaling.
MIN_WORKERS = 5
MAX_WORKERS = 100

# Create the Condor-backed cluster, enable adaptive scaling between the
# bounds above, and attach a distributed client to it.
cluster = LPCCondorCluster()
cluster.adapt(minimum=MIN_WORKERS, maximum=MAX_WORKERS)
client = Client(cluster)

In [3]:
# Prefix prepended to every path read from the file lists.
XROOTD_REDIRECTOR = 'root://cmsxrootd.fnal.gov/'


def read_file_list(path, prefix=XROOTD_REDIRECTOR):
    """Read a text file listing one input file per line and return full URLs.

    Only the first whitespace-separated token of each line is used, so
    anything after the path on a line is ignored. Blank or whitespace-only
    lines (for example a trailing empty line) are skipped rather than
    raising ``IndexError``.

    Parameters
    ----------
    path : str
        Text file containing the list of input files.
    prefix : str
        String prepended to every path that is read.

    Returns
    -------
    list of str
        ``prefix + <first token>`` for every non-empty line, in file order.
    """
    urls = []
    with open(path) as listing:
        for line in listing:
            tokens = line.split()
            if tokens:  # skip empty lines instead of failing on tokens[0]
                urls.append(prefix + tokens[0])
    return urls


# Dataset name -> list of input file URLs.
fileset = {
    "normal": read_file_list('filenames.txt'),
    "flatPU": read_file_list('filenames_flatPU.txt'),
    "epsilonPU": read_file_list('filenames_epsilonPU.txt'),
}

In [4]:
class Flattener(processor.ProcessorABC):
    """Processor that turns a chunk of events into a flat pandas DataFrame.

    Each output row carries a combined run/event identifier, the pileup
    count and a handful of properties of the first jet of the event.
    """

    def process(self, events):
        """Build the flat table for one chunk of ``events``.

        Columns, in order: ``evtid``, ``nPU``, then ``j1_<field>`` for each
        of pt, eta, phi, chEmEF and neEmEF taken from the first jet.
        """
        # ak.firsts picks the first entry of each event's jet collection.
        first_jet = ak.firsts(events.Jet)

        columns = dict(
            # Run number shifted above bit 32, plus the event number.
            # NOTE(review): ids are only collision-free if event numbers
            # stay below 2**32 -- confirm for these samples.
            evtid=events.run * 2**32 + events.event,
            nPU=events.Pileup.nPU,
        )
        columns.update(
            (f"j1_{name}", first_jet[name])
            for name in ("pt", "eta", "phi", "chEmEF", "neEmEF")
        )
        return ak.to_pandas(ak.zip(columns))

    def postprocess(self, events):
        """Do nothing after processing; always returns ``None``."""
        return None


flatten = Flattener()

In [5]:
# test our processor

from coffea.nanoevents import NanoEventsFactory
# Run the processor locally on the first 1000 entries of one flatPU file
# before sending the full job to the cluster.
test_file = fileset["flatPU"][0]
events = NanoEventsFactory.from_root(test_file, entry_stop=1000).events()
flatten.process(events)



Unnamed: 0_level_0,evtid,nPU,j1_pt,j1_eta,j1_phi,j1_chEmEF,j1_neEmEF
entry,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,4295156018,4,48.09375,-1.752686,1.723633,0.0,0.023926
1,4295155998,35,1253.00000,1.155518,0.430420,0.0,0.205078
2,4295156025,0,37.12500,-0.812256,-0.827759,0.0,0.123047
3,4295156021,24,17.71875,-4.099609,0.678711,0.0,0.072266
4,4295156011,21,23.87500,0.571899,1.616455,0.0,0.851562
...,...,...,...,...,...,...,...
995,4295156995,43,958.50000,0.426758,-1.470215,0.0,0.492188
996,4295156965,74,1757.00000,0.732910,1.620605,0.0,0.640625
997,4295156983,74,959.50000,1.239746,1.574219,0.0,0.300781
998,4295156972,75,4984.00000,0.214600,2.845703,0.0,0.449219


In [6]:
# make one distributed dataframe per dataset

def flatten_dataset(name):
    """Run the ``flatten`` processor over one dataset on the dask client.

    A new executor-argument dict is built on every call, so nothing is
    shared between the per-dataset jobs.
    """
    executor_args = {
        "client": client,
        "schema": processor.NanoAODSchema,
        "use_dataframes": True,
    }
    return processor.run_uproot_job(
        {name: fileset[name]},
        "Events",
        flatten,
        processor.dask_executor,
        executor_args,
    )


# Dataset name -> result of the job for that dataset alone.
ddf = {ds: flatten_dataset(ds) for ds in fileset}

[########################################] | 100% Completed | 59.9s 48.2s

In [7]:
# join the datasets together

# Index every per-dataset frame by event id so rows can be matched up.
normal_by_evt = ddf["normal"].set_index("evtid")
flat_by_evt = ddf["flatPU"].set_index("evtid")
epsilon_by_evt = ddf["epsilonPU"].set_index("evtid")

# Attach the flatPU and epsilonPU columns to the normal sample; clashing
# column names from the right-hand frames get "_flat" / "_epsilon" appended.
# NOTE(review): join() is called without `how`, so the library default
# applies -- confirm that is the intended join type.
joined = normal_by_evt.join(flat_by_evt, rsuffix="_flat")
joined = joined.join(epsilon_by_evt, rsuffix="_epsilon")

# NOTE(review): `ddf` is rebound from the per-dataset dict to the joined
# frame here, so this cell cannot be re-run without re-running the one
# that builds the dict.
ddf = joined.persist()

In [8]:
# Display the joined dataframe (its columns, dtypes and partitioning).
ddf

Unnamed: 0_level_0,nPU,j1_pt,j1_eta,j1_phi,j1_chEmEF,j1_neEmEF,nPU_flat,j1_pt_flat,j1_eta_flat,j1_phi_flat,j1_chEmEF_flat,j1_neEmEF_flat,nPU_epsilon,j1_pt_epsilon,j1_eta_epsilon,j1_phi_epsilon,j1_chEmEF_epsilon,j1_neEmEF_epsilon
npartitions=587,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
4294967297,int32,float32,float32,float32,float32,float32,int32,float32,float32,float32,float32,float32,int32,float32,float32,float32,float32,float32
4295057917,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4314883096,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4314967296,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [9]:
# plot something from the distributed dataframe

import hist
from dask import delayed

@delayed
def corr(part):
    """Histogram first-jet pt against the flatPU first-jet pt for one partition.

    ``part`` is indexed with the column names "j1_pt" and "j1_pt_flat".
    Both axes have 100 regular bins spanning 0 to 1000. Returns the filled
    histogram.
    """
    builder = hist.Hist.new
    builder = builder.Reg(100, 0, 1000, name="pt")
    builder = builder.Reg(100, 0, 1000, name="pt_flatPU")
    pt_hist = builder.Double()

    pt_hist.fill(part["j1_pt"], part["j1_pt_flat"])
    return pt_hist


# One delayed histogram per partition of the joined dataframe.
partial_hists = [corr(part) for part in ddf.to_delayed()]

# Add them up (still delayed), then compute on the cluster and fetch the result.
summed_hist = sum(partial_hists)
h = client.compute(summed_hist).result()
h

distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 668, in log_errors
    yield
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5742, in retire_workers
    await self.replicate(
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5489, in replicate
    assert count > 0
AssertionError
distributed.core - ERROR - Exception while handling op retire_workers
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 501, in handle_comm
    result = await result
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5742, in retire_workers
    await self.replicate(
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5489, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/opt/cond

In [10]:
# Try to bring the full joined result back into this session as `df`
# (it might be really big).
# NOTE(review): presumably this can exhaust local memory for large inputs --
# confirm the expected size before running.

df = ddf.compute()
df

Unnamed: 0_level_0,nPU,j1_pt,j1_eta,j1_phi,j1_chEmEF,j1_neEmEF,nPU_flat,j1_pt_flat,j1_eta_flat,j1_phi_flat,j1_chEmEF_flat,j1_neEmEF_flat,nPU_epsilon,j1_pt_epsilon,j1_eta_epsilon,j1_phi_epsilon,j1_chEmEF_epsilon,j1_neEmEF_epsilon
evtid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
4294967297,54,4888.00000,-0.274353,-0.717773,0.0,0.578125,40.0,4864.000000,-0.275269,-0.719116,0.000000,0.578125,0.0,4856.00000,-0.018986,2.412109,0.000000,0.382812
4294967298,18,998.50000,0.035248,1.044434,0.0,0.378906,17.0,994.000000,0.035812,1.044189,0.000000,0.390625,0.0,978.50000,0.037758,1.044189,0.000000,0.375000
4294967299,19,2882.00000,0.591309,-2.294922,0.0,0.546875,12.0,2908.000000,0.590088,-2.295410,0.000000,0.546875,0.0,2944.00000,0.590332,-2.292969,0.000000,0.476562
4294967300,29,1144.00000,0.745239,0.055252,0.0,0.378906,70.0,1100.000000,-0.664062,-3.045898,0.000000,0.484375,0.0,1118.00000,-0.662842,-3.052734,0.000000,0.363281
4294967301,31,32.34375,-1.457520,2.986328,0.0,0.191406,52.0,29.078125,-1.640381,3.072754,0.222656,0.203125,0.0,33.40625,-1.636475,3.068848,0.222656,0.167969
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4314967292,34,65.93750,1.277100,-0.350098,0.0,0.185547,22.0,68.812500,1.276611,-0.352722,0.000000,0.132812,0.0,63.96875,1.275879,-0.352295,0.000000,0.132812
4314967293,9,546.00000,-1.759521,3.046387,0.0,0.081055,24.0,539.500000,-1.756592,3.039551,0.000000,0.053223,0.0,542.00000,-1.760986,3.040039,0.000000,0.070312
4314967294,23,182.50000,1.325439,0.077850,0.0,0.070312,73.0,193.750000,1.336182,0.082794,0.000000,0.083008,0.0,167.75000,1.335693,0.071594,0.000000,0.082031
4314967295,36,4536.00000,-0.197327,-1.891602,0.0,0.648438,61.0,4572.000000,-0.196381,-1.891602,0.000000,0.640625,0.0,4576.00000,-0.196198,-1.891357,0.000000,0.632812


In [12]:
# Save the joined table to "jets-joined.parquet" (a relative path).
df.to_parquet("jets-joined.parquet")

In [None]:
# Cleanup: restart the client, then request zero workers from the cluster.
# NOTE(review): adaptive scaling with a minimum worker count was configured
# when the cluster was created; confirm that scale(0) really overrides that
# minimum, otherwise workers may be requested again.

client.restart()
cluster.scale(0)