In [11]:
import os, glob
import pandas as pd
# Compare the node's total core count with the CPU share SLURM reports for this job.
print("os.cpu_count() =", os.cpu_count())
for _env_key, _label in (
    ("SLURM_CPUS_PER_TASK", "SLURM_CPUS_PER_TASK ="),
    ("SLURM_JOB_CPUS_PER_NODE", "SLURM_JOB_CPUS_PER_NODE ="),
):
    print(_label, os.environ.get(_env_key))

os.cpu_count() = 192
SLURM_CPUS_PER_TASK = 16
SLURM_JOB_CPUS_PER_NODE = 16


In [1]:
from graphnet.data.extractors.icecube.utilities.i3_filters import I3Filter
from graphnet.utilities.imports import has_icecube_package

if has_icecube_package():
    from icecube import icetray  # noqa: F401


class NonEmptyPulseSeriesI3Filter(I3Filter):
    """I3 frame filter that keeps only frames carrying at least one pulse.

    A frame is rejected when the configured pulse map is missing, when it is a
    mask that cannot be applied to the frame, or when every entry is empty.

    Args:
        pulsemap_name: Frame key of the pulse-series map (or mask) to inspect.
    """

    def __init__(self, pulsemap_name: str = "EventPulseSeries_nonoise"):
        super().__init__(name=__name__, class_name=self.__class__.__name__)
        self._pulsemap_name = pulsemap_name

    def _keep_frame(self, frame: "icetray.I3Frame") -> bool:
        """Return True if the pulse map holds at least one pulse, else False."""
        key = self._pulsemap_name
        if not frame.Has(key):
            return False

        pulses = frame[key]

        # Map masks expose ``apply(frame)`` to obtain the underlying map; any
        # failure while resolving it means the frame is dropped.
        try:
            if hasattr(pulses, "apply"):
                pulses = pulses.apply(frame)
        except Exception:
            return False

        # Keep the frame as soon as one entry (OMKey/PMT) has a pulse.
        try:
            return any(len(series) > 0 for _, series in pulses.items())
        except Exception:
            # Not a key -> series mapping: fall back to the container's length.
            try:
                return len(pulses) > 0
            except Exception:
                return False




  from .autonotebook import tqdm as notebook_tqdm




In [3]:
import sys, os, glob, re

repo_root = os.path.abspath("..")
# Make the local `MyClasses` package importable; guard so re-running this cell
# in the same kernel does not append duplicate sys.path entries.
if repo_root not in sys.path:
    sys.path.append(repo_root)

from MyClasses.reader import PONE_Reader
from MyClasses.feature_extractor import I3FeatureExtractorPONE
from MyClasses.truth_extractor import I3TruthExtractorPONE

In [4]:
# Input I3 simulation files, Parquet output directory, and the GCD file handed
# to the reader as `gcd_rescue`.
_KBAS_PROJECT = "/project/def-nahee/kbas"
INPUT_GLOB = f"{_KBAS_PROJECT}/POM_Response/*.i3.gz"
OUTDIR = f"{_KBAS_PROJECT}/POM_Response_Parquet"
GCD_RESCUE = "/project/6008051/pone_simulation/GCD_Library/PONE_800mGrid.i3.gz"


In [5]:
def batch_id_from_i3(path):
    """Return the integer batch number of an I3 file named ``..._batch_<N>.i3[.gz]``.

    Both compressed (``.i3.gz``) and plain (``.i3``) files are recognised, matching
    the extensions accepted by the conversion-audit cell.

    Returns:
        The batch number as an int, or None if the file name does not match.
    """
    m = re.search(r"batch_(\d+)\.i3(?:\.gz)?$", os.path.basename(path))
    return int(m.group(1)) if m else None

def batch_ids_in_outdir(outdir):
    """Return the set of batch numbers found anywhere below ``outdir``.

    Every file or directory (searched recursively) whose name contains
    ``batch_<digits>`` contributes ``int(<digits>)``.
    """
    # Scan everything inside outdir whose name contains e.g. "batch_1234".
    batch_re = re.compile(r"batch_(\d+)")
    names = (
        os.path.basename(entry)
        for entry in glob.glob(os.path.join(outdir, "**", "*"), recursive=True)
    )
    return {int(hit.group(1)) for hit in map(batch_re.search, names) if hit}

