In [2]:
from lpcjobqueue import LPCCondorCluster
from distributed import Client
from coffea import processor
import awkward as ak
import time

In [3]:
def printTime(delta):
    """Print an elapsed duration as whole minutes plus seconds.

    Args:
        delta: Elapsed time in seconds (int or float).

    Prints a newline followed by ``Time: <M>m <S.SS>s``.
    """
    # Round to the displayed precision *before* splitting into minutes and
    # seconds; rounding only at format time renders e.g. 119.999 s as
    # "1m 60.00s" instead of "2m 0.00s".
    minutes, seconds = divmod(round(delta, 2), 60)
    print('\nTime: {0:0.0f}m {1:0.2f}s'.format(minutes, seconds))

In [5]:
# Create the job-queue cluster object provided by lpcjobqueue.
cluster = LPCCondorCluster()
# Enable adaptive scaling with a lower bound of 5 and an upper bound of 100.
cluster.adapt(minimum=5, maximum=100)
# Client bound to this cluster; later cells pass it to the coffea executor
# and use it to compute delayed objects.
client = Client(cluster)

In [6]:
# Prefix prepended to every file name read from the listing files.
XROOTD_REDIRECTOR = 'root://cmsxrootd.fnal.gov/'


def _read_file_list(path, prefix=XROOTD_REDIRECTOR):
    """Return ``prefix`` + first whitespace-separated token of each line in ``path``.

    Blank (or whitespace-only) lines are skipped; previously such a line,
    e.g. a trailing empty line at the end of the listing, raised IndexError.
    """
    with open(path) as listing:
        return [prefix + line.split()[0] for line in listing if line.strip()]


# Dataset name -> list of remote file URLs.
# NOTE(review): in the sample outputs further down, the "noPU" files show
# nPU values well above zero while the "epsilonPU" files show nPU == 0, so
# the "noPU" label may be a misnomer -- confirm before relying on the names.
fileset = {
    "noPU": _read_file_list('filenames.txt'),
    "epsilonPU": _read_file_list('filenames_epsilonPU.txt'),
}

In [7]:
class Flattener(processor.ProcessorABC):
    """Flatten a chunk of events into a per-event pandas table.

    Each row carries an event identifier, the pileup count and a handful of
    attributes of the first jet in the event.
    """

    def process(self, events):
        """Build the flat table for one chunk of ``events``.

        Columns: ``evtid`` (run number multiplied by 2**32 plus the event
        number), ``nPU`` and ``j1_<field>`` for the first jet's ``pt``,
        ``eta``, ``phi``, ``chEmEF`` and ``neEmEF``.
        """
        # First jet of every event, as selected by ak.firsts.
        first_jet = ak.firsts(events.Jet)
        columns = {
            "evtid": events.run * 2**32 + events.event,
            "nPU": events.Pileup.nPU,
            **{
                f"j1_{name}": first_jet[name]
                for name in ("pt", "eta", "phi", "chEmEF", "neEmEF")
            },
        }
        return ak.to_pandas(ak.zip(columns))

    def postprocess(self, events):
        """No-op hook: performs no work and returns ``None``."""
        return None


flatten = Flattener()

In [8]:
# Smoke-test the processor locally (no cluster involved) on the first
# "epsilonPU" file, with the read limited by entry_stop=1000.

from coffea.nanoevents import NanoEventsFactory
events = NanoEventsFactory.from_root(fileset["epsilonPU"][0], entry_stop=1000).events()
flatten.process(events)



Unnamed: 0_level_0,evtid,nPU,j1_pt,j1_eta,j1_phi,j1_chEmEF,j1_neEmEF
entry,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,4297122309,0,27.234375,-2.078613,-1.585449,0.0,0.036133
1,4297122310,0,14.328125,0.041298,-1.967285,0.0,0.277344
2,4297122300,0,47.312500,0.343628,2.836914,0.0,0.216797
3,4297122319,0,23.953125,-2.691895,0.553223,0.0,0.000000
4,4297122305,0,189.000000,0.638428,-1.989014,0.0,0.132812
...,...,...,...,...,...,...,...
995,4297123273,0,3292.000000,-0.555542,3.046875,0.0,0.441406
996,4297123295,0,3082.000000,0.537964,-0.157349,0.0,0.197266
997,4297123294,0,1600.000000,0.316650,1.446045,0.0,0.410156
998,4297123290,0,993.500000,-1.300293,2.808105,0.0,0.531250


