In [1]:
from dask.distributed import Client, config
# Connect to the Dask scheduler. No address is passed here, so it has to come
# from the Dask configuration or environment.
# NOTE(review): confirm where the scheduler address is configured.
client = Client()
# Bare last expression: renders the client/cluster summary as the cell output.
client


+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| blosc   | None   | 1.9.2     | None    |
| lz4     | 3.1.3  | 3.1.1     | 3.1.3   |
| msgpack | 1.0.2  | 1.0.0     | 1.0.2   |
+---------+--------+-----------+---------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6


0,1
Client  Scheduler: tcp://my-dask-scheduler:8786  Dashboard: http://my-dask-scheduler:8787/status,Cluster  Workers: 3  Cores: 3  Memory: 10.50 GB


For authentication, create a proxy certificate on your local machine (using `voms-proxy-init -voms atlas`) and upload the resulting proxy certificate file (run `echo $X509_USER_PROXY` on your local machine to find its location) to the same folder as this Jupyter notebook (you can use the JupyterLab file-upload button in the left sidebar). Then rename the uploaded file to `x509up` and set your Rucio account name (`RUCIO_ACCOUNT`) in the following notebook cell:

In [6]:
import os
import rucio.client
# The uploaded proxy certificate is expected next to this notebook under the
# name "x509up"; restrict it to owner-read-only.
x509_proxy_path = os.path.abspath("x509up")
os.chmod(x509_proxy_path, 0o400)

# Put the proxy location and the Rucio account name into the environment
# before the Rucio client is created.
os.environ.update({
    'X509_USER_PROXY': x509_proxy_path,
    'RUCIO_ACCOUNT': 'nihartma',
})
rucio_client = rucio.client.Client()

In [7]:
def get_cern_names(max_files=12, rse='CERN-PROD_LOCALGROUPDISK', client=None):
    """Return physical file names (PFNs) of dataset replicas held at one RSE.

    Asks Rucio for the replicas of the hard-coded data17 dataset and, for
    every file, takes the first PFN whose replica is located at ``rse``.
    Files that have no replica at ``rse`` are skipped (previously they
    raised ``IndexError``).

    Parameters
    ----------
    max_files : int or None, optional
        Maximum number of PFNs to return. The default of 12 is the number of
        files the former ``n > 10`` cut-off produced. ``None`` means no limit.
    rse : str, optional
        Name of the Rucio storage element whose replicas are selected.
    client : object, optional
        Object providing ``list_replicas(dids)``. Defaults to the notebook's
        global ``rucio_client``.

    Returns
    -------
    list of str
        The selected PFNs, in the order Rucio returned the files.
    """
    if client is None:
        client = rucio_client
    if max_files is not None and max_files <= 0:
        return []

    dids = [{
        'scope': 'data17_13TeV',
        'name': 'data17_13TeV.periodK.physics_Main.PhysCont.DAOD_PHYSLITE.grp17_v01_p4309',
    }]
    cern_fnames = []
    for replica in client.list_replicas(dids):
        pfns_at_rse = [
            pfn for pfn, info in replica['pfns'].items() if info['rse'] == rse
        ]
        if not pfns_at_rse:
            # This file has no copy at the requested RSE; move on instead of
            # failing on an empty list.
            continue
        cern_fnames.append(pfns_at_rse[0])
        # Stop right after the last wanted file so that no further replica
        # records are pulled from the Rucio iterator.
        if max_files is not None and len(cern_fnames) >= max_files:
            break
    return cern_fnames
# Fetch the file list once; the job-submission cell further down reuses `fnames`.
fnames = get_cern_names()
# Show only the first three entries to keep the cell output short.
fnames[:3]

['root://eosatlas.cern.ch:1094//eos/atlas/atlaslocalgroupdisk/dq2/rucio/data17_13TeV/26/59/DAOD_PHYSLITE.22958105._000001.pool.root.1',
 'root://eosatlas.cern.ch:1094//eos/atlas/atlaslocalgroupdisk/dq2/rucio/data17_13TeV/48/73/DAOD_PHYSLITE.22958105._000002.pool.root.1',
 'root://eosatlas.cern.ch:1094//eos/atlas/atlaslocalgroupdisk/dq2/rucio/data17_13TeV/63/bb/DAOD_PHYSLITE.22958105._000003.pool.root.1']

In [8]:
# Shell escape: print the details of the currently active VOMS proxy.
# NOTE(review): the output contains the certificate subject (a personal name);
# clear this cell's output before sharing the notebook.
!voms-proxy-info --all

subject   : /C=DE/O=GermanGrid/OU=LMU/CN=Nikolai Hartmann/CN=1020189479
issuer    : /C=DE/O=GermanGrid/OU=LMU/CN=Nikolai Hartmann
identity  : /C=DE/O=GermanGrid/OU=LMU/CN=Nikolai Hartmann
type      : RFC3820 compliant impersonation proxy
strength  : 2048
path      : /home/jovyan/x509up
timeleft  : 70:56:34
key usage : Digital Signature, Key Encipherment, Data Encipherment
=== VO atlas extension information ===
VO        : atlas
subject   : /C=DE/O=GermanGrid/OU=LMU/CN=Nikolai Hartmann
issuer    : /DC=ch/DC=cern/OU=computers/CN=lcg-voms2.cern.ch
attribute : /atlas/Role=NULL/Capability=NULL
attribute : /atlas/de/Role=NULL/Capability=NULL
attribute : /atlas/lcg1/Role=NULL/Capability=NULL
attribute : nickname = nihartma (atlas)
timeleft  : 70:56:34
uri       : lcg-voms2.cern.ch:15001



