In [1]:
import numpy as np
import uproot
%matplotlib inline
from coffea import hist
#from coffea.analysis_objects import JaggedCandidateArray
import coffea.processor as processor
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
import awkward as ak
import numpy as np
import cloudpickle
import gzip
import time
import os
import glob
from yahist import Hist1D, Hist2D       

In [2]:
pip install yahist

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
Note: you may need to restart the kernel to use updated packages.


In [3]:
class Processor(processor.ProcessorABC):
    def __init__(self):
        dataset_axis = hist.Cat("dataset", "") 
        pt_axis = hist.Bin("pt", " [GeV]", [10., 15., 20., 25., 35., 50., 70., 90.])
        eta_mu_axis= hist.Bin("eta", "", [0., 1.2, 2.1, 2.4])
        eta_el_axis= hist.Bin("eta", "", [0., 0.8, 1.479, 2.5])
        
        self._accumulator = processor.dict_accumulator({
            'Muon': hist.Hist("Counts", dataset_axis, pt_axis, eta_mu_axis),
            'Tight_Muon': hist.Hist("Counts", dataset_axis, pt_axis, eta_mu_axis),
            'Electron': hist.Hist("Counts", dataset_axis, pt_axis, eta_el_axis),
            'Tight_Electron': hist.Hist("Counts", dataset_axis, pt_axis, eta_el_axis)

        })
    
    @property
    def accumulator(self):
        
        return self._accumulator
    
    def process(self, events):
        output = self.accumulator.identity()
        
        dataset = events.metadata["dataset"]
        
        # Skim Muon, Electron, and Jet
        Muon = events.Muon
        Muon = Muon[( Muon.pt >=10 ) &
                           (np.abs(Muon.eta) <= 2.4) &
                           (np.abs(Muon.dxy) <= 0.05) &
                           (np.abs(Muon.dz) <= 0.1) &
                           (np.abs(Muon.sip3d) < 4) &
                           (Muon.looseId==1) &
                           (Muon.ptErr/Muon.pt < 0.2) &
                           (Muon.mediumId==1) ]
        
        Electron = events.Electron
        Electron = Electron[( Electron.pt >10 ) &  #  (Electron.isTriggerSafeNolso) 
                           (np.abs(Electron.eta+Electron.deltaEtaSC ) < 2.4) &
                            (Electron.convVeto)&
                            (Electron.lostHits==0) &
                            (Electron.tightCharge==2) &
                            (np.abs(Electron.dz) < 0.1) &
                           (np.abs(Electron.dxy) < 0.05) &
                           (np.abs(Electron.sip3d) < 4) ]
        Jet = events.Jet
        Jet = Jet[( Jet.pt >25 ) &  (np.abs(Jet.eta ) < 2.4) ]

        
        def delta_phi(first, second):
            return np.arccos(np.cos(first.phi - second.phi))

        def delta_r2(first, second):
            return (first.eta - second.eta) ** 2 + delta_phi(first, second) ** 2

        def match(first, second, deltaRCut=0.4):
            drCut2 = deltaRCut**2
            combs = ak.cartesian([first, second], nested=True)
            return ak.any((delta_r2(combs['0'], combs['1'])<drCut2), axis=2)    
        
        def mt(pt1, phi1, pt2, phi2):    #transverse mass
            return np.sqrt( 2*pt1*pt2 * (1 - np.cos(phi1-phi2)) )
        
        # Definition of Tight/Loose Muon and Electron
        Tight_Muon = Muon[( Muon.pt >20 ) &
                          (np.abs(Muon.eta) < 2.4) &
                         (Muon.tightId==1) &
                          (Muon.miniPFRelIso_all<0.16)
                         ]
        Tight_Electron = Electron[( Electron.pt >20 )&
                          (np.abs(Electron.eta) < 2.4) &
                       #  (Electron.tightId==1) &
                          (Electron.miniPFRelIso_all<0.12)
                         ]
        
        Loose_Muon = Muon[( Muon.pt >20 )&
                          (np.abs(Muon.eta) < 2.4) &
                         (Muon.looseId==1) &
                          (Muon.miniPFRelIso_all<0.4)
                         ]        
        
        Loose_Electron = Electron[( Electron.pt >20 )&
                          (np.abs(Electron.eta) < 2.4) &
                        # (Electron.looseId==1) &
                          (Electron.miniPFRelIso_all<0.4)
                         ]
        MET = events.MET   
        

        control_region_Muon = events[( ak.num(Loose_Muon)==1) &(ak.num(Loose_Electron)==0)
                                    & (ak.num(Jet[~match(Jet, Muon, deltaRCut=1.0)])>=1) & ( MET.pt<20) 
                                    & ak.any(mt(Muon.pt,Muon.phi, MET.pt, MET.phi)<20, axis=1)]

        
        tight_muon= control_region_Muon.Muon[(control_region_Muon.Muon.pt >20 ) &
                          (np.abs(control_region_Muon.Muon.eta) < 2.4) &
                         (control_region_Muon.Muon.tightId==1) &
                          (control_region_Muon.Muon.miniPFRelIso_all<0.16)
                         ]
        output['Tight_Muon'].fill(dataset=dataset, pt=ak.flatten(tight_muon.pt), eta=ak.flatten(tight_muon.eta))
        output['Muon'].fill(dataset=dataset, pt=ak.flatten(control_region_Muon.Muon.pt), eta=ak.flatten(control_region_Muon.Muon.eta))
        return output
    def postprocess(self, accumulator):
        return accumulator