In [6]:
# Input I3 files in a deterministic (sorted) order.
all_files = glob.glob(INPUT_GLOB)
all_files.sort()
# Batch ids that already have some output under OUTDIR.
done_ids = batch_ids_in_outdir(OUTDIR)

In [7]:
# Input files whose batch id could be parsed and that have no output yet.
todo = [
    path
    for path in all_files
    if (batch := batch_id_from_i3(path)) is not None and batch not in done_ids
]

In [8]:
# Resume-check summary: inputs found, batches already converted, work remaining.
for _label, _value in (
    ("Total i3:", len(all_files)),
    ("Done batches:", len(done_ids)),
    ("Todo i3:", len(todo)),
    ("First 5 todo:", todo[:5]),
):
    print(_label, _value)

Total i3: 9817
Done batches: 0
Todo i3: 9817
First 5 todo: ['/project/def-nahee/kbas/POM_Response/pom_response_batch_000.i3.gz', '/project/def-nahee/kbas/POM_Response/pom_response_batch_001.i3.gz', '/project/def-nahee/kbas/POM_Response/pom_response_batch_002.i3.gz', '/project/def-nahee/kbas/POM_Response/pom_response_batch_003.i3.gz', '/project/def-nahee/kbas/POM_Response/pom_response_batch_004.i3.gz']


In [9]:
from graphnet.data.dataconverter import DataConverter

In [10]:
from graphnet.data.writers import ParquetWriter
from graphnet.data.extractors.icecube.utilities.i3_filters import NullSplitI3Filter

In [12]:
# Frame filters for the reader: graphnet's NullSplitI3Filter, then the
# NonEmptyPulseSeriesI3Filter defined above, which drops frames whose
# noise-free pulse map is missing or empty.
reader_filters = [
    NullSplitI3Filter(),
    NonEmptyPulseSeriesI3Filter("EventPulseSeries_nonoise"),
]
reader = PONE_Reader(gcd_rescue=GCD_RESCUE, i3_filters=reader_filters)