In [18]:
# Same local smoke test as for "epsilonPU", but on the first "noPU" file
# (read limited by entry_stop=1000).
# NOTE(review): this cell repeats the NanoEventsFactory import and rebinds
# the global `events`; its execution count is out of sequence with its
# neighbours, so it was run out of order.

from coffea.nanoevents import NanoEventsFactory
events = NanoEventsFactory.from_root(fileset["noPU"][0], entry_stop=1000).events()
flatten.process(events)



Unnamed: 0_level_0,evtid,nPU,j1_pt,j1_eta,j1_phi,j1_chEmEF,j1_neEmEF
entry,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,4308888907,19,33.71875,-1.155518,2.829102,0.0,0.394531
1,4308888902,12,374.50000,-0.770630,-0.638062,0.0,0.273438
2,4308888911,38,44.12500,2.192383,1.757812,0.0,0.007385
3,4308888900,16,302.75000,2.520996,-0.931030,0.0,0.347656
4,4308888901,39,45.50000,-0.555176,1.281982,0.0,0.132812
...,...,...,...,...,...,...,...
995,4295771285,34,1870.00000,-0.932129,1.099121,0.0,0.632812
996,4295771291,28,5080.00000,-0.211670,-0.640747,0.0,0.414062
997,4295771296,55,960.00000,0.469299,1.122803,0.0,0.386719
998,4295771276,46,2988.00000,-0.342163,-0.393799,0.0,0.500000


In [9]:
# Run the Flattener over each dataset through the Dask executor, building a
# dict that maps dataset name -> the result of run_uproot_job for that
# dataset alone (with "use_dataframes" enabled, one distributed dataframe each).
# NOTE(review): the join cell below rebinds `ddf` to the joined dataframe, so
# this cell has to be re-run before that one can be re-run.

# Start timestamp. Note the naming: `toc` is taken first, `tic` afterwards.
toc = time.time()

ddf = {
    ds: processor.run_uproot_job(
        {ds: fileset[ds]},  # single-dataset fileset, so each call handles one dataset
        "Events",  # tree name
        flatten,
        processor.dask_executor,
        {
            "client": client,
            "schema": processor.NanoAODSchema,
            "use_dataframes": True,
        }
    )
    for ds in fileset
}

# End timestamp.
tic = time.time()

# Elapsed wall-clock time for all datasets combined.
printTime(tic-toc)