In [9]:
# Path to which `run` writes its copy of the proxy certificate.
pfilename = '/tmp/x509up_u1000'
# Read the proxy text once; `run` writes this string to `pfilename`.
# A context manager is used so the file handle is closed right away.
with open(x509_proxy_path) as proxy_file:
    proxydata = proxy_file.read()

import uproot
import json
import awkward as ak
from physlite_experiments.physlite_events import (
    physlite_events, get_lazy_form, get_branch_forms, Factory, LazyGet
)
from physlite_experiments.analysis_example import get_obj_sel 
from physlite_experiments.utils import subdivide 


def run(filename, a, max_chunksize=10000):
    """Count selected electrons, muons and jets in one PHYSLITE file.

    The function first writes the proxy certificate text held in the global
    ``proxydata`` to ``pfilename`` and points ``X509_USER_PROXY`` at it. It
    then opens ``CollectionTree`` of ``filename`` with uproot, walks through
    the tree in chunks, applies ``get_obj_sel`` to each chunk and sums the
    number of objects passing each selection flag.

    Parameters
    ----------
    filename : str
        Path or URL of the ROOT file to open.
    a : object
        Unused. The parameter is kept so existing callers that pass a second
        positional argument keep working.
    max_chunksize : int, optional
        Upper bound on the number of tree entries processed per chunk.

    Returns
    -------
    tuple
        ``(output, nevents)`` where ``output[collection][flag]`` is the total
        count for the collections "Electrons", "Muons" and "Jets" and the
        flags "baseline", "passOR" and "signal", and ``nevents`` is the
        number of events processed.
    """
    import os
    import random
    import time
    import uproot

    # Random delay of 2-5 seconds before any work starts.
    # NOTE(review): presumably staggers tasks so they do not all hit the
    # storage at once — confirm, or remove if not needed.
    time.sleep(random.randint(2, 5))

    # The proxy is a credential: create the file owner-only instead of with
    # the default permissions, and tighten a copy left over from earlier runs.
    proxy_fd = os.open(pfilename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(proxy_fd, 'w') as pfile:
        pfile.write(proxydata)
    os.chmod(pfilename, 0o600)
    os.environ['X509_USER_PROXY'] = pfilename

    output = {
        collection: {
            flag: 0
            for flag in ["baseline", "passOR", "signal"]
        } for collection in ["Electrons", "Muons", "Jets"]
    }
    nevents = 0
    with uproot.open(
        f"{filename}:CollectionTree",
        array_cache=None,
        # alternative handler: uproot.MultithreadedXRootDSource
        xrootd_handler=uproot.XRootDSource
    ) as tree:
        # Ceiling division, so that no chunk exceeds max_chunksize (floor
        # division allowed chunks of almost twice that size). An empty tree
        # still gets one chunk, as before.
        n_chunks = max(1, -(-tree.num_entries // max_chunksize))
        # The form is built once per file and reused for every chunk.
        form = json.dumps(get_lazy_form(get_branch_forms(tree)))
        entry_start = 0
        for num_entries in subdivide(tree.num_entries, n_chunks):
            entry_stop = entry_start + num_entries
            container = LazyGet(
                tree, entry_start=entry_start, entry_stop=entry_stop
            )
            factory = Factory(form, entry_stop - entry_start, container)
            events = factory.events
            events_decorated = get_obj_sel(events)
            entry_start = entry_stop
            for collection in output:
                for flag in output[collection]:
                    output[collection][flag] += ak.count_nonzero(
                        events_decorated[collection][flag]
                    )
            nevents += len(events)
    return output, nevents

In [10]:
import numpy as np
# Submit one `run` task per file. The random floats fill `run`'s unused `a`
# parameter — presumably to give every submission a unique task key so Dask
# re-executes instead of reusing cached results; TODO confirm (`pure=False`
# would state that intent explicitly).
futures = client.map(run,fnames,np.random.random(len(fnames)))
# Block until all tasks have finished and display the per-file results.
client.gather(futures)

[({'Electrons': {'baseline': 4871, 'passOR': 4826, 'signal': 3626},
   'Muons': {'baseline': 8941, 'passOR': 6761, 'signal': 4976},
   'Jets': {'baseline': 258610, 'passOR': 251696, 'signal': 224474}},
  44235),
 ({'Electrons': {'baseline': 4776, 'passOR': 4736, 'signal': 3618},
   'Muons': {'baseline': 8716, 'passOR': 6495, 'signal': 4729},
   'Jets': {'baseline': 249948, 'passOR': 243251, 'signal': 217148}},
  43735),
 ({'Electrons': {'baseline': 4354, 'passOR': 4328, 'signal': 3309},
   'Muons': {'baseline': 8030, 'passOR': 6018, 'signal': 4440},
   'Jets': {'baseline': 217640, 'passOR': 211553, 'signal': 188923}},
  38078),
 ({'Electrons': {'baseline': 4106, 'passOR': 4071, 'signal': 3080},
   'Muons': {'baseline': 7805, 'passOR': 5854, 'signal': 4233},
   'Jets': {'baseline': 213795, 'passOR': 207957, 'signal': 185955}},
  37110),
 ({'Electrons': {'baseline': 4499, 'passOR': 4461, 'signal': 3340},
   'Muons': {'baseline': 8629, 'passOR': 6462, 'signal': 4695},
   'Jets': {'baselin