In [1]:
# python
import sys
import os
import importlib
# columnar analysis
from coffea import processor
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
import awkward as ak
from dask.distributed import Client, performance_report
# local
sidm_path = str(os.getcwd()).split("/sidm")[0]
# sidm_path = str(sys.path[0]).split("/sidm")[0]
if sidm_path not in sys.path: sys.path.insert(1, sidm_path)
from sidm.tools import utilities, sidm_processor, scaleout, cutflow
from sidm.tools import llpnanoaodschema
# always reload local modules to pick up changes during development
importlib.reload(utilities)
importlib.reload(sidm_processor)
importlib.reload(scaleout)
# plotting
import matplotlib.pyplot as plt
utilities.set_plot_style()
%matplotlib inline
from tqdm.notebook import tqdm
import coffea.util
import numpy as np

In [2]:
client = scaleout.make_dask_client("tls://localhost:8786")
client

0,1
Connection method: Direct,
Dashboard: /user/dongyub.lee@cern.ch/proxy/8787/status,

0,1
Comm: tls://192.168.197.237:8786,Workers: 0
Dashboard: /user/dongyub.lee@cern.ch/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [4]:
samples = [
    # "2Mu2E_100GeV_0p25GeV_0p2mm",
    # "2Mu2E_100GeV_0p25GeV_10p0mm",
    # "2Mu2E_100GeV_5p0GeV_4p0mm",
    # "2Mu2E_100GeV_5p0GeV_200mm",
    # "2Mu2E_150GeV_0p25GeV_0p13mm",
    # "2Mu2E_150GeV_0p25GeV_6p7mm",
    # "2Mu2E_150GeV_5p0GeV_2p7mm",
    # "2Mu2E_150GeV_5p0GeV_130p0mm",
    # "2Mu2E_200GeV_0p25GeV_0p1mm",
    # "2Mu2E_200GeV_0p25GeV_5p0mm",
    # "2Mu2E_200GeV_5p0GeV_2p0mm",
    # "2Mu2E_200GeV_5p0GeV_100p0mm",
    # "2Mu2E_500GeV_0p25GeV_0p04mm",
    # "2Mu2E_500GeV_0p25GeV_2p0mm",
    "2Mu2E_500GeV_5p0GeV_0p8mm",
    "2Mu2E_500GeV_5p0GeV_40p0mm",
    "2Mu2E_800GeV_0p25GeV_0p025mm",
    "2Mu2E_800GeV_0p25GeV_1p2mm",
    "2Mu2E_800GeV_5p0GeV_0p5mm",
    "2Mu2E_800GeV_5p0GeV_25p0mm",
    "2Mu2E_1000GeV_0p25GeV_0p02mm",
    "2Mu2E_1000GeV_0p25GeV_1p0mm",
    "2Mu2E_1000GeV_5p0GeV_0p4mm",
    "2Mu2E_1000GeV_5p0GeV_20p0mm",
    
    # "DYJetsToMuMu_M10to50", 
    # "DYJetsToMuMu_M50",    
    # "TTJets",
    # "QCD_Pt80To120",
    # # "QCD_Pt120To170",
    # "QCD_Pt170To300",
    # "QCD_Pt300To470",
    # "QCD_Pt470To600",
    # "QCD_Pt600To800",
    # # "QCD_Pt800To1000",
    # "QCD_Pt1000",
]

In [4]:
samples_bkg = [ 
    "TTJets",
    "QCD_Pt80To120", 
    # "QCD_Pt120To170",
    "QCD_Pt170To300", 
    "QCD_Pt300To470",
    "QCD_Pt470To600", 
    "QCD_Pt600To800", 
    # "QCD_Pt800To1000",
    "QCD_Pt1000", 
    "DYJetsToMuMu_M10to50",
    "DYJetsToMuMu_M50", 
]

In [5]:
fileset = utilities.make_fileset(samples[0:24], 
                                 "llpNanoAOD_v2", 
                                 location_cfg="signal_2mu2e_v10.yaml",
                                 max_files = -1,
                                )

In [6]:
runner = processor.Runner(
    executor=processor.DaskExecutor(client=client),
    # executor=processor.IterativeExecutor(),
    schema=llpnanoaodschema.LLPNanoAODSchema,
    skipbadfiles=True
)

channels = [
    # "base_CC", "base_CC_isosel"
    "base_CC_nocharge", "chargetest_basic", "chargetest_onlycharge", "chargetest_only2LJ", "munumtest_onlynum", "munumtest_both", "base_CC"
]

p = sidm_processor.SidmProcessor(
    channels,
    # ["matched_jet_base", "fraction_base", "isolation_base", "mother_tracking_base", "muon_crosscleaning_base", "exo_2d_base"],
    ["charge_sum_base", "genA_base", "genA_toMu_base", "genA_toE_base"],
    unweighted_hist=False,
)

out = {}
for i, sample in enumerate(samples):

    # print(f"Processing {sample}")
    fileset_one_sample = {samples[i]:fileset.get(samples[i])}
    
    output = runner.run(fileset_one_sample, treename='Events', processor_instance=p)

    #Add this sample's output to the out variable
    out[sample] = output["out"][sample]

    #Save output to a file!!
    out_file_name = "output_" + sample + ".coffea"
    coffea.util.save(output,out_file_name)

