diff --git a/atlas/.gitignore b/atlas/.gitignore index 5bc5f59..97e35e8 100644 --- a/atlas/.gitignore +++ b/atlas/.gitignore @@ -212,3 +212,7 @@ __marimo__/ ntuple_production/production_status.json # preprocess json preprocess_output.json +# Dask reports +*html +# figures +*png diff --git a/atlas/analysis.ipynb b/atlas/analysis.ipynb index af4440f..7525f23 100644 --- a/atlas/analysis.ipynb +++ b/atlas/analysis.ipynb @@ -7,11 +7,8 @@ "metadata": {}, "outputs": [], "source": [ - "# ! pip install --upgrade atlas_schema\n", - "# ! pip install --upgrade git+https://github.com/scikit-hep/mplhep.git\n", - "\n", - "# import importlib\n", - "# importlib.reload(utils)" + "! pip install --upgrade --quiet atlas_schema\n", + "! pip install --upgrade --quiet --pre mplhep" ] }, { @@ -27,18 +24,19 @@ "import time\n", "\n", "import awkward as ak\n", + "import cloudpickle\n", "import dask\n", - "import vector\n", "import hist\n", "import matplotlib.pyplot as plt\n", "import mplhep\n", "import numpy as np\n", "import uproot\n", + "import vector\n", "\n", "from atlas_schema.schema import NtupleSchema\n", "from coffea import processor\n", "from coffea.nanoevents import NanoEventsFactory\n", - "from dask.distributed import Client, PipInstall\n", + "from dask.distributed import Client, PipInstall, performance_report\n", "\n", "\n", "import utils\n", @@ -49,7 +47,9 @@ "client = Client(\"tls://localhost:8786\")\n", "\n", "plugin = PipInstall(packages=[\"atlas_schema\"], pip_options=[\"--upgrade\"])\n", - "client.register_plugin(plugin)" + "client.register_plugin(plugin)\n", + "\n", + "cloudpickle.register_pickle_by_value(utils)" ] }, { @@ -74,6 +74,7 @@ "\n", "# construct fileset\n", "fileset = {}\n", + "input_size_GB = 0\n", "for containers_for_category in dataset_info.values():\n", " for container, metadata in containers_for_category.items():\n", " if metadata[\"files_output\"] is None:\n", @@ -82,13 +83,16 @@ "\n", " dsid, _, campaign = utils.dsid_rtag_campaign(container)\n", "\n", - " # debugging shortcuts\n", - " # if campaign not in [\"mc20a\", \"data15\", \"data16\"]: continue\n", - " # if \"601352\" not in dsid: continue\n", + " # debugging shortcuts, use one or both of the following to reduce workload\n", + " if campaign not in [\"mc23a\", \"data22\"]: continue\n", + " # if \"601229\" not in dsid: continue\n", "\n", " weight_xs = utils.sample_xs(campaign, dsid)\n", " lumi = utils.integrated_luminosity(campaign)\n", " fileset[container] = {\"files\": dict((path, \"reco\") for path in metadata[\"files_output\"]), \"metadata\": {\"dsid\": dsid, \"campaign\": campaign, \"weight_xs\": weight_xs, \"lumi\": lumi}}\n", + " input_size_GB += metadata[\"size_output_GB\"]\n", + "\n", + "print(f\"fileset has {len(fileset)} categories with {sum([len(f[\"files\"]) for f in fileset.values()])} files total, size is {input_size_GB:.2f} GB\")\n", "\n", "# minimal fileset for debugging\n", "# fileset = {\"mc20_13TeV.601352.PhPy8EG_tW_dyn_DR_incl_antitop.deriv.DAOD_PHYSLITE.e8547_s4231_r13144_p6697\": fileset[\"mc20_13TeV.601352.PhPy8EG_tW_dyn_DR_incl_antitop.deriv.DAOD_PHYSLITE.e8547_s4231_r13144_p6697\"]}\n", @@ -148,16 +152,18 @@ "outputs": [], "source": [ "run = processor.Runner(\n", - " executor = processor.DaskExecutor(client=client),\n", - " # executor = processor.IterativeExecutor(),\n", + " executor = processor.DaskExecutor(client=client, treereduction=4),\n", + " # executor = processor.IterativeExecutor(), # to run locally\n", " schema=NtupleSchema,\n", " savemetrics=True,\n", - " chunksize=100_000,\n", + " chunksize=50_000,\n", " skipbadfiles=True,\n", - " # maxchunks=1\n", + " align_clusters=False,\n", + " # maxchunks=1 # for debugging only\n", ")\n", "\n", - "preprocess_output = run.preprocess(fileset)\n", + "with performance_report(filename=\"preprocess.html\"):\n", + " preprocess_output = run.preprocess(fileset)\n", "\n", "# write to disk\n", "with open(\"preprocess_output.json\", \"w\") as f:\n", @@ -187,7 +193,7 @@ "source": [ "class Analysis(processor.ProcessorABC):\n", " def __init__(self):\n", - " self.h = hist.new.Regular(30, 0, 300, label=\"leading electron $p_T$\").\\\n", + " self.h = hist.new.Regular(20, 0, 1_000, label=\"$m_{jj}$ [GeV]\").\\\n", " StrCat([], name=\"dsid_and_campaign\", growth=True).\\\n", " StrCat([], name=\"variation\", growth=True).\\\n", " Weight()\n", @@ -213,12 +219,14 @@ " sumw = None # no normalization for data\n", "\n", " for variation in events.systematic_names:\n", - " if variation != \"NOSYS\" and \"EG_SCALE_ALL\" not in variation:\n", + " if variation not in [\"NOSYS\"] + [name for name in events.systematic_names if \"JET_JER_Effective\" in name]:\n", " continue\n", "\n", " cut = events[variation][\"pass\"][\"ejets\"] == 1\n", + " # TODO: remaining weights\n", " weight = (events[variation][cut==1].weight.mc if events.metadata[\"dsid\"] != \"data\" else 1.0) * events.metadata[\"weight_xs\"] * events.metadata[\"lumi\"]\n", - " self.h.fill(events[variation][cut==1].el.pt[:, 0] / 1_000, dsid_and_campaign=dsid_and_campaign, variation=variation, weight=weight)\n", + " mjj = (events[variation][cut==1].jet[:, 0] + events[variation][cut==1].jet[:, 1]).mass\n", + " self.h.fill(mjj / 1_000, dsid_and_campaign=dsid_and_campaign, variation=variation, weight=weight)\n", "\n", " return {\n", " \"hist\": self.h,\n", @@ -239,10 +247,20 @@ " accumulator[\"hist\"][:, dsid_and_campaign, :] = np.stack([count_normalized, variance_normalized], axis=-1)\n", "\n", "\n", - "t0 = time.perf_counter()\n", - "out, report = run(preprocess_output, processor_instance=Analysis())\n", + "client.run_on_scheduler(utils.start_tracking) # track worker count on scheduler\n", + "t0 = time.perf_counter() # track walltime\n", + "\n", + "with performance_report(filename=\"process.html\"):\n", + " out, report = run(preprocess_output, processor_instance=Analysis())\n", + "\n", "t1 = time.perf_counter()\n", - "report" + "worker_count_dict = client.run_on_scheduler(utils.stop_tracking) # stop tracking, read out data, get average\n", + "nworker_avg = utils.get_avg_num_workers(worker_count_dict)\n", + "\n", + "print(f\"histogram size: {out[\"hist\"].view(True).nbytes / 1_000 / 1_000:.2f} GB\\n\")\n", + "\n", + "# shortened version of report, dropping extra columns\n", + "dict((k, v) for k, v in report.items() if k != \"columns\") | ({\"columns\": report[\"columns\"][0:10] + [\"...\"]})" ] }, { @@ -250,7 +268,10 @@ "id": "8663e9ff-f8bb-43a0-8978-f2d430d2bbbd", "metadata": {}, "source": [ - "track XCache egress: [link](https://grafana.mwt2.org/d/EKefjM-Sz/af-network-200gbps-challenge?var-cnode=c111_af_uchicago_edu&var-cnode=c112_af_uchicago_edu&var-cnode=c113_af_uchicago_edu&var-cnode=c114_af_uchicago_edu&var-cnode=c115_af_uchicago_edu&viewPanel=195&kiosk=true&orgId=1&from=now-1h&to=now&timezone=browser&refresh=5s)" + "track XCache egress: [link](https://grafana.mwt2.org/d/EKefjM-Sz/af-network-200gbps-challenge?var-cnode=c111_af_uchicago_edu&var-cnode=c112_af_uchicago_edu&var-cnode=c113_af_uchicago_edu&var-cnode=c114_af_uchicago_edu&var-cnode=c115_af_uchicago_edu&viewPanel=195&kiosk=true&orgId=1&from=now-1h&to=now&timezone=browser&refresh=5s)\n", + "\n", + "**to-do for metrics:**\n", + "- data rate by tracking `bytesread` per chunk" ] }, { @@ -260,13 +281,25 @@ "metadata": {}, "outputs": [], "source": [ - "print(f\"data read: {report[\"bytesread\"] / 1000**3:.2f} GB in {report[\"chunks\"]} chunks\")\n", + "print(f\"walltime: {t1 - t0:.2f} sec ({(t1 - t0) / 60:.2f} min)\")\n", + "print(f\"average worker count: {nworker_avg:.1f}\")\n", + "print(f\"number of events processed: {report[\"entries\"]:,}\\n\")\n", + "\n", + "print(f\"data read: {report[\"bytesread\"] / 1000**3:.2f} GB in {report[\"chunks\"]} chunks (average {report[\"bytesread\"] / 1000**3 / report[\"chunks\"]:.2f} GB per chunk)\")\n", + "print(f\"average total data rate: {report[\"bytesread\"] / 1000**3 * 8 / (t1 - t0):.2f} Gbps\")\n", + "print(f\"fraction of input files read: {report[\"bytesread\"] / 1000**3 / input_size_GB:.1%}\")\n", + "print(f\"number of branches read: {len(report[\"columns\"])}\\n\")\n", + "\n", + "print(f\"worker-average event rate using \\'processtime\\': {report[\"entries\"] / 1000 / report[\"processtime\"]:.2f} kHz\")\n", + "print(f\"worker-average data rate using \\'processtime\\': {report[\"bytesread\"] / 1000**3 * 8 / report[\"processtime\"]:.2f} Gbps\\n\")\n", "\n", - "print(f\"core-average event rate using \\'processtime\\': {report[\"entries\"] / 1000 / report[\"processtime\"]:.2f} kHz\")\n", - "print(f\"core-average data rate using \\'processtime\\': {report[\"bytesread\"] / 1000**3 * 8 / report[\"processtime\"]:.2f} Gbps\")\n", + "print(f\"average event rate using walltime and time-averaged worker count: {report[\"entries\"] / 1000 / (t1 - t0) / nworker_avg:.2f} kHz\")\n", + "print(f\"average data rate using walltime and time-averaged worker count: {report[\"bytesread\"] / 1000**3 * 8 / (t1 - t0) / nworker_avg:.2f} Gbps\\n\")\n", "\n", - "print(f\"average event rate using walltime: {report[\"entries\"] / 1000 / (t1 - t0):.2f} kHz\")\n", - "print(f\"average data rate using walltime: {report[\"bytesread\"] / 1000**3 * 8 / (t1 - t0):.2f} Gbps\")" + "print(f\"fraction of time spent in processing: {report[\"processtime\"] / ((t1 - t0) * nworker_avg):.1%}\")\n", + "print(f\"average process task length: {report[\"processtime\"] / report[\"chunks\"]:.1f} sec\")\n", + "\n", + "_ = utils.plot_worker_count(worker_count_dict)" ] }, { @@ -285,7 +318,7 @@ "\n", " dsids = sorted(set(dsids))\n", " dsids_in_hist = [dc for dc in out[\"hist\"].axes[1] if dc.split(\"_\")[0] in dsids]\n", - " print(f\"{key}:\\n - expect {dsids}\\n - have {dsids_in_hist}\")\n", + " # print(f\"{key}:\\n - expect {dsids}\\n - have {dsids_in_hist}\")\n", "\n", " if key in [\"data\", \"ttbar_H7\", \"ttbar_hdamp\", \"ttbar_pthard\", \"Wt_DS\", \"Wt_H7\", \"Wt_pthard\"] or len(dsids_in_hist) == 0:\n", " continue # data drawn separately, skip MC modeling variations and skip empty categories\n", @@ -293,8 +326,14 @@ " mc_stack.append(out[\"hist\"][:, :, \"NOSYS\"].integrate(\"dsid_and_campaign\", dsids_in_hist))\n", " labels.append(key)\n", "\n", - "fig, ax1, ax2 = mplhep.data_model(\n", - " data_hist=out[\"hist\"].integrate(\"dsid_and_campaign\", [dc for dc in out[\"hist\"].axes[1] if \"data\" in dc])[:, \"NOSYS\"],\n", + "try:\n", + " data_hist = out[\"hist\"].integrate(\"dsid_and_campaign\", [dc for dc in out[\"hist\"].axes[1] if \"data\" in dc])[:, \"NOSYS\"]\n", + "except ValueError:\n", + " print(\"falling back to plotting first entry of categorical axes as \\\"data\\\"\")\n", + " data_hist = out[\"hist\"][:, 0, 0]\n", + "\n", + "fig, ax1, ax2 = mplhep.comp.data_model(\n", + " data_hist=data_hist,\n", " stacked_components=mc_stack,\n", " stacked_labels=labels,\n", " # https://scikit-hep.org/mplhep/gallery/model_with_stacked_and_unstacked_histograms_components/\n", @@ -309,7 +348,7 @@ "ax2.set_ylim([0.5, 1.5])\n", "\n", "# compare to e.g. https://atlas.web.cern.ch/Atlas/GROUPS/PHYSICS/PAPERS/HDBS-2020-11/fig_02a.png\n", - "fig.savefig(\"el_pt.png\")" + "fig.savefig(\"mjj.png\")" ] }, { @@ -326,9 +365,7 @@ " f.write(json.dumps(out[\"hist\"], default=uhi.io.json.default).encode(\"utf-8\"))\n", "\n", "with gzip.open(\"hist.json.gz\") as f:\n", - " h = hist.Hist(json.loads(f.read(), object_hook=uhi.io.json.object_hook))\n", - "\n", - "h[:, \"data_data15\", \"NOSYS\"]" + " h = hist.Hist(json.loads(f.read(), object_hook=uhi.io.json.object_hook))" ] } ], diff --git a/atlas/ntuple_production/ntuple_summary.ipynb b/atlas/ntuple_production/ntuple_summary.ipynb index 59ab0f5..7914bbc 100644 --- a/atlas/ntuple_production/ntuple_summary.ipynb +++ b/atlas/ntuple_production/ntuple_summary.ipynb @@ -18,7 +18,7 @@ "metadata": {}, "outputs": [], "source": [ - "fname = \"ntuple_metadata.json.gz\"\n", + "fname = \"file_metadata.json.gz\"\n", "\n", "with gzip.open(fname) as f:\n", " content = json.loads(f.read().decode())" @@ -35,12 +35,12 @@ "output_type": "stream", "text": [ "PHYSLITE INPUT\n", - " nfiles: 876,602\n", - " size [GB]: 565,834\n", - " nevents: 59,236,115,268\n", + " nfiles: 877,199\n", + " size [GB]: 566,120\n", + " nevents: 59,251,746,268\n", "NTUPLE OUTPUT\n", - " nfiles: 8,728\n", - " size [GB]: 19,629\n" + " nfiles: 9,287\n", + " size [GB]: 21,254\n" ] } ], @@ -54,6 +54,8 @@ "\n", "sizes_output_GB = []\n", "avg_sizes_output_GB = []\n", + "nfiles_output = []\n", + "\n", "\n", "for category_info in content.values():\n", " for dataset_info in category_info.values():\n", @@ -66,6 +68,7 @@ " sum_size_output_GB += dataset_info[\"size_output_GB\"]\n", " sizes_output_GB.append(dataset_info[\"size_output_GB\"])\n", " avg_sizes_output_GB.append(dataset_info[\"size_output_GB\"] / dataset_info[\"nfiles_output\"])\n", + " nfiles_output.append(dataset_info[\"nfiles_output\"])\n", "\n", "print(\"PHYSLITE INPUT\")\n", "print(f\" nfiles: {sum_nfiles_input:,}\")\n", @@ -85,9 +88,9 @@ "outputs": [ { "data": { - "image/png": "", + "image/png": "", "text/plain": [ - "
" + "
" ] }, "metadata": {}, @@ -98,13 +101,16 @@ "import hist\n", "import matplotlib.pyplot as plt\n", "\n", - "fig, (ax1, ax2, ax3) = plt.subplots(figsize=(12, 4), ncols=3, constrained_layout=True)\n", + "fig, (ax1, ax2, ax3, ax4) = plt.subplots(figsize=(16, 4), ncols=4, constrained_layout=True)\n", + "\n", + "hist.new.Regular(20, 0, 1_000, label=\"total size of container [GB]\").Double().fill(sizes_output_GB).plot(ax=ax1, yerr=False)\n", + "h = hist.new.Regular(30, 0, 5, label=\"average file size per container [GB]\").Double().fill(avg_sizes_output_GB).plot(ax=ax2, yerr=False)\n", "\n", - "hist.new.Regular(20, 0, 1000, label=\"total size of container [GB]\").Double().fill(sizes_output_GB).plot(ax=ax1, yerr=False)\n", - "h = hist.new.Regular(30, 0, 6, label=\"average file size per container [GB]\").Double().fill(avg_sizes_output_GB).plot(ax=ax2, yerr=False)\n", + "h = hist.new.Regular(30, 0, 5, label=\"average file size per container [GB]\").Regular(20, 0, 1_000, label=\"total size of container [GB]\").Double()\n", + "_ = h.fill(avg_sizes_output_GB, sizes_output_GB).plot(ax=ax3, cmin=1)\n", "\n", - "h = hist.new.Regular(30, 0, 6, label=\"average file size per container [GB]\").Regular(20, 0, 1000, label=\"total size of container [GB]\").Double()\n", - "_ = h.fill(avg_sizes_output_GB, sizes_output_GB).plot(ax=ax3, cmin=1)" + "h = hist.new.Regular(30, 0, 5, label=\"average file size per container [GB]\").Regular(50, 0, 500, label=\"number of files in container\").Double()\n", + "_ = h.fill(avg_sizes_output_GB, nfiles_output).plot(ax=ax4, cmin=1)" ] } ], @@ -124,7 +130,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.5" + "version": "3.12.11" } }, "nbformat": 4, diff --git a/atlas/utils.py b/atlas/utils.py index 4a5a6de..94b8afd 100644 --- a/atlas/utils.py +++ b/atlas/utils.py @@ -1,10 +1,56 @@ # saving preprocessing output +import asyncio import base64 import dataclasses +import datetime import re import urllib.request import coffea +import matplotlib.pyplot as plt + + +################################################## +### Dask task tracking +################################################## + +def start_tracking(dask_scheduler) -> None: + """"run on scheduler to track worker count""" + dask_scheduler.worker_counts = {} + dask_scheduler.track_count = True + + async def track_count() -> None: + while dask_scheduler.track_count: + dask_scheduler.worker_counts[datetime.datetime.now()] = len(dask_scheduler.workers) + await asyncio.sleep(1) + + asyncio.create_task(track_count()) + + +def stop_tracking(dask_scheduler) -> dict: + """obtain worker count and stop tracking""" + dask_scheduler.track_count = False + return dask_scheduler.worker_counts + + +def get_avg_num_workers(worker_count_dict: dict) -> float: + """get time-averaged worker count""" + worker_info = list(worker_count_dict.items()) + nworker_dt = 0 + for (t0, nw0), (t1, nw1) in zip(worker_info[:-1], worker_info[1:]): + nworker_dt += (nw1 + nw0) / 2 * (t1 - t0).total_seconds() + return nworker_dt / (worker_info[-1][0] - worker_info[0][0]).total_seconds() + + +def plot_worker_count(worker_count_dict: dict): + """plot worker count over time""" + fig, ax = plt.subplots() + ax.plot(worker_count_dict.keys(), worker_count_dict.values()) + ax.tick_params(axis="x", labelrotation=45) + ax.set_ylim([0, ax.get_ylim()[1]]) + ax.set_xlabel("time") + ax.set_ylabel("number of workers") + return fig, ax ##################################################