Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions atlas/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,7 @@ __marimo__/
ntuple_production/production_status.json
# preprocess json
preprocess_output.json
# Dask reports
*html
# figures
*png
107 changes: 72 additions & 35 deletions atlas/analysis.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@
"metadata": {},
"outputs": [],
"source": [
"# ! pip install --upgrade atlas_schema\n",
"# ! pip install --upgrade git+https://github.com/scikit-hep/mplhep.git\n",
"\n",
"# import importlib\n",
"# importlib.reload(utils)"
"! pip install --upgrade --quiet atlas_schema\n",
"! pip install --upgrade --quiet --pre mplhep"
]
},
{
Expand All @@ -27,18 +24,19 @@
"import time\n",
"\n",
"import awkward as ak\n",
"import cloudpickle\n",
"import dask\n",
"import vector\n",
"import hist\n",
"import matplotlib.pyplot as plt\n",
"import mplhep\n",
"import numpy as np\n",
"import uproot\n",
"import vector\n",
"\n",
"from atlas_schema.schema import NtupleSchema\n",
"from coffea import processor\n",
"from coffea.nanoevents import NanoEventsFactory\n",
"from dask.distributed import Client, PipInstall\n",
"from dask.distributed import Client, PipInstall, performance_report\n",
"\n",
"\n",
"import utils\n",
Expand All @@ -49,7 +47,9 @@
"client = Client(\"tls://localhost:8786\")\n",
"\n",
"plugin = PipInstall(packages=[\"atlas_schema\"], pip_options=[\"--upgrade\"])\n",
"client.register_plugin(plugin)"
"client.register_plugin(plugin)\n",
"\n",
"cloudpickle.register_pickle_by_value(utils)"
]
},
{
Expand All @@ -74,6 +74,7 @@
"\n",
"# construct fileset\n",
"fileset = {}\n",
"input_size_GB = 0\n",
"for containers_for_category in dataset_info.values():\n",
" for container, metadata in containers_for_category.items():\n",
" if metadata[\"files_output\"] is None:\n",
Expand All @@ -82,13 +83,16 @@
"\n",
" dsid, _, campaign = utils.dsid_rtag_campaign(container)\n",
"\n",
" # debugging shortcuts\n",
" # if campaign not in [\"mc20a\", \"data15\", \"data16\"]: continue\n",
" # if \"601352\" not in dsid: continue\n",
" # debugging shortcuts, use one or both of the following to reduce workload\n",
" if campaign not in [\"mc23a\", \"data22\"]: continue\n",
" # if \"601229\" not in dsid: continue\n",
"\n",
" weight_xs = utils.sample_xs(campaign, dsid)\n",
" lumi = utils.integrated_luminosity(campaign)\n",
" fileset[container] = {\"files\": dict((path, \"reco\") for path in metadata[\"files_output\"]), \"metadata\": {\"dsid\": dsid, \"campaign\": campaign, \"weight_xs\": weight_xs, \"lumi\": lumi}}\n",
" input_size_GB += metadata[\"size_output_GB\"]\n",
"\n",
"print(f\"fileset has {len(fileset)} categories with {sum([len(f[\"files\"]) for f in fileset.values()])} files total, size is {input_size_GB:.2f} GB\")\n",
"\n",
"# minimal fileset for debugging\n",
"# fileset = {\"mc20_13TeV.601352.PhPy8EG_tW_dyn_DR_incl_antitop.deriv.DAOD_PHYSLITE.e8547_s4231_r13144_p6697\": fileset[\"mc20_13TeV.601352.PhPy8EG_tW_dyn_DR_incl_antitop.deriv.DAOD_PHYSLITE.e8547_s4231_r13144_p6697\"]}\n",
Expand Down Expand Up @@ -148,16 +152,18 @@
"outputs": [],
"source": [
"run = processor.Runner(\n",
" executor = processor.DaskExecutor(client=client),\n",
" # executor = processor.IterativeExecutor(),\n",
" executor = processor.DaskExecutor(client=client, treereduction=4),\n",
" # executor = processor.IterativeExecutor(), # to run locally\n",
" schema=NtupleSchema,\n",
" savemetrics=True,\n",
" chunksize=100_000,\n",
" chunksize=50_000,\n",
" skipbadfiles=True,\n",
" # maxchunks=1\n",
" align_clusters=False,\n",
" # maxchunks=1 # for debugging only\n",
")\n",
"\n",
"preprocess_output = run.preprocess(fileset)\n",
"with performance_report(filename=\"preprocess.html\"):\n",
" preprocess_output = run.preprocess(fileset)\n",
"\n",
"# write to disk\n",
"with open(\"preprocess_output.json\", \"w\") as f:\n",
Expand Down Expand Up @@ -187,7 +193,7 @@
"source": [
"class Analysis(processor.ProcessorABC):\n",
" def __init__(self):\n",
" self.h = hist.new.Regular(30, 0, 300, label=\"leading electron $p_T$\").\\\n",
" self.h = hist.new.Regular(20, 0, 1_000, label=\"$m_{jj}$ [GeV]\").\\\n",
" StrCat([], name=\"dsid_and_campaign\", growth=True).\\\n",
" StrCat([], name=\"variation\", growth=True).\\\n",
" Weight()\n",
Expand All @@ -213,12 +219,14 @@
" sumw = None # no normalization for data\n",
"\n",
" for variation in events.systematic_names:\n",
" if variation != \"NOSYS\" and \"EG_SCALE_ALL\" not in variation:\n",
" if variation not in [\"NOSYS\"] + [name for name in events.systematic_names if \"JET_JER_Effective\" in name]:\n",
" continue\n",
"\n",
" cut = events[variation][\"pass\"][\"ejets\"] == 1\n",
" # TODO: remaining weights\n",
" weight = (events[variation][cut==1].weight.mc if events.metadata[\"dsid\"] != \"data\" else 1.0) * events.metadata[\"weight_xs\"] * events.metadata[\"lumi\"]\n",
" self.h.fill(events[variation][cut==1].el.pt[:, 0] / 1_000, dsid_and_campaign=dsid_and_campaign, variation=variation, weight=weight)\n",
" mjj = (events[variation][cut==1].jet[:, 0] + events[variation][cut==1].jet[:, 1]).mass\n",
" self.h.fill(mjj / 1_000, dsid_and_campaign=dsid_and_campaign, variation=variation, weight=weight)\n",
"\n",
" return {\n",
" \"hist\": self.h,\n",
Expand All @@ -239,18 +247,31 @@
" accumulator[\"hist\"][:, dsid_and_campaign, :] = np.stack([count_normalized, variance_normalized], axis=-1)\n",
"\n",
"\n",
"t0 = time.perf_counter()\n",
"out, report = run(preprocess_output, processor_instance=Analysis())\n",
"client.run_on_scheduler(utils.start_tracking) # track worker count on scheduler\n",
"t0 = time.perf_counter() # track walltime\n",
"\n",
"with performance_report(filename=\"process.html\"):\n",
" out, report = run(preprocess_output, processor_instance=Analysis())\n",
"\n",
"t1 = time.perf_counter()\n",
"report"
"worker_count_dict = client.run_on_scheduler(utils.stop_tracking) # stop tracking, read out data, get average\n",
"nworker_avg = utils.get_avg_num_workers(worker_count_dict)\n",
"\n",
"print(f\"histogram size: {out[\"hist\"].view(True).nbytes / 1_000 / 1_000:.2f} GB\\n\")\n",
"\n",
"# shortened version of report, dropping extra columns\n",
"dict((k, v) for k, v in report.items() if k != \"columns\") | ({\"columns\": report[\"columns\"][0:10] + [\"...\"]})"
]
},
{
"cell_type": "markdown",
"id": "8663e9ff-f8bb-43a0-8978-f2d430d2bbbd",
"metadata": {},
"source": [
"track XCache egress: [link](https://grafana.mwt2.org/d/EKefjM-Sz/af-network-200gbps-challenge?var-cnode=c111_af_uchicago_edu&var-cnode=c112_af_uchicago_edu&var-cnode=c113_af_uchicago_edu&var-cnode=c114_af_uchicago_edu&var-cnode=c115_af_uchicago_edu&viewPanel=195&kiosk=true&orgId=1&from=now-1h&to=now&timezone=browser&refresh=5s)"
"track XCache egress: [link](https://grafana.mwt2.org/d/EKefjM-Sz/af-network-200gbps-challenge?var-cnode=c111_af_uchicago_edu&var-cnode=c112_af_uchicago_edu&var-cnode=c113_af_uchicago_edu&var-cnode=c114_af_uchicago_edu&var-cnode=c115_af_uchicago_edu&viewPanel=195&kiosk=true&orgId=1&from=now-1h&to=now&timezone=browser&refresh=5s)\n",
"\n",
"**to-do for metrics:**\n",
"- data rate by tracking `bytesread` per chunk"
]
},
{
Expand All @@ -260,13 +281,25 @@
"metadata": {},
"outputs": [],
"source": [
"print(f\"data read: {report[\"bytesread\"] / 1000**3:.2f} GB in {report[\"chunks\"]} chunks\")\n",
"print(f\"walltime: {t1 - t0:.2f} sec ({(t1 - t0) / 60:.2f} min)\")\n",
"print(f\"average worker count: {nworker_avg:.1f}\")\n",
"print(f\"number of events processed: {report[\"entries\"]:,}\\n\")\n",
"\n",
"print(f\"data read: {report[\"bytesread\"] / 1000**3:.2f} GB in {report[\"chunks\"]} chunks (average {report[\"bytesread\"] / 1000**3 / report[\"chunks\"]:.2f} GB per chunk)\")\n",
"print(f\"average total data rate: {report[\"bytesread\"] / 1000**3 * 8 / (t1 - t0):.2f} Gbps\")\n",
"print(f\"fraction of input files read: {report[\"bytesread\"] / 1000**3 / input_size_GB:.1%}\")\n",
"print(f\"number of branches read: {len(report[\"columns\"])}\\n\")\n",
"\n",
"print(f\"worker-average event rate using \\'processtime\\': {report[\"entries\"] / 1000 / report[\"processtime\"]:.2f} kHz\")\n",
"print(f\"worker-average data rate using \\'processtime\\': {report[\"bytesread\"] / 1000**3 * 8 / report[\"processtime\"]:.2f} Gbps\\n\")\n",
"\n",
"print(f\"core-average event rate using \\'processtime\\': {report[\"entries\"] / 1000 / report[\"processtime\"]:.2f} kHz\")\n",
"print(f\"core-average data rate using \\'processtime\\': {report[\"bytesread\"] / 1000**3 * 8 / report[\"processtime\"]:.2f} Gbps\")\n",
"print(f\"average event rate using walltime and time-averaged worker count: {report[\"entries\"] / 1000 / (t1 - t0) / nworker_avg:.2f} kHz\")\n",
"print(f\"average data rate using walltime and time-averaged worker count: {report[\"bytesread\"] / 1000**3 * 8 / (t1 - t0) / nworker_avg:.2f} Gbps\\n\")\n",
"\n",
"print(f\"average event rate using walltime: {report[\"entries\"] / 1000 / (t1 - t0):.2f} kHz\")\n",
"print(f\"average data rate using walltime: {report[\"bytesread\"] / 1000**3 * 8 / (t1 - t0):.2f} Gbps\")"
"print(f\"fraction of time spent in processing: {report[\"processtime\"] / ((t1 - t0) * nworker_avg):.1%}\")\n",
"print(f\"average process task length: {report[\"processtime\"] / report[\"chunks\"]:.1f} sec\")\n",
"\n",
"_ = utils.plot_worker_count(worker_count_dict)"
]
},
{
Expand All @@ -285,16 +318,22 @@
"\n",
" dsids = sorted(set(dsids))\n",
" dsids_in_hist = [dc for dc in out[\"hist\"].axes[1] if dc.split(\"_\")[0] in dsids]\n",
" print(f\"{key}:\\n - expect {dsids}\\n - have {dsids_in_hist}\")\n",
" # print(f\"{key}:\\n - expect {dsids}\\n - have {dsids_in_hist}\")\n",
"\n",
" if key in [\"data\", \"ttbar_H7\", \"ttbar_hdamp\", \"ttbar_pthard\", \"Wt_DS\", \"Wt_H7\", \"Wt_pthard\"] or len(dsids_in_hist) == 0:\n",
" continue # data drawn separately, skip MC modeling variations and skip empty categories\n",
"\n",
" mc_stack.append(out[\"hist\"][:, :, \"NOSYS\"].integrate(\"dsid_and_campaign\", dsids_in_hist))\n",
" labels.append(key)\n",
"\n",
"fig, ax1, ax2 = mplhep.data_model(\n",
" data_hist=out[\"hist\"].integrate(\"dsid_and_campaign\", [dc for dc in out[\"hist\"].axes[1] if \"data\" in dc])[:, \"NOSYS\"],\n",
"try:\n",
" data_hist = out[\"hist\"].integrate(\"dsid_and_campaign\", [dc for dc in out[\"hist\"].axes[1] if \"data\" in dc])[:, \"NOSYS\"]\n",
"except ValueError:\n",
" print(\"falling back to plotting first entry of categorical axes as \\\"data\\\"\")\n",
" data_hist = out[\"hist\"][:, 0, 0]\n",
"\n",
"fig, ax1, ax2 = mplhep.comp.data_model(\n",
" data_hist=data_hist,\n",
" stacked_components=mc_stack,\n",
" stacked_labels=labels,\n",
" # https://scikit-hep.org/mplhep/gallery/model_with_stacked_and_unstacked_histograms_components/\n",
Expand All @@ -309,7 +348,7 @@
"ax2.set_ylim([0.5, 1.5])\n",
"\n",
"# compare to e.g. https://atlas.web.cern.ch/Atlas/GROUPS/PHYSICS/PAPERS/HDBS-2020-11/fig_02a.png\n",
"fig.savefig(\"el_pt.png\")"
"fig.savefig(\"mjj.png\")"
]
},
{
Expand All @@ -326,9 +365,7 @@
" f.write(json.dumps(out[\"hist\"], default=uhi.io.json.default).encode(\"utf-8\"))\n",
"\n",
"with gzip.open(\"hist.json.gz\") as f:\n",
" h = hist.Hist(json.loads(f.read(), object_hook=uhi.io.json.object_hook))\n",
"\n",
"h[:, \"data_data15\", \"NOSYS\"]"
" h = hist.Hist(json.loads(f.read(), object_hook=uhi.io.json.object_hook))"
]
}
],
Expand Down
34 changes: 20 additions & 14 deletions atlas/ntuple_production/ntuple_summary.ipynb

