# Phase 2 — Build certified training table (NanoAOD → Parquet)

We will:
1) load validated run/lumi JSON (muons-only)
2) stream ROOT files with uproot.iterate (memory-safe)
3) build dimuon candidates + features
4) add trigger labels from `HLT_*`
5) write Parquet shards for ML


In [2]:
# Cell 1 — Install deps (no XRootD needed)
# Run this cell before the import cell that follows: the recorded output of that
# cell is `ModuleNotFoundError: No module named 'awkward'`, presumably because it
# was executed before this install — confirm by re-running in order.
# `-q` keeps pip quiet; the quoted "uproot>=5" requires uproot version 5 or newer.
!pip -q install "uproot>=5" awkward vector rich tqdm pandas pyarrow fastparquet matplotlib


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m393.8/393.8 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m919.6/919.6 kB[0m [31m41.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m656.7/656.7 kB[0m [31m47.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m181.2/181.2 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m79.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [1]:
# Cell P2-1 — Config
from pathlib import Path
import numpy as np
import pandas as pd
import awkward as ak
import uproot
import vector

# Register vector's behaviors with Awkward Array (needed for vector.zip results
# to expose .mass, .pt, .deltaR, ... on jagged arrays).
vector.register_awkward()

# Directory that holds the input ROOT files.
BASE_PATH = Path(
    "/kaggle/input/datasets/katakuricharlotte/doublemuon2016g-rootfiles/root_converted"
)

# Every *.root file directly under BASE_PATH, as plain strings, in sorted order.
ROOT_FILES = sorted(str(root_file) for root_file in BASE_PATH.glob("*.root"))

# Notebook display: number of files found and the first three paths.
len(ROOT_FILES), ROOT_FILES[:3]


ModuleNotFoundError: No module named 'awkward'

In [None]:
# Cell P2-2 — Download validated runs JSON (muons only) and parse
# The dataset record links validated-runs JSON files; this project uses the "muons only" one.
import json
import urllib.request

VALID_JSON_URL = "https://opendata.cern.ch/record/14221/files/Cert_271036-284044_13TeV_Legacy2016_Collisions16_JSON_MuonPhys.txt"
json_path = Path("/kaggle/working/validated_runs_muons_2016.txt")

if not json_path.exists():
    # Download to a temporary sibling and rename only after the transfer
    # succeeds. Otherwise an interrupted download would leave a truncated file
    # at json_path, the exists() check above would accept it on every later
    # run, and json.load below would keep failing until the file is removed
    # by hand.
    tmp_path = json_path.with_name(json_path.name + ".part")
    try:
        urllib.request.urlretrieve(VALID_JSON_URL, tmp_path)
        tmp_path.replace(json_path)
    finally:
        # No-op after a successful rename; removes the partial file on failure.
        tmp_path.unlink(missing_ok=True)

with open(json_path, "r", encoding="utf-8") as f:
    certified = json.load(f)

# certified is dict: { "run": [ [lumiStart, lumiEnd], ... ], ... }
# Notebook display: number of runs and the first (run, ranges) entry.
len(certified), list(certified.items())[:1]


In [None]:
# Cell P2-3 — Fast certified filter helpers
def is_certified(run, lumi, certified_map):
    """Return True when (run, lumi) lies inside a certified lumi range.

    run and lumi are converted with int(); certified_map maps the run number
    as a string to a list of inclusive [lumi_start, lumi_end] pairs.
    A run that is absent from certified_map is never certified.
    """
    run_key = str(int(run))
    if run_key in certified_map:
        # Both range ends are inclusive.
        return any(
            first <= int(lumi) <= last
            for first, last in certified_map[run_key]
        )
    return False

def mask_certified(runs, lumis, certified_map):
    """Return a boolean mask marking the events in certified lumi ranges.

    runs and lumis are equal-length 1-D sequences of integer run numbers and
    luminosity blocks (one entry per event). certified_map maps the run number
    as a string to a list of inclusive [lumi_start, lumi_end] pairs.

    The result is a NumPy bool array of len(runs); entry i is True when
    lumis[i] falls inside one of the ranges listed for runs[i]. Runs missing
    from certified_map are never certified.
    """
    runs = np.asarray(runs)
    lumis = np.asarray(lumis)
    out = np.zeros(len(runs), dtype=bool)

    # A chunk holds many events but only a few distinct runs, so do one map
    # lookup per run and compare the lumi blocks of that run as whole arrays
    # instead of calling a Python function for every event.
    for run in np.unique(runs):
        ranges = certified_map.get(str(int(run)))
        if not ranges:
            continue
        in_run = runs == run
        run_lumis = lumis[in_run]
        ok = np.zeros(len(run_lumis), dtype=bool)
        for lo, hi in ranges:
            # Both range ends are inclusive.
            ok |= (run_lumis >= lo) & (run_lumis <= hi)
        out[in_run] = ok
    return out


In [None]:
# Cell P2-4 — Choose triggers to emulate (auto-detect, then pick 3)
# Record 30522 lists possible HLT trigger paths for this dataset.
# Inspect the "Events" tree of the first input file to learn which branches exist.
test = uproot.open(ROOT_FILES[0])["Events"]
branches = test.keys()

# Muon-related HLT paths: branch names starting with "HLT_" that contain "Mu".
hlt = sorted(
    name
    for name in branches
    if name.startswith("HLT_") and "Mu" in name
)

# Notebook display: the first 50 candidates and how many there are in total.
hlt[:50], len(hlt)


In [None]:
# Cell P2-5 — Pick 3 practical labels (prefer common dimuon triggers if present)
preferred = [
    "HLT_Mu17_Mu8",
    "HLT_Mu17_Mu8_DZ",
    "HLT_Mu17_TrkIsoVVL_Mu8_TrkIsoVVL_DZ",
]

# Keep the preferred paths that the file actually provides, in preference
# order; when none of them is available, fall back to the first three muon
# HLT paths that were detected.
LABELS = [path for path in preferred if path in branches] or hlt[:3]

LABELS


In [None]:
# Cell P2-6 — Feature engineering (dimuon-level) + write Parquet shards
import pyarrow as pa
import pyarrow.parquet as pq

# Mass assigned to every muon when building four-vectors
# (presumably in GeV, matching the pt units — TODO confirm).
MUON_MASS = 0.105658

# Candidate input branches: event identifiers, event-level context,
# per-muon kinematics/charge, and per-muon isolation and ID flags.
FEATURES = [
    "run",
    "luminosityBlock",
    "event",
    "PV_npvs",
    "MET_pt",
    "Muon_pt",
    "Muon_eta",
    "Muon_phi",
    "Muon_charge",
    "Muon_pfRelIso03_all",
    "Muon_tightId",
    "Muon_mediumId",
]

# Branches to read: the candidates that exist in the file, then the trigger labels.
BASE_READ = [*(name for name in FEATURES if name in branches), *LABELS]

# Destination directory for the Parquet shards; created here if it is missing.
OUT_DIR = Path("/kaggle/working/parquet_dimuon")
OUT_DIR.mkdir(parents=True, exist_ok=True)

def build_dimuon_table(a):
    """Build a flat table with one row per opposite-sign dimuon candidate.

    a is one chunk of events as an Awkward record array. It must contain
    "run", "luminosityBlock", "event", "Muon_pt", "Muon_eta", "Muon_phi",
    "Muon_charge" and every branch named in LABELS; "Muon_mediumId",
    "Muon_tightId", "PV_npvs" and "MET_pt" are used only when present.

    Steps: keep events whose (run, luminosityBlock) is certified, keep muons
    passing the ID flag, form all opposite-charge muon pairs per event, and
    emit the pair kinematics together with the event-level values repeated
    for every pair of the event.

    Returns a pandas.DataFrame whose columns are all NumPy-backed. It has zero
    rows when the chunk contains no certified opposite-sign pair.
    """
    # Certified mask at event level
    runs = ak.to_numpy(a["run"])
    lumis = ak.to_numpy(a["luminosityBlock"])
    a = a[mask_certified(runs, lumis, certified)]

    # Build vector muons; "charge" rides along as an extra (non-coordinate) field.
    mu = vector.zip({
        "pt": a["Muon_pt"],
        "eta": a["Muon_eta"],
        "phi": a["Muon_phi"],
        "mass": ak.ones_like(a["Muon_pt"]) * MUON_MASS,
        "charge": a["Muon_charge"],
    })

    # Quality: prefer mediumId if present, else tightId, else keep every muon.
    qual = ak.ones_like(a["Muon_pt"], dtype=bool)
    if "Muon_mediumId" in a.fields:
        qual = qual & (a["Muon_mediumId"] == 1)
    elif "Muon_tightId" in a.fields:
        qual = qual & (a["Muon_tightId"] == 1)

    mu = mu[qual]

    # Opposite-sign pairs: the product of the two charges is negative.
    pairs = ak.combinations(mu, 2, fields=["m1", "m2"])
    os_pairs = pairs[(pairs.m1.charge * pairs.m2.charge) < 0]
    dimu = os_pairs.m1 + os_pairs.m2
    pair_mass = dimu.mass

    def per_pair(pair_values):
        # Flatten a jagged per-pair quantity into a 1-D NumPy array.
        return ak.to_numpy(ak.flatten(pair_values))

    def per_event(event_values):
        # Repeat one value per event for each pair of that event, then flatten.
        return per_pair(ak.broadcast_arrays(event_values, pair_mass)[0])

    # Event keys are repeated for each pair. They are converted to NumPy like
    # every other column, so the DataFrame never has to coerce Awkward arrays.
    out = {
        "run": per_event(a["run"]),
        "lumi": per_event(a["luminosityBlock"]),
        "event": per_event(a["event"]),
        "m_mumu": per_pair(pair_mass),
        "pt_mumu": per_pair(dimu.pt),
        "eta_mumu": per_pair(dimu.eta),
        "dR_mumu": per_pair(os_pairs.m1.deltaR(os_pairs.m2)),
    }

    # Add event-level context (repeat per pair)
    for name in ("PV_npvs", "MET_pt"):
        if name in a.fields:
            out[name] = per_event(a[name])

    # Add trigger labels (repeat per pair), stored as 0/1 in int8.
    for lab in LABELS:
        out[lab] = per_event(a[lab]).astype(np.int8)

    return pd.DataFrame(out)

# Iterate files in chunks (safe for memory)
MAX_EVENTS_TOTAL = 1_000_000   # scale later
events_seen = 0
shard = 0

# uproot.iterate's signature is (files, expressions, cut, ...), so the tree
# name cannot be given as a second positional argument: it would bind to
# `expressions` and collide with the keyword below. The tree name belongs in
# the file specification instead, here as a {file path: tree name} mapping.
for arrays in uproot.iterate(
    {path: "Events" for path in ROOT_FILES},
    expressions=BASE_READ,
    step_size="100 MB",
    library="ak",
):
    # Count every chunk that was read, including chunks that yield no dimuon
    # pairs, so MAX_EVENTS_TOTAL really bounds the number of events processed.
    events_seen += len(arrays["run"])

    df = build_dimuon_table(arrays)
    if not df.empty:
        # Shards are numbered consecutively; empty chunks do not consume a number.
        out_path = OUT_DIR / f"dimuon_shard_{shard:03d}.parquet"
        df.to_parquet(out_path, index=False)
        shard += 1

    if events_seen >= MAX_EVENTS_TOTAL:
        break

# Notebook display: number of shards written and number of events read.
shard, events_seen
