# Multi-user exercise at 2024 IRIS-HEP retreat

In [None]:
from pathlib import Path
import datetime
import traceback

import awkward as ak
import dask
import dask_awkward as dak
import hist.dask
import coffea
import numpy as np
import uproot
from dask.distributed import Client, performance_report
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.style.use("ggplot")

from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from coffea.analysis_tools import PackedSelection
from coffea import dataset_tools

from functools import partial
import os
import time
import warnings
import pathlib

import utils  # worker count tracking

# execution backend to use: "dask" or "taskvine" or "dask_gateway"
executor = "dask"

# keep the notebook output readable: drop all Python warnings and the
# NanoAOD cross-reference warnings about branches we will not use here
warnings.filterwarnings("ignore")
NanoAODSchema.warn_missing_crossrefs = False

# record the versions of the core packages used for this measurement
for _label, _module in (
    ("awkward", ak),
    ("dask-awkward", dak),
    ("uproot", uproot),
    ("hist", hist),
    ("coffea", coffea),
):
    print(f"{_label}: {_module.__version__}")

# create a timestamped folder for output tracking of uproot.open setup
# (raises FileExistsError if a folder for the same second already exists)
_measurement_dir = datetime.datetime.now().strftime("measurements/%Y-%m-%d_%H-%M-%S")
MEASUREMENT_PATH = pathlib.Path(_measurement_dir)
MEASUREMENT_PATH.mkdir(parents=True)

In [None]:
# placeholder for scheduler settings; it is assigned here but not used anywhere in this cell
scheduler_options = {}

# Create the dask `client` used by the rest of the notebook. Only the
# "dask_gateway" executor gets special treatment; every other value of
# `executor` falls through to the else branch below.
if executor == "dask_gateway":
    num_workers = 100   # number of workers desired
    from dask.distributed import LocalCluster, Client, progress
    from dask_gateway import Gateway
    import pathlib

    gateway = Gateway()
    # attach to the first cluster the gateway lists; this raises IndexError
    # when no cluster is running yet
    clusters=gateway.list_clusters()
    cluster = gateway.connect(clusters[0].name)
    client = cluster.get_client()
    cluster.scale(num_workers)
    # %%
    def set_env(dask_worker):
        """Configure token access on a worker; executed via ``client.run(set_env)`` below.

        Builds the path ``<dask_worker.local_directory>/access_token``, exports
        it as ``BEARER_TOKEN_FILE``, restricts that file to mode 0o600 and sets
        mode 0o755 on ``/etc/grid-security/certificates``.
        """
        # presumably this is where client.upload_file() placed the token on the worker — verify
        path = str(pathlib.Path(dask_worker.local_directory) / 'access_token')
        os.environ["BEARER_TOKEN_FILE"] = path
        os.chmod(path, 0o600)
        os.chmod("/etc/grid-security/certificates", 0o755)

    # order matters: wait for the workers, ship the token file to them,
    # and only then run set_env, which expects the file to exist
    client.wait_for_workers(num_workers)
    client.upload_file("/etc/cmsaf-secrets/access_token")
    client.run(set_env)

else:
    # by default use dask: connect to an already running scheduler.
    # The commented-out lines below are the local alternative
    # (single thread, single worker).
    from dask.distributed import LocalCluster, Client, progress

    # cluster = LocalCluster(n_workers=1, processes=False, threads_per_worker=1)
    # client = Client(cluster)
    client = Client("tls://localhost:8786")

In [None]:
import json

# JSON file that maps each dataset name to a list of file paths
fname = "zstd_files.json"

# read the mapping first so the file handle is closed before any processing
with open(fname, "r") as fp:
    files_per_dataset = json.load(fp)

# Build the fileset structure {dataset: {"files": {url: "Events"}}}:
# every path is prefixed with the XCache endpoint and mapped to the
# name of the tree to read from it.
fileset = {
    dataset_name: {
        "files": {
            f"root://xcache.cmsaf-dev.flatiron.hollandhpc.org:1094/{dataset_fpath}": "Events"
            for dataset_fpath in file_list
        }
    }
    for dataset_name, file_list in files_per_dataset.items()
}

In [None]:
# apply optional filtering to limit number of input files

# limit to the first N files per container, None if no limit
LIMIT_NUM_FILES = 100

# limit to the first N containers, None if no limit
LIMIT_NUM_CONTAINERS = 30

fileset = coffea.dataset_tools.max_files(fileset, LIMIT_NUM_FILES)

if LIMIT_NUM_CONTAINERS is not None:
    # keep only the leading containers, in the order they appear in the fileset
    fileset = {
        container: content
        for position, (container, content) in enumerate(fileset.items())
        if position < LIMIT_NUM_CONTAINERS
    }

num_files_kept = sum(len(content["files"]) for content in fileset.values())
print(f"number of input files after filter: {num_files_kept}")