### Getting Samples

The folder QCD containes a file `Samples.txt` with all desired samples, and a script `query.sh`. With the command `./query.sh` (with executing permission), one can generate text files containing all the root file names corresponding to the samples in `Samples.txt` using [dasgoclient](https://github.com/dmwm/dasgoclient).

Note that you need your grid passphrase for setting up the proxy.

In [4]:
Samples = glob.glob("QCD/QCD*.txt")

In [5]:
fileset = {}
for n in Samples:
    name = n.split("/")[1].strip(".txt")
    file = open(n,mode='r')
    content = file.read()
    file.close()
    lst = content.split("\n")
    if lst[-1]=='':
        lst = lst[:-1]
    lst = ["root://xcache/" + s for s in lst]  
    fileset[name]=lst

In [6]:
import time
from coffea_casa import CoffeaCasaCluster
from distributed import Client

# Set values at which you wish to run. Each execution takes one index from all the lists. They should thus be of equal size.
# multiplies: sets the 'blow-up' factor of the dataset. With 1, run all files. With 2, run all files twice, etc.
multiplies = [1]
# jobs: sets the number of jobs you're requesting
jobs = [50]
# chunksizes: sets the various chunk sizes which we run at.
chunksizes = [100000]

# Initialize dict to keep track of results of each run.
results = {'multiply': [], 'workers': [], 'chunksize': [], 'time': [], 'events/s/thread': [], 'events/s': [], 'Dask report': []}

# This function ensures that we aren't overwriting Dask report filenames.
def unique(filename):
    file, ext = os.path.splitext(filename)
    counter = 0
    while os.path.exists(filename):
        counter += 1
        filename = file + str(counter) + ext
    return filename

while (len(multiplies) == len(jobs)) and (len(jobs) == len(chunksizes)) and (len(multiplies) > 0):
    # Fileset blow-up is handled here.
    multiply = multiplies.pop(0)
    results['multiply'].append(multiply)
    files = fileset.copy()
    keys = fileset.copy().keys()
    while multiply > 1:
        for key in keys:
            files[key+str(multiply)] = files[key]
        multiply = multiply - 1
    
    # Cluster configuration is handled here.
    cluster = CoffeaCasaCluster()
    njobs = jobs.pop(0)
    results['workers'].append(njobs)
    cluster.scale(jobs=njobs)
    client = Client(cluster)
    chunksize = chunksizes.pop(0)
    results['chunksize'].append(chunksize)

    exe_args = {
            'client': client,
            'schema': processor.NanoAODSchema,
            'savemetrics': True
        }

    tic = time.time()
    from dask.distributed import performance_report
    fname = unique("dask-report-" + str(njobs) + "W" + str(int(len(files)/18)) + "F-" + str(chunksize) + "C.html")
    with performance_report(filename=fname):
        output = processor.run_uproot_job(files,
                                        treename = 'Events',
                                        processor_instance = Processor(),
                                        executor = processor.dask_executor,
                                        executor_args = exe_args,
                                        chunksize=chunksize
                                        )
    toc = time.time()

    results['time'].append(toc - tic)
    results['events/s/thread'].append(output[1]['entries'] / output[1]['processtime'])
    results['events/s'].append(output[1]['entries'] / (toc - tic))
    results['Dask report'].append(fname)

    cluster.close()
    # Sleep so cluster can shutdown. 60 seconds is safe; we can probably reduce this, but also, analysis run time is going to be longer... so is 60 seconds really that much of a problem?
    if len(jobs) > 1:
        time.sleep(60)

{'protocol': 'tls://', 'security': Security(require_encryption=True, tls_ca_file='/etc/cmsaf-secrets/ca.pem', tls_client_cert='/etc/cmsaf-secrets/hostcert.pem', tls_client_key='/etc/cmsaf-secrets/hostcert.pem', tls_scheduler_cert='/etc/cmsaf-secrets/hostcert.pem', tls_scheduler_key='/etc/cmsaf-secrets/hostcert.pem', tls_worker_cert='/etc/cmsaf-secrets/hostcert.pem', tls_worker_key='/etc/cmsaf-secrets/hostcert.pem'), 'log_directory': 'logs', 'silence_logs': 'DEBUG', 'scheduler_options': {'port': 8786, 'dashboard_address': '8787', 'protocol': 'tls', 'external_address': 'tls://matousadamec-40gmail-2ecom.dask.coffea.casa:8786'}, 'job_extra': {'universe': 'docker', 'docker_image': 'coffeateam/coffea-casa-analysis:2021.05.06post0', 'container_service_names': 'dask', 'dask_container_port': 8786, 'transfer_input_files': '/etc/cmsaf-secrets/ca.pem, /etc/cmsaf-secrets/hostcert.pem, /etc/cmsaf-secrets/xcache_token', 'encrypt_input_files': '/etc/cmsaf-secrets/ca.pem, /etc/cmsaf-secrets/hostcert.pe

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tls://192.168.147.176:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
distributed.core - INFO - Event loop was unresponsive in Scheduler for 9.80s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.scheduler - INFO - Register worker <Worker 'tls://matousadamec-40gmail-2ecom.dask-worker.coffea.casa:8788', name: kubernetes-worker-d64ca1ed-6591-482d-b416-84b380efbf5f, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tls://matousadamec-40gmail-2ecom.dask-worker.coffea.casa:8788
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-e5e692c6-b1d2-11eb-80f3-66450b8b0bb0
distributed.core - INFO - Starting established connection


[                                        ] | 0% Completed |  9.2s

distributed.comm.tcp - INFO - Connection closed before handshake completed
distributed.comm.tcp - INFO - Connection closed before handshake completed
distributed.comm.tcp - INFO - Connection closed before handshake completed


[#                                       ] | 3% Completed | 49.5s

KeyboardInterrupt: 

In [None]:
for i in range(len(results['time'])):
    print('Run ' + str(i+1))
    for key in results.keys():
        if isinstance(results[key][i], int):
            print(str(key) + ': ' + str(round(results[key][i], 3)))
        else:
            print(str(key) + ': ' + str(results[key][i]))
    print('\n')

### Plotting
https://github.com/cmstas/FTAnalysis/blob/run2/analysis/fakes/derivation/ScanChain_fast.C#L161

In [None]:
import pickle
path = 'histos/QCD.pkl.gz'
print('Opening path: ', path)
hists = {}
with gzip.open(path) as fin:
    QCD= pickle.load(fin)

In [None]:
QCD

In [None]:
def combine_mat(Dict):
    DICT = {}
    mat = np.zeros((7,3))
    for key, value in Dict.items():
        mat = np.add(mat, value)
    return mat

In [None]:
Loose_Muon = Hist2D.from_bincounts(
    combine_mat(QCD[0]['Muon'].sum().values()).T,
    (QCD[0]['Muon'].axis('pt').edges(), QCD[0]['Muon'].axis('eta').edges())
)

In [None]:
Loose_Muon

In [None]:
Tight_Muon = Hist2D.from_bincounts(
    combine_mat(QCD[0]['Tight_Muon'].sum().values()).T,
    (QCD[0]['Tight_Muon'].axis('pt').edges(), QCD[0]['Tight_Muon'].axis('eta').edges())
)

In [None]:
Tight_Muon

In [None]:
Tight_Muon.divide(Loose_Muon)