2Mu2E_500GeV_5p0GeV_0p8mm is simulation. Scaling histograms or cutflows according to lumi*xs.
Signal not in xs cfg, assuming 1fb
2Mu2E_500GeV_5p0GeV_40p0mm is simulation. Scaling histograms or cutflows according to lumi*xs.
Signal not in xs cfg, assuming 1fb
2Mu2E_800GeV_0p25GeV_0p025mm is simulation. Scaling histograms or cutflows according to lumi*xs.
Signal not in xs cfg, assuming 1fb
2Mu2E_800GeV_0p25GeV_1p2mm is simulation. Scaling histograms or cutflows according to lumi*xs.
Signal not in xs cfg, assuming 1fb
2Mu2E_800GeV_5p0GeV_0p5mm is simulation. Scaling histograms or cutflows according to lumi*xs.
Signal not in xs cfg, assuming 1fb
2Mu2E_800GeV_5p0GeV_25p0mm is simulation. Scaling histograms or cutflows according to lumi*xs.
Signal not in xs cfg, assuming 1fb
2Mu2E_1000GeV_0p25GeV_0p02mm is simulation. Scaling histograms or cutflows according to lumi*xs.
Signal not in xs cfg, assuming 1fb
2Mu2E_1000GeV_0p25GeV_1p0mm is simulation. Scaling histograms or cutflows according to l

In [8]:
import os
import math
from copy import deepcopy
from coffea.util import save, load
from dask.distributed import performance_report
try:
    from coffea.processor import accumulate
except Exception:
    from coffea.processor.accumulator import accumulate


# ==============================
# 0) Parameter
# ==============================
SAMPLES_SLICE = samples_bkg[8:9]    
TREE_NAME     = "Events"
LOCATION_CFG  = "backgrounds.yaml"
MAX_FILES     = 1000
N_BATCHES     = 1                   
AUTO_SPLIT    = False               
FILES_PER_BATCH_TARGET = 7000       # Target files when AUTO_SPLIT=True

# ======================================
# 1) fileset
# ======================================
fileset = utilities.make_fileset(
    SAMPLES_SLICE,
    "skimmed_llpNanoAOD_v2",
    location_cfg=LOCATION_CFG,
    max_files=MAX_FILES,
)

# single sample
assert len(fileset) == 1
sample_name = next(iter(fileset.keys()))
sample_entry = fileset[sample_name]  # list / tuple / dict

# ======================================
# 2) File list normalize function
# ======================================
def normalize_files(entry):
    # case A: list/tuple
    if isinstance(entry, (list, tuple)):
        return list(entry), "list"

    # case B: dict
    if isinstance(entry, dict):
        if "files" in entry:
            files = entry["files"]
            if isinstance(files, dict):   
                return list(files.keys()), "dict_files_key"
            return list(files), "dict_files_key"
        return list(entry.keys()), "dict_filemap"

    raise TypeError(f"Unsupported fileset entry type: {type(entry)}")

file_list, entry_kind = normalize_files(sample_entry)

# ======================================
# 3) Batch function
# ======================================
def split_even(lst, n):
    if n <= 0:
        raise ValueError("n must be positive")
    k, m = divmod(len(lst), n)
    chunks = []
    start = 0
    for i in range(n):
        size = k + (1 if i < m else 0)
        if size > 0:
            chunks.append(lst[start:start+size])
            start += size
    return chunks

# Number of batch
if AUTO_SPLIT:
    N_BATCHES = max(1, math.ceil(len(file_list) / max(1, FILES_PER_BATCH_TARGET)))

batches = split_even(file_list, N_BATCHES)

print(f"[Info] sample={sample_name}, total_files={len(file_list)}, batches={len(batches)}")
for i, b in enumerate(batches, 1):
    print(f"  - Batch {i}: {len(b)} files")

# ======================================
# 4) Partial processing
# ======================================
part_output_files = []

for i, files_in_batch in enumerate(batches, start=1):
    out_file_name = f"output_{sample_name}_part{i}.coffea"
    report_name   = f"dask-report_part{i}.html"

    # Skip when file already exist
    if os.path.exists(out_file_name):
        print(f"[Batch {i}] {out_file_name} already exist → Skipping.")
        part_output_files.append(out_file_name)
        continue

    if entry_kind == "list":
        sub_fileset_entry = files_in_batch

    elif entry_kind in ("dict_files_key", "dict_filemap"):
        if isinstance(sample_entry, dict) and "files" in sample_entry:
            sub_fileset_entry = dict(sample_entry) 
            sub_fileset_entry["files"] = files_in_batch
        else:
            try:
                sub_fileset_entry = {f: sample_entry[f] for f in files_in_batch}
            except Exception:
                sub_fileset_entry = files_in_batch
    else:
        raise RuntimeError("Unexpected entry_kind")

    sub_fileset = {sample_name: sub_fileset_entry}

    # Processor
    try:
        p_to_use = deepcopy(p)
    except Exception:
        p_to_use = p

    print(f"[Batch {i}/{len(batches)}] files: {len(files_in_batch)} → running...")

    with performance_report(filename=report_name):
        output = runner.run(
            sub_fileset,
            treename=TREE_NAME,
            processor_instance=p_to_use,
        )

    save(output, out_file_name)
    part_output_files.append(out_file_name)
    print(f"[Batch {i}] done → saved {out_file_name} and {report_name}")

[Info] sample=DYJetsToMuMu_M50, total_files=1000, batches=1
  - Batch 1: 1000 files
[Batch 1/1] files: 1000 → running...
DYJetsToMuMu_M50 is simulation. Scaling histograms or cutflows according to lumi*xs.
[Batch 1] done → saved output_DYJetsToMuMu_M50_part1.coffea and dask-report_part1.html