# store the filtered fileset next to the other outputs of this run
utils.worker_tracking.save_fileset(fileset, MEASUREMENT_PATH)

In [None]:
# turn fileset into simple list of files to run over
# (iterating each "files" mapping yields its keys, i.e. the file URLs)
all_files = [
    file_url
    for dataset in fileset.values()
    for file_url in dataset["files"]
]

# define work to be done
def uproot_open_materialize(fname):
    """Open one file with uproot, read a fixed list of branches and report metrics.

    All branches in ``BRANCH_LIST`` are iterated over once (the data is
    discarded) so that they are actually read. Failures are not raised;
    they are reported in the returned dictionary so that one bad file
    does not abort the whole computation.

    Args:
        fname: path or URL of the file to open.

    Returns:
        dict with the keys
        ``fname`` (the input), ``read`` (``num_requested_bytes`` reported by
        the file source), ``uncompressed`` (sum of ``uncompressed_bytes`` over
        the branches), ``num_entries`` (entries in the "Events" tree),
        ``runtime`` (seconds spent in this function, including failures),
        ``time_finished`` (naive local ``datetime`` taken at the end) and
        ``exception`` (formatted traceback, or ``None`` on success).
        On failure ``read``, ``uncompressed`` and ``num_entries`` are 0.
    """
    BRANCH_LIST = [
        "GenPart_pt", "GenPart_eta", "GenPart_phi", "CorrT1METJet_phi",
        "GenJet_pt", "CorrT1METJet_eta", "SoftActivityJet_pt",
        "Jet_eta", "Jet_phi", "SoftActivityJet_eta", "SoftActivityJet_phi",
        "CorrT1METJet_rawPt", "Jet_btagDeepFlavB", "GenJet_eta",
        "GenPart_mass", "GenJet_phi",
        "Jet_puIdDisc", "CorrT1METJet_muonSubtrFactor", "Jet_btagDeepFlavCvL",
        "Jet_btagDeepFlavQG", "Jet_mass", "Jet_pt", "GenPart_pdgId",
        "Jet_btagDeepFlavCvB", "Jet_cRegCorr"
        ]

    # set lookup instead of scanning the list on every membership test
    wanted_branches = frozenset(BRANCH_LIST)

    def filter_name(name):
        return name in wanted_branches

    size_uncompressed = 0
    t0 = time.perf_counter()
    try:
        # NOTE(review): filter_name is handed to uproot.open exactly as before;
        # confirm that uproot.open actually acts on this option.
        with uproot.open(fname, filter_name=filter_name) as f:
            events = f["Events"]
            num_entries = events.num_entries
            size_uncompressed = sum(events[b].uncompressed_bytes for b in BRANCH_LIST)

            # iterate over all batches to force the branches to be read;
            # the arrays themselves are not needed
            for _ in events.iterate(expressions=BRANCH_LIST):
                pass

            size_read = f.file.source.num_requested_bytes
        exception = None
    except Exception:
        # Report the failure instead of raising. KeyboardInterrupt and
        # SystemExit are deliberately not caught so the task can still be
        # interrupted or the process shut down.
        num_entries = 0
        size_read = 0
        size_uncompressed = 0
        exception = traceback.format_exc()

    t1 = time.perf_counter()
    time_finished = datetime.datetime.now()
    return {"fname": fname, "read": size_read, "uncompressed": size_uncompressed, "num_entries": num_entries,
            "runtime": t1-t0, "time_finished": time_finished, "exception": exception}

In [None]:
# perform computation
print(f"running with {len(all_files)} files")

utils.worker_tracking.start_tracking_workers(client, MEASUREMENT_PATH)  # track worker count in background
try:
    with performance_report(filename=MEASUREMENT_PATH/"dask-report-plain-uproot.html"):
        # one delayed task per file; building the task list is not part of the timed region
        tasks = [dask.delayed(uproot_open_materialize)(f) for f in all_files]
        t0 = time.perf_counter()
        out = ak.Array(dask.compute(*tasks))
        t1 = time.perf_counter()
finally:
    # stop the tracking started above even when the computation raises,
    # so it does not keep running after a failed cell
    utils.worker_tracking.stop_tracking_workers()

print(f"wall clock time: {t1-t0:.2f}s")
utils.worker_tracking.save_measurement(out, t0, t1, MEASUREMENT_PATH)

In [None]:
# load measurements from file again
timestamps, nworkers, avg_num_workers = utils.worker_tracking.get_timestamps_and_counts(MEASUREMENT_PATH)  # worker count info
out, t0, t1 = utils.worker_tracking.load_measurement(MEASUREMENT_PATH)

# wall clock duration of the computation, shared by several numbers below
wall_clock = t1 - t0