[########################################] | 100% Completed |  1min 36.7s
Time: 4m 9.87s


In [10]:
# Join the two datasets on the event identifier: both frames are re-indexed
# by "evtid", and overlapping column names coming from the right-hand
# ("epsilonPU") frame receive the suffix "_PU". .persist() is called on the
# joined frame.
# NOTE(review): no `how=` is passed, so the library's default join type is
# used (presumably a left join -- confirm). In the computed result further
# down, nPU_PU prints as a float (0.0) although the lazy frame reports int32,
# which would be consistent with unmatched left rows -- verify whether an
# inner join is intended.
# NOTE(review): this rebinds `ddf` from the per-dataset dict to the joined
# dataframe, so the cell is not idempotent: re-running it requires re-running
# the previous cell first.

ddf = (
    ddf["noPU"].set_index("evtid")
    .join(
        ddf["epsilonPU"].set_index("evtid"),
        rsuffix="_PU"
    )
    .persist()
)

In [11]:
# Show the joined distributed dataframe (its structure: partitions and dtypes).
ddf

Unnamed: 0_level_0,nPU,j1_pt,j1_eta,j1_phi,j1_chEmEF,j1_neEmEF,nPU_PU,j1_pt_PU,j1_eta_PU,j1_phi_PU,j1_chEmEF_PU,j1_neEmEF_PU
npartitions=393,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
4294967297,int32,float32,float32,float32,float32,float32,int32,float32,float32,float32,float32,float32
4295055997,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...
4314873400,...,...,...,...,...,...,...,...,...,...,...,...
4314967296,...,...,...,...,...,...,...,...,...,...,...,...


In [17]:
# plot something from the distributed dataframe

import hist
from dask import delayed

@delayed
def corr(part):
    """Histogram ``j1_pt`` against ``j1_pt_PU`` for one dataframe partition.

    Returns a two-dimensional ``hist.Hist`` with 20 regular bins from 0 to
    200 on each axis (named ``pt`` and ``pt_epsilonPU``) and double storage,
    filled from the two columns of ``part``.
    """
    builder = hist.Hist.new
    builder = builder.Reg(20, 0, 200, name="pt")
    builder = builder.Reg(20, 0, 200, name="pt_epsilonPU")
    pt_corr = builder.Double()
    pt_corr.fill(part["j1_pt"], part["j1_pt_PU"])
    return pt_corr


# Apply corr to every partition of the joined frame, add the per-partition
# histograms together with sum(), submit the combined delayed object through
# the client and block on .result() to fetch the final histogram.
h = client.compute(sum(map(corr, ddf.to_delayed()))).result()
# Display the summed histogram.
h

In [19]:
# Pull the whole joined result back into this notebook's process via
# .compute(). The full table must fit in local memory (it might be really big).

df = ddf.compute()
# Display the materialized table.
df

Unnamed: 0_level_0,nPU,j1_pt,j1_eta,j1_phi,j1_chEmEF,j1_neEmEF,nPU_PU,j1_pt_PU,j1_eta_PU,j1_phi_PU,j1_chEmEF_PU,j1_neEmEF_PU
evtid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
4294967297,54,4888.00000,-0.274353,-0.717773,0.0,0.578125,0.0,4856.00000,-0.018986,2.412109,0.000000,0.382812
4294967298,18,998.50000,0.035248,1.044434,0.0,0.378906,0.0,978.50000,0.037758,1.044189,0.000000,0.375000
4294967299,19,2882.00000,0.591309,-2.294922,0.0,0.546875,0.0,2944.00000,0.590332,-2.292969,0.000000,0.476562
4294967300,29,1144.00000,0.745239,0.055252,0.0,0.378906,0.0,1118.00000,-0.662842,-3.052734,0.000000,0.363281
4294967301,31,32.34375,-1.457520,2.986328,0.0,0.191406,0.0,33.40625,-1.636475,3.068848,0.222656,0.167969
...,...,...,...,...,...,...,...,...,...,...,...,...
4314967292,34,65.93750,1.277100,-0.350098,0.0,0.185547,0.0,63.96875,1.275879,-0.352295,0.000000,0.132812
4314967293,9,546.00000,-1.759521,3.046387,0.0,0.081055,0.0,542.00000,-1.760986,3.040039,0.000000,0.070312
4314967294,23,182.50000,1.325439,0.077850,0.0,0.070312,0.0,167.75000,1.335693,0.071594,0.000000,0.082031
4314967295,36,4536.00000,-0.197327,-1.891602,0.0,0.648438,0.0,4576.00000,-0.196198,-1.891357,0.000000,0.632812


In [20]:
# Save the joined table to "jets-joined.parquet" (relative path, so it lands
# in the current working directory).
df.to_parquet("jets-joined.parquet")

In [21]:
# Cleanup: restart via the client, then ask the cluster to scale to 0.
# NOTE(review): adaptive scaling was enabled earlier with minimum=5 -- confirm
# that scale(0) is not undone by the adaptive policy, and consider closing the
# client and cluster explicitly if they are not meant to be reused.

client.restart()
cluster.scale(0)