Cell 1 — Imports + config

In [None]:
import os
import glob
import numpy as np
import pandas as pd
from pathlib import Path

# Seed NumPy's global RNG so any random step is reproducible
# (the analysis itself is deterministic).
np.random.seed(42)

# =========================
# PATHS (adjust to your machine)
# =========================
# Head 2 output root: one folder per subject, one CSV file per trial.
BASE_HEAD2_DIR = Path(r"E:\0.TA_Teguh\GMM Trial 2\Head 2")
SUBJECTS = [*"ABCDEFGHIJ"]

# =========================
# TRIAL-2 params (used by the validity fallback)
# =========================
MIN_POINTS = 5

# Echo the active configuration.
for _label, _value in (
    ("BASE_HEAD2_DIR:", BASE_HEAD2_DIR),
    ("SUBJECTS      :", SUBJECTS),
    ("MIN_POINTS    :", MIN_POINTS),
):
    print(_label, _value)


Cell 2 — Utility: list files Head-2 per subject

In [None]:
def list_head2_files(subject: str):
    """
    Return the sorted Head-2 CSV paths (as strings) found for one subject.

    Files are matched with the glob pattern
    BASE_HEAD2_DIR/<subject>/Jalan*.csv; the list is empty when nothing
    matches.
    """
    search_glob = BASE_HEAD2_DIR / subject / "Jalan*.csv"
    return sorted(glob.glob(str(search_glob)))

# Quick sanity check: per subject, report the file count and the first match.
for s in SUBJECTS:
    files = list_head2_files(s)
    first_match = files[0] if files else 'NONE'
    print(f"{s}: {len(files)} files, example: {first_match}")


Cell 3 — Load Head-2 into one DataFrame (frame-level), with dedup + schema normalization