[1;34mgraphnet[0m [MainProcess] [32mINFO    [0m 2026-01-26 04:09:27 - NullSplitI3Filter.__init__ - Writing log to [1mlogs/graphnet_20260126-040927.log[0m


In [13]:
# Columns left out of the "features" table.
FEATURE_EXCLUDE = [
    'pmt_area', 'rde', 'width', 'event_time',
    'is_bright_dom', 'is_saturated_dom', 'is_errata_dom', 'is_bad_dom',
    'hlc', 'awtd', 'dom_type',
]

# Columns left out of the "truth" table.
TRUTH_EXCLUDE = [
    'L7_oscNext_bool', 'L6_oscNext_bool', 'L5_oscNext_bool',
    'L4_oscNext_bool', 'L3_oscNext_bool',
    'OnlineL2Filter_17', 'MuonFilter_13', 'CascadeFilter_13', 'DeepCoreFilter_13',
    'dbang_decay_length', 'track_length', 'stopped_muon',
    'energy_track', 'energy_cascade', 'inelasticity', 'is_starting',
]

extractors = [
    I3FeatureExtractorPONE(
        pulsemap="EventPulseSeries_nonoise", name="features", exclude=FEATURE_EXCLUDE
    ),
    I3TruthExtractorPONE(
        mctree="I3MCTree_postprop", name="truth", exclude=TRUTH_EXCLUDE
    ),
]


# Included: PMT locations (to verify).
# To include: per-pulse truth label (whether the pulse is noise).

In [14]:
# Parquet writer: "truth" is the truth table, "event_no" the event index column.
writer = ParquetWriter(index_column="event_no", truth_table="truth")

In [15]:
# Size the worker pool from the CPUs SLURM allotted to this job
# (SLURM_CPUS_PER_TASK was 16 in this session) instead of a hard-coded value;
# fall back to 16 when the variable is unset or empty, e.g. outside SLURM.
N_WORKERS = int(os.environ.get("SLURM_CPUS_PER_TASK") or 16)

converter = DataConverter(
    file_reader=reader,
    save_method=writer,
    extractors=extractors,
    outdir=OUTDIR,
    num_workers=N_WORKERS,
    index_column="event_no",
)




In [17]:
# Convert the files found under the input directory.
# NOTE(review): the `todo` list computed earlier is not passed here, so this call
# does not use that resume logic; whether already-converted files are skipped
# depends on DataConverter/PONE_Reader — confirm before re-running.
converter(input_dir="/project/def-nahee/kbas/POM_Response/")
# Resources used for this run:
# salloc --time=5:30:00 --account=def-nahee --ntasks=1 --cpus-per-task=16 --mem=128G
# Less than 5 hours would also have been enough.

Assuming list of directories.
[1;34mgraphnet[0m [MainProcess] [32mINFO    [0m 2026-01-26 04:15:15 - DataConverter.__call__ - Starting pool of 16 workers to process9817 file(s)[0m


 15%|[32m███████████████▎                                                                                     [0m| 1494/9817 [10:03<44:56,  3.09 file(s)/s][0m

UnboundLocalError: cannot access local variable 'frame' where it is not associated with a value

100%|[32m███████████████████████████████████████████████████████████████████████████████████████████████████[0m| 9817/9817 [1:06:55<00:00,  2.44 file(s)/s][0m


In [19]:
# Report where the converted parquet output was written.
print("DONE:", OUTDIR)

DONE: /project/def-nahee/kbas/POM_Response_Parquet


In [20]:
INPUT_DIR = "/project/def-nahee/kbas/POM_Response/"
OUTDIR    = "/project/def-nahee/kbas/POM_Response_Parquet"

# All input I3 files (compressed and plain) below INPUT_DIR.
in_files = sorted(glob.glob(os.path.join(INPUT_DIR, "**", "*.i3.gz"), recursive=True))
in_files += sorted(glob.glob(os.path.join(INPUT_DIR, "**", "*.i3"), recursive=True))

def stem_from_i3(path: str) -> str:
    """Return the file name of an I3 file without its ``.i3.gz`` / ``.i3`` extension."""
    base = os.path.basename(path)
    if base.endswith(".i3.gz"):
        return base[:-len(".i3.gz")]
    if base.endswith(".i3"):
        return base[:-len(".i3")]
    return os.path.splitext(base)[0]

in_stems = [stem_from_i3(f) for f in in_files]

# Per-input converter output: <stem>_truth.parquet and <stem>_features.parquet.
truth_dir = os.path.join(OUTDIR, "truth")
feat_dir  = os.path.join(OUTDIR, "features")

truth_files = sorted(glob.glob(os.path.join(truth_dir, "*_truth.parquet")))
feat_files  = sorted(glob.glob(os.path.join(feat_dir, "*_features.parquet")))

def stem_from_parquet(path: str, suffix: str) -> str:
    """Return the file name with ``suffix`` removed (or just its extension if it lacks ``suffix``)."""
    base = os.path.basename(path)
    return base[:-len(suffix)] if base.endswith(suffix) else os.path.splitext(base)[0]

truth_stems = set(stem_from_parquet(p, "_truth.parquet") for p in truth_files)
feat_stems  = set(stem_from_parquet(p, "_features.parquet") for p in feat_files)

rows = [
    {
        "input_file": f,
        "stem": s,
        "truth_exists": s in truth_stems,
        "features_exists": s in feat_stems,
    }
    for f, s in zip(in_files, in_stems)
]

# Explicit columns and bool dtypes keep this cell working when no input files
# are found: pd.DataFrame([]) has no columns, so the lookups below would raise
# KeyError, and empty object columns would not act as a boolean mask.
df = pd.DataFrame(
    rows, columns=["input_file", "stem", "truth_exists", "features_exists"]
).astype({"truth_exists": bool, "features_exists": bool})
df["both_exist"] = df["truth_exists"] & df["features_exists"]
df["missing_any"] = ~df["both_exist"]

print("INPUT i3 files:", len(df))
print("truth parquet:", len(truth_stems))
print("features parquet:", len(feat_stems))
print("both exist:", int(df["both_exist"].sum()))
print("missing any:", int(df["missing_any"].sum()))

# Inputs lacking a truth or features parquet, saved for later re-conversion.
missing = df[df["missing_any"]].copy()
missing.to_csv("missing_conversions.csv", index=False)

with open("missing_input_files.txt", "w", encoding="utf-8") as w:
    w.writelines(p + "\n" for p in missing["input_file"])

df


INPUT i3 files: 9817
truth parquet: 9787
features parquet: 9787
both exist: 9787
missing any: 30


Unnamed: 0,input_file,stem,truth_exists,features_exists,both_exist,missing_any
0,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_000,True,True,True,False
1,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_001,True,True,True,False
2,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_002,True,True,True,False
3,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_003,True,True,True,False
4,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_004,True,True,True,False
...,...,...,...,...,...,...
9812,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_9995,True,True,True,False
9813,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_9996,True,True,True,False
9814,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_9997,True,True,True,False
9815,/project/def-nahee/kbas/POM_Response/pom_respo...,pom_response_batch_9998,True,True,True,False


For some batches (around 400), the POM Response files (.i3.gz) could not be created, or may have been created incorrectly; those batches are excluded here. In addition, some batches (around 30) were lost in the .i3.gz → Parquet conversion (see the missing-files table above). I nevertheless continue and merge the available files; the missing batches can be handled later and the merged files re-created.

### Create the merged file (later I will split into train/val/test)

In [22]:
# Destination of the merged (re-sharded) parquet files.
_MERGED_SUBDIR = "merged"
MERGED_DIR = os.path.join(OUTDIR, _MERGED_SUBDIR)

In [23]:
# Display the merged-output path.
MERGED_DIR

'/project/def-nahee/kbas/POM_Response_Parquet/merged'

In [24]:
# Re-shard the per-file parquet output into merged files of 1024 events each.
# NOTE(review): `files=[]` is passed deliberately; presumably the writer falls
# back to the files it wrote itself — confirm against graphnet's
# ParquetWriter.merge_files.
writer.merge_files(
    output_dir=MERGED_DIR,
    files=[],
    num_workers=16,
    events_per_batch=1024,
)

# Resources that were sufficient for this step:
#  salloc --time=2:30:00 --account=def-nahee --ntasks=1 --cpus-per-task=16 --mem=128G

[1;34mgraphnet[0m [MainProcess] [32mINFO    [0m 2026-01-26 05:32:39 - ParquetWriter.run_code - Processing 394 batches using 16 cores.[0m


100%|[32m███████████████████████████████████████████████████████████████████████████████████████████████████████[0m| 394/394 [03:00<00:00,  2.18shard(s)/s][0m


In [25]:
from pathlib import Path
import re
import polars as pl
import pandas as pd

MERGED_DIR = Path(MERGED_DIR)   # OUTDIR/merged, as a Path

def pick_event_col(parquet_path: Path) -> str:
    cols = pl.read_parquet(parquet_path, n_rows=0).columns
    if "event_no" in cols:
        return "event_no"
    if "__index_level_0__" in cols:   # pandas index bazen böyle kaydediyor
        return "__index_level_0__"
    # fallback: ilk kolon (en azından unique sayısı alabilelim)
    return cols[0]

def count_rows(parquet_path: Path) -> int:
    return int(pl.scan_parquet(parquet_path).select(pl.len()).collect().item())

def count_unique_events(parquet_path: Path) -> int:
    col = pick_event_col(parquet_path)
    return int(pl.scan_parquet(parquet_path).select(pl.col(col).n_unique()).collect().item())

# Batch ids of the merged shards (e.g. truth_123.parquet -> 123).
truth_dir = MERGED_DIR / "truth"
feat_dir  = MERGED_DIR / "features"

pat_truth = re.compile(r"^truth_(\d+)\.parquet$")
pat_feat  = re.compile(r"^features_(\d+)\.parquet$")

truth_files = {
    int(m.group(1)): p
    for p in truth_dir.glob("truth_*.parquet")
    if (m := pat_truth.match(p.name))
}
feat_files = {
    int(m.group(1)): p
    for p in feat_dir.glob("features_*.parquet")
    if (m := pat_feat.match(p.name))
}

# A batch with only one of the two shards means the merge is incomplete;
# report it instead of silently leaving it out of the table below.
unpaired = sorted(set(truth_files) ^ set(feat_files))
if unpaired:
    print(f"WARNING: {len(unpaired)} batch(es) have only a truth or only a features shard:", unpaired)

batch_ids = sorted(set(truth_files) & set(feat_files))

rows = []
for bid in batch_ids:
    tf = truth_files[bid]
    ff = feat_files[bid]
    rows.append({
        "batch_id": bid,
        "truth_rows": count_rows(tf),
        "truth_unique_events": count_unique_events(tf),
        "features_rows": count_rows(ff),
        "features_unique_events": count_unique_events(ff),
    })

# Explicit columns keep sort_values working when no batches were found
# (pd.DataFrame([]) has no "batch_id" column).
df = (
    pd.DataFrame(
        rows,
        columns=["batch_id", "truth_rows", "truth_unique_events",
                 "features_rows", "features_unique_events"],
    )
    .sort_values("batch_id")
    .reset_index(drop=True)
)
df


Unnamed: 0,batch_id,truth_rows,truth_unique_events,features_rows,features_unique_events
0,0,1024,1024,152763,1024
1,1,1024,1024,148463,1024
2,2,1024,1024,144144,1024
3,3,1024,1024,178179,1024
4,4,1024,1024,146573,1024
...,...,...,...,...,...
389,389,1024,1024,145768,1024
390,390,1024,1024,143678,1024
391,391,1024,1024,185765,1024
392,392,1024,1024,141762,1024


In [36]:
# Total number of merged events. Previously hand-computed as 393*1024 + 482
# (393 full batches plus the final partial batch); summing the per-batch row
# counts from the table above stays correct if the batch layout changes.
int(df["truth_rows"].sum())

402914

In [37]:
from pathlib import Path
import os, re, random, shutil, json

MERGED_DIR = Path(MERGED_DIR)          # .../merged (current, unsplit layout)
MERGED_RAW = MERGED_DIR.parent / "merged_raw"

# 1) Move merged -> merged_raw, unless an earlier run already did it.
if MERGED_RAW.exists():
    print("merged_raw already exists:", MERGED_RAW)
else:
    os.rename(MERGED_DIR, MERGED_RAW)
    print("moved:", MERGED_DIR, "->", MERGED_RAW)

moved: /project/def-nahee/kbas/POM_Response_Parquet/merged -> /project/def-nahee/kbas/POM_Response_Parquet/merged_raw


In [38]:
truth_dir = MERGED_RAW / "truth"
feat_dir  = MERGED_RAW / "features"

pat = re.compile(r"^truth_(\d+)\.parquet$")
batch_ids = sorted(
    int(m.group(1))
    for p in truth_dir.glob("truth_*.parquet")
    if (m := pat.match(p.name))
)
if not batch_ids:
    raise FileNotFoundError(f"no truth_<N>.parquet files found in {truth_dir}")

# The highest batch id is assumed to be the single partial batch (482 events in
# this run). Verify that it really is the smallest batch before relying on it.
last_batch = max(batch_ids)
batch_sizes = {b: count_rows(truth_dir / f"truth_{b}.parquet") for b in batch_ids}
assert batch_sizes[last_batch] == min(batch_sizes.values()), (
    f"batch {last_batch} has {batch_sizes[last_batch]} rows but the smallest batch has "
    f"{min(batch_sizes.values())}; the partial batch is not the last one"
)
main_batches = [b for b in batch_ids if b != last_batch]

In [39]:
seed = 42
rng = random.Random(seed)

# Shuffle a copy: shuffling `main_batches` in place made the split depend on how
# many times this cell had run (each re-run reshuffled the previous permutation).
# On a first run the result is identical to the in-place version.
shuffled_batches = list(main_batches)
rng.shuffle(shuffled_batches)

n = len(shuffled_batches)
n_train = int(0.8 * n)
n_val   = int(0.1 * n)

splits = {
    "train": shuffled_batches[:n_train],
    "val":   shuffled_batches[n_train:n_train + n_val],
    "test":  shuffled_batches[n_train + n_val:] + [last_batch],  # partial last batch always goes to test
}

In [40]:
def link_or_copy(src: Path, dst: Path):
    """Make ``src`` available at ``dst``, avoiding a data copy when possible.

    Tries, in order: a hard link, a symbolic link, then a full copy
    (``shutil.copy2``). Parent directories of ``dst`` are created as needed; an
    existing ``dst`` is left untouched.

    Raises:
        FileNotFoundError: if ``dst`` does not exist yet and ``src`` is missing.
    """
    src, dst = Path(src), Path(dst)
    dst.parent.mkdir(parents=True, exist_ok=True)
    if dst.exists():
        return
    # Without this check the symlink fallback would silently create a dangling link.
    if not src.exists():
        raise FileNotFoundError(f"source file does not exist: {src}")
    try:
        os.link(src, dst)  # hard link (preferred)
        return
    except (OSError, NotImplementedError):
        pass  # e.g. cross-device link or a filesystem without hard links
    try:
        # A relative symlink target is resolved against dst's directory, not the
        # CWD, so point at the absolute path to avoid a broken link.
        os.symlink(os.path.abspath(src), dst)
        return
    except (OSError, NotImplementedError):
        pass
    shutil.copy2(src, dst)  # last resort: full copy

In [41]:
NEW_MERGED = MERGED_RAW.parent / "merged"
tables = ["truth", "features"]

# Refuse to mix splits: link_or_copy never overwrites or removes an existing
# destination, so files left by an earlier run with a different split would
# leak batches between train/val/test.
stale = []
for split_name, ids in splits.items():
    for table in tables:
        split_dir = NEW_MERGED / split_name / table
        if split_dir.is_dir():
            expected = {f"{table}_{bid}.parquet" for bid in ids}
            stale.extend(str(p) for p in split_dir.iterdir() if p.name not in expected)
if stale:
    raise RuntimeError(
        f"{len(stale)} file(s) under {NEW_MERGED} do not belong to the current split "
        f"(e.g. {stale[:3]}); remove {NEW_MERGED} and re-run."
    )

# Link each batch's shards into <NEW_MERGED>/<split>/<table>/, recording any
# source shard that is missing instead of skipping it silently.
missing_src = []
for split_name, ids in splits.items():
    for table in tables:
        for bid in ids:
            src = (MERGED_RAW / table / f"{table}_{bid}.parquet")
            dst = (NEW_MERGED / split_name / table / src.name)
            if src.exists():
                link_or_copy(src, dst)
            else:
                missing_src.append(str(src))

if missing_src:
    print(f"WARNING: {len(missing_src)} source file(s) not found and not linked:", missing_src)

# Save the split manifest.
manifest = {
    "seed": seed,
    "fractions": {"train": 0.8, "val": 0.1, "test": 0.1},
    "last_batch_forced_to_test": int(last_batch),
    "counts_in_batches": {k: len(v) for k, v in splits.items()},
    "splits": splits,
    "missing_sources": missing_src,
}

NEW_MERGED.mkdir(parents=True, exist_ok=True)
with open(NEW_MERGED / "split_manifest.json", "w") as f:
    json.dump(manifest, f, indent=2)

print("done:", NEW_MERGED)
print({k: len(v) for k, v in splits.items()})

done: /project/def-nahee/kbas/POM_Response_Parquet/merged
{'train': 314, 'val': 39, 'test': 41}


In [43]:
# Total number of batches across all splits (314 + 39 + 41 = 394 in this run).
# Every merged batch must land in exactly one split.
n_split_batches = sum(len(ids) for ids in splits.values())
assert n_split_batches == len(batch_ids), (n_split_batches, len(batch_ids))
assert len(set().union(*splits.values())) == n_split_batches, "a batch appears in more than one split"
n_split_batches

394