# summary of performance: data volume (1 GB = 1000**3 bytes)
read_GB = sum(out['read']) / 1000**3
uncompressed_GB = sum(out['uncompressed']) / 1000**3
print(f"total read (compressed): {read_GB:.2f} GB")
print(f"total read (uncompressed): {uncompressed_GB:.2f} GB")

# average rate in gigabit per second and the factor missing to the 200 Gbps target
rate_Gbps = read_GB*8/wall_clock
scale_to_target = 200/rate_Gbps
print(f"average data rate: {rate_Gbps:.2f} Gbps (need to scale by x{scale_to_target:.1f} to reach 200 Gbps)")

n_evts = sum(out["num_entries"])
print(f"total event rate (wall clock time): {n_evts / wall_clock / 1000:.2f} kHz (processed {n_evts} events total)")

# time summed over all tasks compared to the wall clock time
total_runtime = sum(out["runtime"])
runtime_ratio = total_runtime / wall_clock
print(f"total aggregated runtime in function: {total_runtime:.2f} s")
print(
    f"ratio total runtime / wall clock time: {runtime_ratio:.2f} "
    "(should match # cores without overhead / scheduling issues)"
)
print(f"time-averaged number of workers: {avg_num_workers:.1f}")
print(f"\"efficiency\" (ratio of two numbers above): {runtime_ratio / avg_num_workers:.1%}")
print(f"event rate (aggregated time spent in function): {n_evts / total_runtime / 1000:.2f} kHz")

In [None]:
# get arrays for starting time, runtime and end time of all tasks
# runtimes: per-task runtime (seconds) converted to numpy timedelta64 so that
# it can be subtracted from the finish timestamps
runtimes = np.asarray([datetime.timedelta(seconds=t) for t in out["runtime"]], dtype=np.timedelta64)
ends = out["time_finished"].to_numpy()
# start times are not measured directly: they are reconstructed as finish time minus runtime
starts = ends - runtimes

# calculate instantaneous rates for given timestamp
times_for_rates = []
instantaneous_rates = []
# only every 10th timestamp is used; the original note says this corresponds
# to 30 seconds, which presumably means one timestamp every 3 seconds — TODO confirm
for t in timestamps[::10]:
    mask = np.logical_and((starts <= t), (t <= ends))  # mask for tasks running at given timestamp
    # sum over the running tasks of each task's average rate in Gbps
    # (bytes read * 8 / 1000**3 / runtime), i.e. every task is treated as
    # reading at a constant rate over its whole runtime
    rate_Gbps_at_timestamp = sum(out[mask]['read']*8 / 1000**3 / out[mask]["runtime"])
    times_for_rates.append(t)
    instantaneous_rates.append(rate_Gbps_at_timestamp)

# plot worker counts together with the rates computed above; outputs go to MEASUREMENT_PATH
utils.worker_tracking.plot_worker_count(timestamps, nworkers, avg_num_workers, times_for_rates, instantaneous_rates, MEASUREMENT_PATH)

In [None]:
# a file counts as failed when its task reported an exception
num_failed = sum(1 for exc in out['exception'] if exc is not None)
print(f"{num_failed} files failed\n")

# use below to get full list with details
# for report in out:
#     if report["exception"] is not None:
#         print(f"{report['fname']} failed in {report['runtime']:.2f} s\n{report['exception']}\n")

In [None]:
# runtime distribution for all files
task_runtimes = out["runtime"]

fig, ax = plt.subplots()

# 100 bin edges from zero to just above the longest runtime
bins = np.linspace(0, max(task_runtimes)*1.01, 100)
ax.hist(task_runtimes, bins=bins)

ax.set_xlabel("runtime [s]")
ax.set_ylabel("count")

# start the x axis at zero and keep the automatically chosen upper limit
_, x_upper = ax.get_xlim()
ax.set_xlim([0, x_upper])

# logarithmic y axis
ax.semilogy()

fig.savefig(MEASUREMENT_PATH / "runtime_distribution.pdf")

In [None]:
# runtime vs number of events in file
fig, ax = plt.subplots()
ax.scatter(out["num_entries"], out["runtime"], marker="x")
ax.set_xlabel("number of events")
ax.set_ylabel("runtime [s]")

# remember the limits chosen for the scatter points before adding reference lines
xlim = ax.get_xlim()
ylim = ax.get_ylim()
xvals = np.linspace(*xlim, 100)

# reference lines of constant event rate: runtime = events / rate
reference_rates = (
    (25, "-", "C1"),
    (50, "--", "C2"),
    (100, ":", "C3"),
)
for rate_khz, line_style, color in reference_rates:
    ax.plot(xvals, xvals/(rate_khz*1_000), label=f"{rate_khz} kHz", linestyle=line_style, c=color)

# both axes start at zero, upper limits are the ones remembered above
ax.set_xlim([0, xlim[1]])
ax.set_ylim([0, ylim[1]])
ax.legend()

fig.savefig(MEASUREMENT_PATH / "runtime_vs_nevts.pdf")