Large diffs are not rendered by default.

46 changes: 46 additions & 0 deletions atlas/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,56 @@
# saving preprocessing output
import asyncio
import base64
import dataclasses
import datetime
import re
import urllib.request

import coffea
import matplotlib.pyplot as plt


##################################################
### Dask task tracking
##################################################

def start_tracking(dask_scheduler) -> None:
""""run on scheduler to track worker count"""
dask_scheduler.worker_counts = {}
dask_scheduler.track_count = True

async def track_count() -> None:
while dask_scheduler.track_count:
dask_scheduler.worker_counts[datetime.datetime.now()] = len(dask_scheduler.workers)
await asyncio.sleep(1)

asyncio.create_task(track_count())


def stop_tracking(dask_scheduler) -> dict:
"""obtain worker count and stop tracking"""
dask_scheduler.track_count = False
return dask_scheduler.worker_counts


def get_avg_num_workers(worker_count_dict: dict) -> float:
"""get time-averaged worker count"""
worker_info = list(worker_count_dict.items())
nworker_dt = 0
for (t0, nw0), (t1, nw1) in zip(worker_info[:-1], worker_info[1:]):
nworker_dt += (nw1 + nw0) / 2 * (t1 - t0).total_seconds()
return nworker_dt / (worker_info[-1][0] - worker_info[0][0]).total_seconds()


def plot_worker_count(worker_count_dict: dict):
"""plot worker count over time"""
fig, ax = plt.subplots()
ax.plot(worker_count_dict.keys(), worker_count_dict.values())
ax.tick_params(axis="x", labelrotation=45)
ax.set_ylim([0, ax.get_ylim()[1]])
ax.set_xlabel("time")
ax.set_ylabel("number of workers")
return fig, ax


##################################################
Expand Down