In [None]:
def _pick_col(df: pd.DataFrame, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    return None

# Uniqueness key and full column layout of the normalized frame-level table.
_HEAD2_KEY = ["subject", "trial", "frame"]
_HEAD2_COLUMNS = _HEAD2_KEY + ["N_roi", "N_inlier", "conf", "valid_minpts"]


def _numeric_column(df: pd.DataFrame, col):
    """Return df[col] coerced to numeric (unparseable -> NaN); all-NaN if col is None."""
    if col is None:
        return pd.Series(np.nan, index=df.index, dtype="float64")
    return pd.to_numeric(df[col], errors="coerce")


def _normalize_head2_table(df: pd.DataFrame, subj: str, trial_name: str):
    """Map one raw Head-2 table onto the normalized schema; None if it has no frame column."""
    col_frame = _pick_col(df, ["frame", "Frame", "frame_id"])
    if col_frame is None:
        return None

    # Accept several spellings of each optional column.
    col_nroi = _pick_col(df, ["N_roi", "n_roi", "N_ROI", "nroi"])
    col_valid = _pick_col(df, ["valid_minpts", "valid", "is_valid"])
    col_nin = _pick_col(df, ["N_inlier", "n_inlier", "N_INLIER", "ninlier", "nInlier"])
    col_conf = _pick_col(df, ["conf", "confidence", "conf_frame"])

    # The index must exist BEFORE the scalar columns are assigned: assigning a
    # scalar to an index-less DataFrame creates a zero-length column, and the
    # first Series assignment afterwards re-indexes the frame, turning that
    # column into NaN on every row (subject/trial would be lost).
    out = pd.DataFrame(index=df.index)
    out["subject"] = subj
    out["trial"] = trial_name
    out["frame"] = pd.to_numeric(df[col_frame], errors="coerce").astype("Int64")

    # Optional numeric columns; NaN when the source file does not provide them.
    out["N_roi"] = _numeric_column(df, col_nroi)
    out["N_inlier"] = _numeric_column(df, col_nin)
    out["conf"] = _numeric_column(df, col_conf)

    # Validity: prefer the explicit flag, otherwise fall back to N_roi >= MIN_POINTS.
    if col_valid is not None:
        v = pd.to_numeric(df[col_valid], errors="coerce")
        out["valid_minpts"] = (v.fillna(0).astype(int) > 0).astype(int)
    else:
        out["valid_minpts"] = (out["N_roi"].fillna(0) >= MIN_POINTS).astype(int)

    return out


def load_all_head2(subjects=SUBJECTS):
    """
    Load every Head-2 CSV of the given subjects into one frame-level table.

    Parameters
    ----------
    subjects : iterable of str
        Subject identifiers; files are located with list_head2_files().

    Returns
    -------
    (all_df, dup_reports_df)
        all_df has the columns subject, trial, frame, N_roi, N_inlier, conf,
        valid_minpts, with at most one row per (subject, trial, frame); when
        a key occurs more than once the last occurrence is kept.
        dup_reports_df lists each duplicated key with its row count (column
        "count"); it is empty when no duplicates were found.
        Both frames are empty, but still carry their columns, when nothing
        could be loaded.

    Files that cannot be read, or that have no frame column, are reported
    with a printed message and skipped.
    """
    tables = []

    for subj in subjects:
        files = list_head2_files(subj)
        if not files:
            print(f"[WARN] No Head-2 files for subject {subj}")
            continue

        for fpath in map(Path, files):
            # Best effort: one unreadable file must not abort the whole load.
            try:
                df = pd.read_csv(fpath)
            except Exception as e:
                print(f"[ERROR] Failed reading {fpath}: {e}")
                continue

            # The file stem (e.g. "Jalan12") is used as the trial name.
            out = _normalize_head2_table(df, subj, fpath.stem)
            if out is None:
                print(f"[WARN] Missing 'frame' in {fpath.name}, skip")
                continue
            tables.append(out)

    dup_reports_df = pd.DataFrame(columns=_HEAD2_KEY + ["count"])
    if not tables:
        return pd.DataFrame(columns=_HEAD2_COLUMNS), dup_reports_df

    all_df = pd.concat(tables, ignore_index=True)

    # Rows whose frame number could not be parsed cannot be keyed; drop them.
    all_df = all_df.dropna(subset=["frame"]).copy()
    all_df["frame"] = all_df["frame"].astype(int)

    # ---- DUPLICATE CHECK: (subject, trial, frame) must be unique
    dup_mask = all_df.duplicated(_HEAD2_KEY, keep=False)
    if dup_mask.any():
        dup_reports_df = (
            all_df.loc[dup_mask]
            .groupby(_HEAD2_KEY, sort=False)
            .size()
            .reset_index(name="count")
            .sort_values("count", ascending=False)
        )
        print(f"[WARN] Found duplicates for (subject, trial, frame): {len(dup_reports_df)} keys duplicated")
        # keep last occurrence
        all_df = (
            all_df.sort_values(_HEAD2_KEY)
            .drop_duplicates(_HEAD2_KEY, keep="last")
            .reset_index(drop=True)
        )
        print("[INFO] Duplicates dropped (keep='last').")

    return all_df, dup_reports_df

# Load the Head-2 output of every subject once; the following cells read
# df_h2 (frame-level table) and df_dups (duplicate-key report).
df_h2, df_dups = load_all_head2(SUBJECTS)

print("df_h2 shape:", df_h2.shape)
# Preview of the first rows (last expression of the cell).
df_h2.head()


Cell 4 — Sanity checks (missing columns, NaNs, duplicates report)

In [None]:
# Duplicate-key report (shown only when there is something to show).
if isinstance(df_dups, pd.DataFrame):
    if not df_dups.empty:
        display(df_dups.head(20))

# Count the frames whose N_inlier is missing.
nan_nin = df_h2["N_inlier"].isna().sum()
print("NaN N_inlier:", nan_nin, "out of", len(df_h2))

# Missing values only trigger a warning; execution continues.
if nan_nin:
    print("[WARN] Some rows have NaN N_inlier. Those frames will be excluded from sampling pressure calc.")

# Valid / invalid frame counts.
valid_counts = df_h2["valid_minpts"].value_counts(dropna=False)
print(valid_counts)


Cell 5 — Build point-count dataset for analysis

In [None]:
# Frames with a known N_inlier, with the count stored as an integer.
df_counts = df_h2[df_h2["N_inlier"].notna()].copy()
df_counts["N_inlier"] = df_counts["N_inlier"].astype(int)

# Two views of the same table:
# - VALID frames only -> the ONLY view that should drive N_target decisions
# - ALL frames (including invalid) -> shows how many invalid frames exist
df_valid = df_counts[df_counts["valid_minpts"].eq(1)].copy()
df_all = df_counts.copy()

for _label, _view in (
    ("ALL frames used   :", df_all),
    ("VALID frames used :", df_valid),
):
    print(_label, len(_view))
display(df_valid.head())


Cell 6 — Descriptive stats (global + per subject)

In [None]:
def describe_points(arr: np.ndarray):
    """
    Summarize an array of point counts.

    Parameters
    ----------
    arr : array-like
        Values to summarize (converted with np.asarray).

    Returns
    -------
    dict
        Float values under the keys min, max, mean, median, p25, p50, p75,
        p90, p95, p99 (in that order). Every value is NaN when `arr` is
        empty, so callers do not have to guard against an empty selection.
    """
    values = np.asarray(arr)
    keys = ("min", "max", "mean", "median", "p25", "p50", "p75", "p90", "p95", "p99")

    # np.min / np.max raise ValueError on a zero-size array; report NaN instead.
    if values.size == 0:
        return {key: float("nan") for key in keys}

    # One call computes all requested percentiles.
    p25, p50, p75, p90, p95, p99 = np.percentile(values, [25, 50, 75, 90, 95, 99])
    return {
        "min": float(np.min(values)),
        "max": float(np.max(values)),
        "mean": float(np.mean(values)),
        "median": float(np.median(values)),
        "p25": float(p25),
        "p50": float(p50),
        "p75": float(p75),
        "p90": float(p90),
        "p95": float(p95),
        "p99": float(p99),
    }

def print_stats(title, arr):
    """Print the describe_points() summary of `arr` under a titled header and return it."""
    summary = describe_points(arr)
    report = [f"\n=== {title} ==="]
    report.extend(f"  {k:6s}: {v:.3f}" for k, v in summary.items())
    print("\n".join(report))
    return summary

# GLOBAL statistics for both views.
stats_all = print_stats("GLOBAL (ALL frames)", df_all["N_inlier"].values)
stats_val = print_stats("GLOBAL (VALID frames only)", df_valid["N_inlier"].values)

# PER SUBJECT (VALID only): one summary row per subject that has valid frames.
print("\n=== PER SUBJECT (VALID frames only) ===")
per_subject_stats = []
for s in SUBJECTS:
    sub = df_valid.loc[df_valid["subject"] == s]
    n_frames = len(sub)
    if not n_frames:
        print(f"Subject {s}: NO VALID data")
        continue

    st = describe_points(sub["N_inlier"].values)
    per_subject_stats.append(dict(subject=s, **st, n_frames=n_frames))

    print(f"\nSubject {s} (VALID): n={n_frames}")
    for k, v in st.items():
        print(f"  {k:6s}: {v:.3f}")

df_per_subject_stats = pd.DataFrame(per_subject_stats)
df_per_subject_stats.head()


Cell 7 — Candidate N_target grid (fixed grid; VALID p50/p75/p90 computed for reference)

In [None]:
# Percentiles of N_inlier over VALID frames, rounded to whole point counts.
p50 = int(round(stats_val["p50"]))
p75 = int(round(stats_val["p75"]))
p90 = int(round(stats_val["p90"]))


def around(x, steps=(-4, 0, 4)):
    """Return x + d for every d in `steps`, keeping only the positive results."""
    return [x + d for d in steps if x + d > 0]


# Data-driven suggestions: a small neighbourhood around each percentile.
# They are printed for reference only and are not evaluated by default.
N_candidates_auto = sorted({v for p in (p50, p75, p90) for v in around(p)})

# Fixed grid (step 8) that the following cells actually evaluate.
N_candidates = [32, 40, 48, 56, 64, 72, 80, 88, 96, 104, 112]

# Label each list for what it really is: the "Auto" line previously printed
# the hard-coded grid.
print("Auto N_candidates (from VALID p50/p75/p90):", N_candidates_auto)
print("N_candidates used (fixed grid):", N_candidates)


Cell 8 — Simulate sampling pressure for each N_target (VALID only)

In [None]:
def simulate_sampling_pressure(df_valid: pd.DataFrame, n_targets):
    """
    Count how many frames would need resampling for each candidate N_target.

    Parameters
    ----------
    df_valid : pd.DataFrame
        Frames to evaluate; must have a column N_inlier (cast to int here).
        The caller is expected to have filtered it to VALID frames already.
    n_targets : iterable of numbers
        Candidate target point counts.

    Returns
    -------
    pd.DataFrame
        One row per N_target, sorted by N_target, with:
          - n_need_fill / pct_need_fill: frames with N_inlier < N_target
          - n_equal / pct_equal:         frames with N_inlier == N_target
          - n_need_down / pct_need_down: frames with N_inlier > N_target
          - avg_deficit / p95_deficit:   mean and 95th percentile of
            (N_target - N_inlier) over the frames that need filling
          - avg_excess / p95_excess:     mean and 95th percentile of
            (N_inlier - N_target) over the frames that need downsampling
        Percentages and deficit/excess statistics are 0.0 when there is
        nothing to compute them from. An empty `n_targets` yields an empty
        DataFrame that still has these columns.
    """
    columns = [
        "N_target",
        "total_frames_used",
        "n_need_fill",
        "n_equal",
        "n_need_down",
        "pct_need_fill",
        "pct_equal",
        "pct_need_down",
        "avg_deficit",
        "p95_deficit",
        "avg_excess",
        "p95_excess",
    ]

    counts = df_valid["N_inlier"].to_numpy().astype(int)
    total = int(counts.size)

    def pct(n):
        # Guard against division by zero when there are no frames at all.
        return 100.0 * n / total if total else 0.0

    records = []
    for N in n_targets:
        deficits = N - counts[counts < N]   # points missing per under-filled frame
        excess = counts[counts > N] - N     # surplus points per over-filled frame

        n_fill = int(deficits.size)
        n_down = int(excess.size)
        n_eq = int((counts == N).sum())

        records.append({
            "N_target": int(N),
            "total_frames_used": total,
            "n_need_fill": n_fill,
            "n_equal": n_eq,
            "n_need_down": n_down,
            "pct_need_fill": pct(n_fill),
            "pct_equal": pct(n_eq),
            "pct_need_down": pct(n_down),
            "avg_deficit": float(deficits.mean()) if n_fill else 0.0,
            "p95_deficit": float(np.percentile(deficits, 95)) if n_fill else 0.0,
            "avg_excess": float(excess.mean()) if n_down else 0.0,
            "p95_excess": float(np.percentile(excess, 95)) if n_down else 0.0,
        })

    # Without records there is no N_target column to sort by; return the
    # empty schema instead of failing in sort_values.
    if not records:
        return pd.DataFrame(columns=columns)

    return (
        pd.DataFrame(records, columns=columns)
        .sort_values("N_target")
        .reset_index(drop=True)
    )

# Evaluate every candidate N_target on the VALID frames and show the table.
df_pressure = simulate_sampling_pressure(df_valid, N_candidates)
display(df_pressure)


Cell 9 — (Optional) Compare pressure on ALL frames vs VALID frames

In [None]:
# WARNING: this view keeps the invalid frames too, still keyed on N_inlier.
# If invalid frames carry N_inlier=0 (or NaN), the statistics will shift.
df_all_validlike = df_all.copy().dropna(subset=["N_inlier"])  # may include invalid frames
df_all_validlike["N_inlier"] = df_all_validlike["N_inlier"].astype(int)

# Same simulation without the validity filter: pressure may look very different.
df_pressure_all = simulate_sampling_pressure(df_all_validlike, N_candidates)

# Show both tables one after the other for comparison.
for _heading, _table in (
    ("=== Pressure (VALID frames only) ===", df_pressure),
    ("=== Pressure (ALL frames, incl invalid) ===", df_pressure_all),
):
    print(_heading)
    display(_table)


Cell 10 — Quick plots (histogram)

In [None]:
import matplotlib.pyplot as plt

# Histogram of the per-frame inlier counts (VALID frames only), 60 bins.
fig, ax = plt.subplots()
ax.hist(df_valid["N_inlier"].to_numpy(), bins=60)
ax.set_title("Distribution of N_inlier (VALID frames only)")
ax.set_xlabel("N_inlier")
ax.set_ylabel("count")
plt.show()


Cell 11 — Save outputs for audit trail

In [None]:
# Output folder for the audit trail; created on demand.
OUT_DIR = Path(r"E:\0.TA_Teguh\Clustering GMM", "_analysis_n_target")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Write each result table to its own CSV file, without the index column.
for _table, _filename in (
    (df_per_subject_stats, "per_subject_stats_valid.csv"),
    (df_pressure, "sampling_pressure_valid.csv"),
):
    _table.to_csv(OUT_DIR / _filename, index=False)

print("Saved to:", OUT_DIR)
