3 changes: 0 additions & 3 deletions .gitignore
@@ -212,6 +212,3 @@ __marimo__/
# NEVER CHANGE
# CMS Internal files are not supposed to be published
DONT_EXPOSE_CMS_INTERNAL/

# ATLAS setup
atlas/ntuple_production/production_status.json
4 changes: 4 additions & 0 deletions atlas/.gitignore
@@ -208,3 +208,7 @@ __marimo__/
.asetup.save
*.out
*.err
# bigpanda status
ntuple_production/production_status.json
# preprocess json
preprocess_output.json
356 changes: 356 additions & 0 deletions atlas/analysis.ipynb
@@ -0,0 +1,356 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "3f9f15ac-903b-4f33-b775-e7c14a3647a8",
"metadata": {},
"outputs": [],
"source": [
"# ! pip install --upgrade atlas_schema\n",
"# ! pip install --upgrade git+https://github.com/scikit-hep/mplhep.git\n",
"\n",
"# import importlib\n",
"# importlib.reload(utils)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "94b8b953-dc21-4d5c-899d-b1f0b03c70b2",
"metadata": {},
"outputs": [],
"source": [
"import gzip\n",
"import json\n",
"import re\n",
"import time\n",
"\n",
"import awkward as ak\n",
"import dask\n",
"import vector\n",
"import hist\n",
"import matplotlib.pyplot as plt\n",
"import mplhep\n",
"import numpy as np\n",
"import uproot\n",
"\n",
"from atlas_schema.schema import NtupleSchema\n",
"from coffea import processor\n",
"from coffea.nanoevents import NanoEventsFactory\n",
"from dask.distributed import Client, PipInstall\n",
"\n",
"\n",
"import utils\n",
"\n",
"vector.register_awkward()\n",
"mplhep.style.use(mplhep.style.ATLAS1)\n",
"\n",
"client = Client(\"tls://localhost:8786\")\n",
"\n",
"plugin = PipInstall(packages=[\"atlas_schema\"], pip_options=[\"--upgrade\"])\n",
"client.register_plugin(plugin)"
]
},
{
"cell_type": "markdown",
"id": "91bbd464-1423-4353-81cc-f43806f04a7e",
"metadata": {},
"source": [
"### fileset preparation"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3dcf6216-0eca-4ea0-921b-eae3eda04af1",
"metadata": {},
"outputs": [],
"source": [
"# load metadata from file\n",
"fname = \"ntuple_production/file_metadata.json.gz\"\n",
"with gzip.open(fname) as f:\n",
" dataset_info = json.loads(f.read().decode())\n",
"\n",
"# construct fileset\n",
"fileset = {}\n",
"for containers_for_category in dataset_info.values():\n",
" for container, metadata in containers_for_category.items():\n",
" if metadata[\"files_output\"] is None:\n",
" # print(f\"skipping missing {container}\")\n",
" continue\n",
"\n",
" dsid, _, campaign = utils.dsid_rtag_campaign(container)\n",
"\n",
" # debugging shortcuts\n",
" # if campaign not in [\"mc20a\", \"data15\", \"data16\"]: continue\n",
" # if \"601352\" not in dsid: continue\n",
"\n",
" weight_xs = utils.sample_xs(campaign, dsid)\n",
" lumi = utils.integrated_luminosity(campaign)\n",
" fileset[container] = {\"files\": dict((path, \"reco\") for path in metadata[\"files_output\"]), \"metadata\": {\"dsid\": dsid, \"campaign\": campaign, \"weight_xs\": weight_xs, \"lumi\": lumi}}\n",
"\n",
"# minimal fileset for debugging\n",
"# fileset = {\"mc20_13TeV.601352.PhPy8EG_tW_dyn_DR_incl_antitop.deriv.DAOD_PHYSLITE.e8547_s4231_r13144_p6697\": fileset[\"mc20_13TeV.601352.PhPy8EG_tW_dyn_DR_incl_antitop.deriv.DAOD_PHYSLITE.e8547_s4231_r13144_p6697\"]}\n",
"# fileset"
]
},
{
"cell_type": "markdown",
"id": "f4a081b9-c4ec-41c8-830c-a727e56ff472",
"metadata": {},
"source": [
"### simple non-distributed reading"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6b6c685f-7e9c-4c5b-8f80-19f1543de32f",
"metadata": {},
"outputs": [],
"source": [
"events = NanoEventsFactory.from_root(\n",
" {list(fileset[list(fileset.keys())[0]][\"files\"])[0]: \"reco\"},\n",
" mode=\"virtual\",\n",
" schemaclass=NtupleSchema,\n",
" entry_stop=1000\n",
").events()\n",
"\n",
"h = hist.new.Regular(30, 0, 300, label=\"leading electron $p_T$\").StrCat([], name=\"variation\", growth=True).Weight()\n",
"\n",
"for variation in events.systematic_names:\n",
" if variation != \"NOSYS\" and \"EG_SCALE_ALL\" not in variation:\n",
" continue\n",
"\n",
" cut = events[variation][\"pass\"][\"ejets\"] == 1\n",
" h.fill(events[variation][cut==1].el.pt[:, 0] / 1_000, variation=variation)\n",
"\n",
"fig, ax = plt.subplots()\n",
"for variation in h.axes[1]:\n",
" h[:, variation].plot(histtype=\"step\", label=variation, ax=ax)\n",
"_ = ax.legend()"
]
},
{
"cell_type": "markdown",
"id": "a31f4dd8-07aa-4dd0-b50f-013349abe59a",
"metadata": {},
"source": [
"### pre-processing"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1abaeac0-ca4c-4a36-8426-10438c4e034e",
"metadata": {},
"outputs": [],
"source": [
"run = processor.Runner(\n",
" executor = processor.DaskExecutor(client=client),\n",
" # executor = processor.IterativeExecutor(),\n",
" schema=NtupleSchema,\n",
" savemetrics=True,\n",
" chunksize=100_000,\n",
" skipbadfiles=True,\n",
" # maxchunks=1\n",
")\n",
"\n",
"preprocess_output = run.preprocess(fileset)\n",
"\n",
"# write to disk\n",
"with open(\"preprocess_output.json\", \"w\") as f:\n",
" json.dump(utils.preprocess_to_json(preprocess_output), f)\n",
"\n",
"# load from disk\n",
"with open(\"preprocess_output.json\") as f:\n",
" preprocess_output = utils.json_to_preprocess(json.load(f))\n",
"\n",
"len(preprocess_output), preprocess_output[:3]"
]
},
{
"cell_type": "markdown",
"id": "4667b1bf-0ff3-4ccf-93e9-4f4a8e0aa3c7",
"metadata": {},
"source": [
"### processing"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc78d57d-4fe3-4e11-ab4c-0f0afe60ca32",
"metadata": {},
"outputs": [],
"source": [
"class Analysis(processor.ProcessorABC):\n",
" def __init__(self):\n",
" self.h = hist.new.Regular(30, 0, 300, label=\"leading electron $p_T$\").\\\n",
" StrCat([], name=\"dsid_and_campaign\", growth=True).\\\n",
" StrCat([], name=\"variation\", growth=True).\\\n",
" Weight()\n",
"\n",
" def process(self, events):\n",
" f = uproot.open(events.metadata[\"filename\"])\n",
"\n",
" # this should match existing pre-determined metadata\n",
" # sim_type, mc_campaign, dsid, etag = f[\"metadata\"].axes[0]\n",
" # assert mc_campaign == events.metadata[\"campaign\"]\n",
" # assert dsid == events.metadata[\"dsid\"]\n",
"\n",
" # ensure systematics in schema and in histogram match\n",
" # systematics_from_hist = list(f[\"listOfSystematics\"].axes[0])\n",
" # assert sorted(systematics_from_hist) == sorted(events.systematic_names)\n",
"\n",
" # categorize events by DSID and campaign with a single histogram axis\n",
" dsid_and_campaign = f\"{events.metadata[\"dsid\"]}_{events.metadata[\"campaign\"]}\"\n",
"\n",
" if events.metadata[\"dsid\"] != \"data\":\n",
" sumw = float(f[f.keys(filter_name=\"CutBookkeeper*NOSYS\")[0]].values()[1]) # initial sum of weights\n",
" else:\n",
" sumw = None # no normalization for data\n",
"\n",
" for variation in events.systematic_names:\n",
" if variation != \"NOSYS\" and \"EG_SCALE_ALL\" not in variation:\n",
" continue\n",
"\n",
" cut = events[variation][\"pass\"][\"ejets\"] == 1\n",
" weight = (events[variation][cut==1].weight.mc if events.metadata[\"dsid\"] != \"data\" else 1.0) * events.metadata[\"weight_xs\"] * events.metadata[\"lumi\"]\n",
" self.h.fill(events[variation][cut==1].el.pt[:, 0] / 1_000, dsid_and_campaign=dsid_and_campaign, variation=variation, weight=weight)\n",
"\n",
" return {\n",
" \"hist\": self.h,\n",
" \"meta\": {\n",
" \"sumw\": {(events.metadata[\"dsid\"], events.metadata[\"campaign\"]): {(events.metadata[\"fileuuid\"], sumw)}}} # sumw in a set to avoid summing multiple times per file\n",
" }\n",
"\n",
" def postprocess(self, accumulator):\n",
" # normalize histograms\n",
" # https://topcptoolkit.docs.cern.ch/latest/starting/running_local/#sum-of-weights\n",
" for dsid_and_campaign in accumulator[\"hist\"].axes[1]:\n",
" dsid, campaign = dsid_and_campaign.split(\"_\")\n",
" if dsid == \"data\":\n",
" continue # no normalization for data by total number of weighted events\n",
" norm = 1 / sum([sumw for uuid, sumw in accumulator[\"meta\"][\"sumw\"][(dsid, campaign)]])\n",
" count_normalized = accumulator[\"hist\"][:, dsid_and_campaign, :].values()*norm\n",
" variance_normalized = accumulator[\"hist\"][:, dsid_and_campaign, :].variances()*norm**2\n",
" accumulator[\"hist\"][:, dsid_and_campaign, :] = np.stack([count_normalized, variance_normalized], axis=-1)\n",
"\n",
"\n",
"t0 = time.perf_counter()\n",
"out, report = run(preprocess_output, processor_instance=Analysis())\n",
"t1 = time.perf_counter()\n",
"report"
]
},
{
"cell_type": "markdown",
"id": "8663e9ff-f8bb-43a0-8978-f2d430d2bbbd",
"metadata": {},
"source": [
"track XCache egress: [link](https://grafana.mwt2.org/d/EKefjM-Sz/af-network-200gbps-challenge?var-cnode=c111_af_uchicago_edu&var-cnode=c112_af_uchicago_edu&var-cnode=c113_af_uchicago_edu&var-cnode=c114_af_uchicago_edu&var-cnode=c115_af_uchicago_edu&viewPanel=195&kiosk=true&orgId=1&from=now-1h&to=now&timezone=browser&refresh=5s)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9575019b-d1a5-4a8e-9d5a-32d0d8bd0919",
"metadata": {},
"outputs": [],
"source": [
"print(f\"data read: {report[\"bytesread\"] / 1000**3:.2f} GB in {report[\"chunks\"]} chunks\")\n",
"\n",
"print(f\"core-average event rate using \\'processtime\\': {report[\"entries\"] / 1000 / report[\"processtime\"]:.2f} kHz\")\n",
"print(f\"core-average data rate using \\'processtime\\': {report[\"bytesread\"] / 1000**3 * 8 / report[\"processtime\"]:.2f} Gbps\")\n",
"\n",
"print(f\"average event rate using walltime: {report[\"entries\"] / 1000 / (t1 - t0):.2f} kHz\")\n",
"print(f\"average data rate using walltime: {report[\"bytesread\"] / 1000**3 * 8 / (t1 - t0):.2f} Gbps\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "982cce52-7f5c-4126-a5cc-6a4dfee70732",
"metadata": {},
"outputs": [],
"source": [
"mc_stack = []\n",
"labels = []\n",
"for key in dataset_info:\n",
" dsids = []\n",
" for container in dataset_info[key]:\n",
" dsids.append(container.split(\".\")[1])\n",
"\n",
" dsids = sorted(set(dsids))\n",
" dsids_in_hist = [dc for dc in out[\"hist\"].axes[1] if dc.split(\"_\")[0] in dsids]\n",
" print(f\"{key}:\\n - expect {dsids}\\n - have {dsids_in_hist}\")\n",
"\n",
" if key in [\"data\", \"ttbar_H7\", \"ttbar_hdamp\", \"ttbar_pthard\", \"Wt_DS\", \"Wt_H7\", \"Wt_pthard\"] or len(dsids_in_hist) == 0:\n",
" continue # data drawn separately, skip MC modeling variations and skip empty categories\n",
"\n",
" mc_stack.append(out[\"hist\"][:, :, \"NOSYS\"].integrate(\"dsid_and_campaign\", dsids_in_hist))\n",
" labels.append(key)\n",
"\n",
"fig, ax1, ax2 = mplhep.data_model(\n",
" data_hist=out[\"hist\"].integrate(\"dsid_and_campaign\", [dc for dc in out[\"hist\"].axes[1] if \"data\" in dc])[:, \"NOSYS\"],\n",
" stacked_components=mc_stack,\n",
" stacked_labels=labels,\n",
" # https://scikit-hep.org/mplhep/gallery/model_with_stacked_and_unstacked_histograms_components/\n",
" # unstacked_components=[],\n",
" # unstacked_labels=[],\n",
" xlabel=out[\"hist\"].axes[0].label,\n",
" ylabel=\"Entries\",\n",
")\n",
"\n",
"mplhep.atlas.label(\"Internal\", ax=ax1, data=True, lumi=f\"{utils.integrated_luminosity(\"\", total=True) / 1000:.0f}\", com=\"13/ \\\\ 13.6 \\\\ TeV\")\n",
"mplhep.mpl_magic(ax=ax1)\n",
"ax2.set_ylim([0.5, 1.5])\n",
"\n",
"# compare to e.g. https://atlas.web.cern.ch/Atlas/GROUPS/PHYSICS/PAPERS/HDBS-2020-11/fig_02a.png\n",
"fig.savefig(\"el_pt.png\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b2e3efe0-f724-4206-b233-202a51729014",
"metadata": {},
"outputs": [],
"source": [
"# save to disk\n",
"import uhi.io.json\n",
"\n",
"with gzip.open(\"hist.json.gz\", \"w\") as f:\n",
" f.write(json.dumps(out[\"hist\"], default=uhi.io.json.default).encode(\"utf-8\"))\n",
"\n",
"with gzip.open(\"hist.json.gz\") as f:\n",
" h = hist.Hist(json.loads(f.read(), object_hook=uhi.io.json.object_hook))\n",
"\n",
"h[:, \"data_data15\", \"NOSYS\"]"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
2 changes: 1 addition & 1 deletion atlas/ntuple_production/README.md
@@ -41,7 +41,7 @@
voms-proxy-init -voms atlas
```

- After ntuple production: `write_ntuple_metadata.py` saves relevant metadata of input and output containers plus xrootd ntuple file paths to disk.
- After ntuple production: `collect_file_metadata.py` saves relevant metadata of input and output containers plus xrootd ntuple file paths to disk.

## Notes for first and potential second production
