
# `validate_npz_enhanced.md`

## 📌 Overview

`validate_npz_enhanced.py` is a **data quality auditing tool** for preprocessed `.npz` keypoint files.
It validates whether each file is suitable for downstream training with **Transformer** and **IV3-GRU** models, generates structured reports, and (optionally) creates keypoint animations for visual QA.

This script is designed for use in the `flsr-transformer-vs-iv3gru/` project, but the checks are generic enough for other pose/keypoint datasets.

---

## ✅ What it does

* Loads `.npz` files containing keypoints, masks, timestamps, and optional `X2048` features.
* Runs a suite of **sanity checks**:

  * shape, dtype, normalization range
  * missing/invalid values (NaN/Inf)
  * temporal consistency across `X`, `mask`, and `timestamps`
  * velocity spikes and bone length jitter
  * keypoint coverage and visibility per frame
* Verifies compatibility with:

  * **Transformer model**: expects `[T,156]` float32 keypoints + `[T,78]` bool mask
  * **IV3-GRU model**: expects `[T,2048]` float32 features, temporally aligned with `X`
* Produces structured outputs:

  * `npz_audit_report.csv` → compact tabular summary for all files
  * `npz_audit_report.jsonl` → detailed JSON-per-file for debugging
  * `npz_audit_summary.md` → human-readable overview with issue counts
* (Optional) Creates MP4 animations of keypoints for quick QA.
  *(can be disabled with `make_videos=False` or `--no-video`)*
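
To make the expected layout concrete, the sketch below builds a synthetic clip that matches the spec above, saves it to a temporary `.npz`, and reloads it. The key names `X`, `mask`, `timestamps_ms`, and `X2048` follow the notebook cells further down. The frame count, the random values, and the `make_synthetic_clip` helper are made up for illustration and are not part of the project.

```python
import tempfile
from pathlib import Path

import numpy as np


def make_synthetic_clip(T: int = 30, seed: int = 0) -> dict:
    """Hypothetical helper: random arrays with the shapes and dtypes the audit expects."""
    rng = np.random.default_rng(seed)
    return {
        # 78 keypoints * (x, y), already normalized to [0, 1)
        "X": rng.random((T, 156), dtype=np.float32),
        # bool visibility flag per keypoint
        "mask": rng.random((T, 78)) > 0.1,
        # roughly 30 fps, non-decreasing
        "timestamps_ms": (np.arange(T) * 33).astype(np.int64),
        # stand-in for InceptionV3 features
        "X2048": rng.standard_normal((T, 2048)).astype(np.float32),
    }


clip = make_synthetic_clip()
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "synthetic.npz"
    np.savez_compressed(path, **clip)
    with np.load(path) as data:
        loaded = {k: data[k] for k in data.files}

for name, arr in loaded.items():
    print(f"{name:14s} shape={arr.shape} dtype={arr.dtype}")
```

A file like this should pass both model checks; changing a dtype or dropping a key is a quick way to see which flag a given defect raises.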

---

## 🗂 Expected Directory Layout

The script assumes the following structure:

```
flsr-transformer-vs-iv3gru/
│
├── data/
│   ├── processed/
│   │   └── keypoints_all/        # input .npz files (2130 clips)
│   ├── raw/                      # raw data (not used here)
│   └── reports/                  # outputs written here
│       ├── npz_audit_report.csv
│       ├── npz_audit_report.jsonl
│       ├── npz_audit_summary.md
│       └── (optional sample_animations/ if enabled)
│
├── notebooks/
│   └── validate_npz_enhanced.ipynb  # Jupyter notebook version
│
├── preprocessing/
│   └── validate_npz.py              # legacy checker (optional import)
│
└── models/ ...
```

---

## ⚙️ How it works

1. **Select inputs**

   * Single file:

     ```bash
     python validate_npz_enhanced.py --file data/processed/keypoints_all/15.npz
     ```
   * Entire directory:

     ```bash
     python validate_npz_enhanced.py --dir data/processed/keypoints_all --out data/reports
     ```

2. **Run validations**

   * Script loads each `.npz` and applies `basic_sanity`, `transformer_ready`, `iv3_gru_ready`.
   * Collects per-file metrics: coverage %, velocity spikes, bone jitter, out-of-range values.

3. **Write outputs**

   * Collects one record per file in memory, then writes `npz_audit_report.jsonl` and `npz_audit_report.csv` once the loop finishes.
   * Finally, compiles issue counts into `npz_audit_summary.md`.

4. **(Optional) Animations**

   * If enabled, saves keypoint visualizations under `data/reports/sample_animations/`.
   * Skippable for large datasets to save space.
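
The script's own argument parser is not shown in this document. As a rough sketch, the flags used above (`--file`, `--dir`, `--out`, `--no-video`) could be wired up with `argparse` as follows. Treating `--file` and `--dir` as mutually exclusive and defaulting `--out` to `data/reports` are assumptions, as is the `build_parser` name.

```python
import argparse
from pathlib import Path


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Audit preprocessed .npz keypoint files")
    # Assumption: exactly one of --file / --dir must be given.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--file", type=Path, help="audit a single .npz file")
    source.add_argument("--dir", type=Path, help="audit every .npz file in a directory")
    # Assumption: the default output folder mirrors the layout shown earlier.
    parser.add_argument("--out", type=Path, default=Path("data/reports"))
    parser.add_argument("--no-video", action="store_true", help="skip MP4 animations")
    return parser


args = build_parser().parse_args(
    ["--dir", "data/processed/keypoints_all", "--out", "data/reports", "--no-video"]
)

# The parsed flags map onto the keyword arguments of run_audit() in the notebook.
kwargs = dict(
    npz_dir=args.dir,
    npz_file=args.file,
    out_root=args.out,
    make_videos=not args.no_video,
)
print(kwargs)
```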

---

## 🚀 Usage Options

* Skip animations:

  ```bash
  python validate_npz_enhanced.py --dir data/processed/keypoints_all --out data/reports --no-video
  ```
* Notebook workflow: open `notebooks/validate_npz_enhanced.ipynb` and run all cells.
* Reports can be shared via Git. In the notebook version, logging goes to the console only, so no log files are produced.

---

## 📄 Outputs

* **CSV**: quick-glance metrics across all files
* **JSONL**: detailed raw audit records
* **MD summary**: counts of issues (ready to share with team)
* *(optional)* per-file animations
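
Reading the reports back needs nothing beyond the standard library. The sketch below writes two made-up records in the same CSV and JSONL shapes and then queries them. The field names come from the audit output, while the file names and values are invented for the example.

```python
import csv
import json
import tempfile
from pathlib import Path

# Two made-up records using field names from the audit output.
records = [
    {"file": "15.npz", "transformer_ready": True, "iv3_ready": False,
     "iv3_issues": ["X2048 missing (required for IV3-GRU)"]},
    {"file": "16.npz", "transformer_ready": True, "iv3_ready": True, "iv3_issues": []},
]

with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / "npz_audit_report.csv"
    jsonl_path = Path(tmp) / "npz_audit_report.jsonl"

    fields = ["file", "transformer_ready", "iv3_ready"]
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for r in records:
            writer.writerow({k: r.get(k) for k in fields})
    with open(jsonl_path, "w", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r) + "\n")

    # CSV cells come back as strings, so booleans are compared as text.
    with open(csv_path, newline="", encoding="utf-8") as f:
        not_iv3_ready = [row["file"] for row in csv.DictReader(f) if row["iv3_ready"] != "True"]

    # JSONL keeps the original types and the per-file issue lists.
    with open(jsonl_path, encoding="utf-8") as f:
        details = {rec["file"]: rec for rec in map(json.loads, f)}

print(not_iv3_ready)
print(details["15.npz"]["iv3_issues"])
```

The CSV is convenient for filtering and sorting, and the JSONL record for a flagged file then gives the exact issue strings.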

---

## ⚠️ Common Issues & Fixes

When reviewing the CSV/MD reports, here’s how to interpret the most common flags:

| Field / Issue                  | What it means                                                                 | Fix / Next Step                                                                  |
| ------------------------------ | ----------------------------------------------------------------------------- | -------------------------------------------------------------------------------- |
| `X_out_of_range > 0`           | Some keypoint coordinates fall outside `[0,1]`. Data not normalized properly. | Re-check preprocessing step that scales coordinates.                             |
| `X_has_nan` / `X_has_inf`      | Non-finite values in keypoints.                                               | Identify affected clips, fix preprocessing, or drop those samples.               |
| `timestamps_monotonic = False` | Frame timestamps go backward / are inconsistent.                              | Check data loader or export step that writes `timestamps_ms`.                    |
| `temporal_consistent = False`  | `X`, `mask`, and `timestamps` don’t all have the same frame count.            | Ensure preprocessing writes aligned arrays per clip.                             |
| `low_vis_frames > 0`           | Some frames have <10 visible keypoints (likely detector dropout).             | Consider filtering frames or interpolating missing detections.                   |
| `vel_spike_frames > 0`         | Sudden pose “teleports” between frames.                                       | Re-check tracking or smooth post-process with filtering.                         |
| `bone_len_cv_pct` high (>15%)  | Bone lengths vary too much → possible identity swaps or jitter.               | Apply skeleton smoothing or inspect detector/tracker quality.                    |
| Transformer issues             | Shape/dtype mismatch or missing mask/timestamps.                              | Adjust preprocessing pipeline to meet `[T,156]` float32 + `[T,78]` bool spec.    |
| IV3-GRU issues                 | Missing/invalid `X2048` features.                                             | Ensure InceptionV3 features were extracted; re-run feature extraction if needed. |
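
For the `low_vis_frames` row, one possible way to interpolate missing detections is linear interpolation over time, driven by the mask. The `interpolate_missing` helper below is an illustrative sketch and not part of the project. It assumes the `[T, 2*J]` layout with interleaved `(x, y)` pairs that the notebook's `reshape(T, -1, 2)` calls imply.

```python
import numpy as np


def interpolate_missing(X: np.ndarray, mask: np.ndarray) -> np.ndarray:
    """Fill invisible keypoints by linear interpolation over time (illustrative helper).

    X is [T, 2*J] with (x, y) pairs per keypoint; mask is [T, J] bool visibility.
    Keypoints that are never visible are left unchanged.
    """
    T, J = mask.shape
    xy = X.reshape(T, J, 2).copy()
    frames = np.arange(T)
    for j in range(J):
        vis = mask[:, j]
        if vis.all() or not vis.any():
            continue
        for c in range(2):
            # np.interp holds the nearest visible value at the clip edges.
            xy[~vis, j, c] = np.interp(frames[~vis], frames[vis], xy[vis, j, c])
    return xy.reshape(T, 2 * J).astype(X.dtype, copy=False)


# Tiny example: one keypoint moving along x, with the middle frame undetected.
X = np.array([[0.0, 0.5], [0.9, 0.9], [1.0, 0.5]], dtype=np.float32)
mask = np.array([[True], [False], [True]])
fixed = interpolate_missing(X, mask)
print(fixed[1])
```

Interpolated values are estimates, so it may be worth keeping the original mask alongside the filled array rather than marking those keypoints as visible.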


In [None]:
# --- Cell 1: Setup & Imports ---
from __future__ import annotations
from pathlib import Path
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os, sys, json, logging
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FFMpegWriter, FuncAnimation

# Make repo root importable (assuming this notebook lives in /notebooks)
ROOT = Path.cwd().parent
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

# Try optional validator from restructured preprocessing module (safe if missing)
try:
    from preprocessing.utils.validate_npz import validate_npz_file  # optional; the except branch sets it to None
except Exception:
    validate_npz_file = None

print("Project ROOT:", ROOT)


In [None]:
# --- Cell 2: Config, Topology, Logging Helpers (no file logs) ---

# 78 keypoints (25 body, 21 LH, 21 RH, 11 face) => X shape [T, 156], mask [T, 78]
SPLITS = {
    "pose": list(range(0, 25)),
    "left_hand": list(range(25, 46)),
    "right_hand": list(range(46, 67)),
    "face": list(range(67, 78)),
}

# Lightweight bones for stability checks (indices must exist)
BONES = [
    (5,7), (7,9), (6,8), (8,10),
    (11,13), (13,15), (12,14), (14,16),
    (5,6), (5,11), (6,12), (11,12),
]

DEFAULTS = dict(
    expect_x_dim=156,
    expect_mask_dim=78,
    transformer_dtype=np.float32,
    iv3_dim=2048,
    iv3_dtype=np.float32,
    x_norm_range=(0.0, 1.0),
    low_vis_threshold=10,       # frames with <10 visible kpts flagged
    vel_spike_sigma=3.0,        # spike if > median + 3*std
    bone_cv_threshold_2d=0.15,  # CV > 15% => jitter / identity issues
)

# --- Logging helpers: console only, no log files ---
def setup_logger(log_dir: Path, name="npz_audit") -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    sh = logging.StreamHandler(sys.stdout)
    fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
    sh.setFormatter(fmt)
    logger.addHandler(sh)
    return logger

def perfile_logger(base_dir: Path, stem: str) -> logging.Logger:
    # simplified: reuse the shared "npz_audit" logger so we don't create per-file logs
    return logging.getLogger("npz_audit")
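
As a quick illustration of how the `SPLITS` indices relate to the flat `X` array, the sketch below reshapes `X` the same way the QA utilities do, so keypoint `k` occupies columns `2k` and `2k+1`. The per-part coverage figure is an extra shown for illustration; the audit itself only reports overall coverage.

```python
import numpy as np

SPLITS = {
    "pose": range(0, 25),
    "left_hand": range(25, 46),
    "right_hand": range(46, 67),
    "face": range(67, 78),
}

T = 4
X = np.arange(T * 156, dtype=np.float32).reshape(T, 156)  # placeholder values
mask = np.ones((T, 78), dtype=bool)
mask[:, 25:46] = False  # pretend the left hand was never detected

xy = X.reshape(T, 78, 2)  # keypoint k -> columns 2k (x) and 2k+1 (y)
for name, idxs in SPLITS.items():
    idxs = list(idxs)
    part_xy = xy[:, idxs, :]                 # [T, len(idxs), 2]
    part_cov = mask[:, idxs].mean() * 100.0  # % of visible entries in this part
    print(f"{name:10s} xy={part_xy.shape} coverage={part_cov:5.1f}%")
```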


In [None]:
# --- Cell 3: Core QA Utilities ---

def is_monotonic_nondec(arr: np.ndarray) -> bool:
    return bool(arr.size < 2 or np.all(arr[1:] >= arr[:-1]))

def coverage_stats(mask: np.ndarray) -> Dict[str, Any]:
    if mask is None or mask.size == 0:
        return dict(has_mask=False, coverage_pct=0.0, min_visible=0, max_visible=0, low_vis_frames=0)
    vis_counts = mask.sum(axis=1)
    return dict(
        has_mask=True,
        coverage_pct=float(mask.mean() * 100.0),
        min_visible=int(vis_counts.min()),
        max_visible=int(vis_counts.max()),
        low_vis_frames=int((vis_counts < DEFAULTS["low_vis_threshold"]).sum()),
    )

def velocity_metrics(X: np.ndarray) -> Dict[str, Any]:
    if X.shape[0] < 2:
        return dict(vel_p95=0.0, vel_max=0.0, vel_spike_frames=0)
    xy = X.reshape(X.shape[0], -1, 2)
    v = np.linalg.norm(np.diff(xy, axis=0), axis=2)  # (T-1, J)
    p95 = float(np.nanpercentile(v, 95))
    vmax = float(np.nanmax(v))
    frame_max = np.nanmax(v, axis=1)
    med, std = float(np.nanmedian(frame_max)), float(np.nanstd(frame_max))
    spikes = int(np.sum(frame_max > (med + DEFAULTS["vel_spike_sigma"] * std)))
    return dict(vel_p95=p95, vel_max=vmax, vel_spike_frames=spikes)

def bone_length_cv(X: np.ndarray) -> Dict[str, Any]:
    T = X.shape[0]
    if T == 0 or not BONES:
        return dict(bone_len_cv_pct=0.0, bone_outlier_frames=0)
    xy = X.reshape(T, -1, 2)
    BL = []
    for a, b in BONES:
        if max(a, b) >= xy.shape[1]:
            continue
        BL.append(np.linalg.norm(xy[:, a, :] - xy[:, b, :], axis=1))
    if not BL:
        return dict(bone_len_cv_pct=0.0, bone_outlier_frames=0)
    BL = np.stack(BL, axis=1)  # (T, B)
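    # CV per bone over time, averaged across bones; pooling all bones into one
    # std/mean would mix in the natural length differences between bones.
    cv = float(np.nanmean(np.nanstd(BL, axis=0) / (np.nanmean(BL, axis=0) + 1e-9)))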
    z = (BL - np.nanmean(BL, axis=0)) / (np.nanstd(BL, axis=0) + 1e-9)
    outlier_frames = int(np.any(np.abs(z) > 3.5, axis=1).sum())
    return dict(bone_len_cv_pct=cv * 100.0, bone_outlier_frames=outlier_frames)

def basic_sanity(X: np.ndarray, mask: np.ndarray, timestamps: np.ndarray) -> Dict[str, Any]:
    report: Dict[str, Any] = {}
    # shapes
    report["T"] = int(X.shape[0])
    report["X_shape"] = list(X.shape)
    report["mask_shape"] = list(mask.shape) if mask is not None else None
    report["ts_shape"] = list(timestamps.shape) if timestamps is not None else None
    # dtypes
    report["X_dtype"] = str(X.dtype)
    report["mask_dtype"] = str(mask.dtype) if mask is not None else None
    report["ts_dtype"] = str(timestamps.dtype) if timestamps is not None else None
    # finiteness
    report["X_has_nan"] = bool(np.isnan(X).any())
    report["X_has_inf"] = bool(np.isinf(X).any())
    # range (normalized)
    lo, hi = DEFAULTS["x_norm_range"]
    report["X_out_of_range"] = int(((X < lo) | (X > hi)).sum())
    # timestamps
    if timestamps is not None and timestamps.size > 1:
        report["timestamps_monotonic"] = bool(is_monotonic_nondec(timestamps))
        report["duration_ms"] = int(timestamps[-1] - timestamps[0])
    else:
        report["timestamps_monotonic"] = True
        report["duration_ms"] = 0
    # temporal consistency
    t_ok = [X.shape[0]]
    if mask is not None: t_ok.append(mask.shape[0])
    if timestamps is not None: t_ok.append(timestamps.shape[0])
    report["temporal_consistent"] = (len(set(t_ok)) == 1)
    # mask coverage + kinematics
    report.update(coverage_stats(mask))
    report.update(velocity_metrics(X))
    report.update(bone_length_cv(X))
    return report
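
The spike rule in `velocity_metrics` flags a frame transition when its largest keypoint displacement exceeds the median plus `vel_spike_sigma` standard deviations of the per-transition maxima. A small worked example with a single synthetic keypoint (values invented for illustration) shows the rule picking out one "teleport":

```python
import numpy as np

SIGMA = 3.0  # same multiplier as DEFAULTS["vel_spike_sigma"]

# One keypoint drifting slowly to the right over 50 frames, as [T, 2] (x, y).
T = 50
xy = np.stack([np.linspace(0.2, 0.3, T), np.full(T, 0.5)], axis=1)
xy[25:, 0] += 0.4  # sudden jump: everything from frame 25 onward is shifted

X = xy.reshape(T, 2).astype(np.float32)

# Per-transition displacement of each keypoint, then the largest one per transition.
v = np.linalg.norm(np.diff(X.reshape(T, -1, 2), axis=0), axis=2)  # (T-1, J)
frame_max = v.max(axis=1)

threshold = np.median(frame_max) + SIGMA * frame_max.std()
spike_frames = np.flatnonzero(frame_max > threshold)
print(spike_frames, threshold)
```

Index `i` in `spike_frames` refers to the transition from frame `i` to frame `i + 1`. Because the threshold is relative to each clip's own statistics, a clip with many jumps raises its own threshold, so `vel_max` and `vel_p95` are worth reading alongside the spike count.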


In [None]:
# --- Cell 4: Model Validators ---

def transformer_ready(X: np.ndarray, mask: np.ndarray, timestamps: np.ndarray) -> Tuple[bool, List[str]]:
    issues = []
    if not (X.ndim == 2 and X.shape[1] == DEFAULTS["expect_x_dim"] and X.dtype == DEFAULTS["transformer_dtype"]):
        issues.append("X shape/dtype incorrect (expected [T,156] float32)")
    if mask is None or not (mask.ndim == 2 and mask.shape[1] == DEFAULTS["expect_mask_dim"] and mask.dtype == np.bool_):
        issues.append("mask shape/dtype incorrect (expected [T,78] bool)")
    if timestamps is None or not (timestamps.ndim == 1 and timestamps.dtype == np.int64):
        issues.append("timestamps incorrect (expected [T] int64)")
    if X.shape[0] != (mask.shape[0] if mask is not None else -1) or X.shape[0] != (timestamps.shape[0] if timestamps is not None else -1):
        issues.append("temporal dimension mismatch among X/mask/timestamps")
    if np.isnan(X).any() or np.isinf(X).any():
        issues.append("non-finite values in X")
    lo, hi = DEFAULTS["x_norm_range"]
    if ((X < lo) | (X > hi)).any():
        issues.append("X outside [0,1] range (expected normalized)")
    if timestamps is not None and timestamps.size > 1 and not is_monotonic_nondec(timestamps):
        issues.append("timestamps not monotonic")
    return (len(issues) == 0), issues

def iv3_gru_ready(X: np.ndarray, X2048: np.ndarray | None) -> Tuple[bool, List[str]]:
    issues = []
    if X2048 is None:
        return False, ["X2048 missing (required for IV3-GRU)"]
    if not (X2048.ndim == 2 and X2048.shape[1] == DEFAULTS["iv3_dim"] and X2048.dtype == DEFAULTS["iv3_dtype"]):
        issues.append("X2048 shape/dtype incorrect (expected [T,2048] float32)")
    if X2048.shape[0] != X.shape[0]:
        issues.append(f"X2048 temporal mismatch: X({X.shape[0]}) vs X2048({X2048.shape[0]})")
    if np.isnan(X2048).any() or np.isinf(X2048).any():
        issues.append("non-finite values in X2048")
    return (len(issues) == 0), issues


In [None]:
# --- Cell 5: Animation Helpers ---

def save_animation_mp4(
    X: np.ndarray,
    mask: np.ndarray,
    timestamps: np.ndarray,
    out_path: Path,
    split_map=SPLITS,
    fps=30,
    dpi=110,
    title_prefix="Keypoints",
    add_info=True,
):
    T = X.shape[0]
    xy = X.reshape(T, -1, 2)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xlim(0, 1); ax.set_ylim(1, 0)
    ax.set_xlabel("X (normalized)"); ax.set_ylabel("Y (normalized)")
    ax.grid(True, alpha=0.3)

    scatters = {name: ax.scatter([], [], s=30, label=name) for name in split_map}
    inv_scatter = ax.scatter([], [], facecolors='none', edgecolors='k', s=60, label='invisible')
    ax.legend(loc='upper right')

    def update(i):
        xi, yi = xy[i, :, 0], xy[i, :, 1]
        vis = mask[i] if mask is not None and i < mask.shape[0] else np.ones(len(xi), dtype=bool)
        for name, idxs in split_map.items():
            pts = np.column_stack([xi[idxs], yi[idxs]]) if max(idxs) < len(xi) else np.empty((0, 2))
            scatters[name].set_offsets(pts)
        inv = np.where(~vis)[0]
        inv_pts = np.column_stack([xi[inv], yi[inv]]) if inv.size else np.empty((0, 2))
        inv_scatter.set_offsets(inv_pts)
        if add_info:
            label = [f"Frame {i+1}/{T}"]
            if timestamps is not None and timestamps.size > i:
                label.append(f"t={timestamps[i]/1000:.2f}s")
            ax.set_title(f"{title_prefix} | " + " | ".join(label))
        return list(scatters.values()) + [inv_scatter]

    ani = FuncAnimation(fig, update, frames=T, interval=1000//max(1, fps), blit=False)
    writer = FFMpegWriter(fps=fps, metadata=dict(artist="npz_audit"), bitrate=1800)
    ani.save(str(out_path), writer=writer, dpi=dpi)
    plt.close(fig)


In [None]:
# --- Cell 6: Single-File Audit ---

def audit_npz(npz_path: Path, out_root: Path, save_viz: bool = True) -> Dict[str, Any]:
    data = np.load(npz_path, allow_pickle=True)

    def get(name, default=None):
        return data[name] if name in data.files else default

    X = get("X")
    mask = get("mask")
    ts = get("timestamps_ms")
    X2048 = get("X2048", None)

    # Parse meta safely
    meta_raw = get("meta", None)
    meta = None
    if meta_raw is not None:
        try:
            if hasattr(meta_raw, "item"):
                meta_raw = meta_raw.item()
            if isinstance(meta_raw, bytes):
                meta_raw = meta_raw.decode("utf-8")
            meta = json.loads(meta_raw) if isinstance(meta_raw, str) else None
        except Exception:
            meta = {"_error": "bad_meta_json"}

    # Minimal required keys
    if X is None or mask is None or ts is None:
        return dict(file=str(npz_path), error="missing_required_keys", present=list(data.files))

    # Basic sanity
    sanity = basic_sanity(X, mask, ts)

    # Model readiness
    t_ok, t_issues = transformer_ready(X, mask, ts)
    i_ok, i_issues = iv3_gru_ready(X, X2048)

    # Optional: legacy comprehensive validator
    legacy = []
    if validate_npz_file is not None:
        try:
            legacy = validate_npz_file(
                str(npz_path),
                require_x2048=True,
                check_parquet=False,
                check_transformer=True,
                check_iv3=True,
            ) or []
        except Exception as e:
            legacy = [f"validate_npz_file_error:{e}"]

    # Animations (disabled in this notebook version, so `save_viz` currently has no effect).
    # Uncomment to write MP4s under out_root / "sample_animations"; requires ffmpeg.
    # anim_dir = out_root / "sample_animations"
    # if save_viz:
    #     try:
    #         save_animation_mp4(X, mask, ts, anim_dir / f"{npz_path.stem}_preview.mp4",  fps=30, dpi=100, title_prefix="Preview",  add_info=False)
    #         save_animation_mp4(X, mask, ts, anim_dir / f"{npz_path.stem}_detailed.mp4", fps=15, dpi=150, title_prefix="Detailed", add_info=True)
    #         save_animation_mp4(X, mask, ts, anim_dir / f"{npz_path.stem}_slowmo.mp4",   fps=10, dpi=120, title_prefix="SlowMo",   add_info=True)
    #     except Exception as e:
    #         sanity["animation_error"] = str(e)

    rec: Dict[str, Any] = dict(
        file=str(npz_path),
        present=list(data.files),
        meta_summary=(list(meta.keys())[:8] if isinstance(meta, dict) else None),
        **sanity,
        transformer_ready=t_ok,
        transformer_issues=t_issues,
        iv3_ready=i_ok,
        iv3_issues=i_issues,
        legacy_issues=legacy,
    )

    if X2048 is not None and X2048.size:
        rec.update(
            dict(
                X2048_shape=list(X2048.shape),
                X2048_dtype=str(X2048.dtype),
                X2048_min=float(np.nanmin(X2048)),
                X2048_max=float(np.nanmax(X2048)),
                X2048_mean=float(np.nanmean(X2048)),
                X2048_std=float(np.nanstd(X2048)),
            )
        )
    return rec


In [None]:
# --- Cell 7: Batch Runner + Report Writers ---

def run_audit(npz_dir: Path | str | None,
              npz_file: Path | str | None,
              out_root: Path | str,
              make_videos: bool = True):
    # Normalize paths (robust to strings)
    if isinstance(npz_dir, (str, os.PathLike)) and npz_dir is not None:
        npz_dir = Path(npz_dir)
    if isinstance(npz_file, (str, os.PathLike)) and npz_file is not None:
        npz_file = Path(npz_file)
    if isinstance(out_root, (str, os.PathLike)):
        out_root = Path(out_root)

    logs_dir = out_root / "logs"
    reports_dir = out_root
    logger = setup_logger(logs_dir)
    records: List[Dict[str, Any]] = []

    # Select targets
    if npz_file:
        targets = [npz_file]
    elif npz_dir:
        if not npz_dir.exists():
            logger.error(f"Directory not found: {npz_dir}")
            return
        targets = sorted(npz_dir.glob("*.npz"))
    else:
        logger.error("Provide either npz_dir or npz_file")
        return

    if not targets:
        logger.warning(f"No .npz files found in {npz_dir if npz_dir else npz_file}")
        return

    logger.info(f"Found {len(targets)} target file(s)")

    for p in targets:
        flog = perfile_logger(logs_dir, p.stem)
        flog.info(f"Auditing {p.name}")
        try:
            rec = audit_npz(p, out_root, save_viz=make_videos)
            records.append(rec)

            verdict = []
            verdict.append("Transformer: OK" if rec["transformer_ready"] else f"Transformer: FAIL ({len(rec['transformer_issues'])} issues)")
            verdict.append("IV3-GRU: OK" if rec["iv3_ready"] else f"IV3-GRU: FAIL ({len(rec['iv3_issues'])} issues)")
            flog.info(" | ".join(verdict))

            flog.info(
                "coverage={:.1f}%  low_vis_frames={}  vel_p95={:.3f}  bone_cv%={:.1f}  X_out_of_range={}  X_has_nan={}  X_has_inf={}".format(
                    rec.get("coverage_pct", 0.0),
                    rec.get("low_vis_frames", 0),
                    rec.get("vel_p95", 0.0),
                    rec.get("bone_len_cv_pct", 0.0),
                    rec.get("X_out_of_range", 0),
                    rec.get("X_has_nan"),
                    rec.get("X_has_inf"),
                )
            )
        except Exception as e:
            flog.exception(f"Failed {p.name}: {e}")
            records.append(dict(file=str(p), error=str(e)))

    # Write JSONL and CSV
    reports_dir.mkdir(parents=True, exist_ok=True)
    jsonl = reports_dir / "npz_audit_report.jsonl"
    csv   = reports_dir / "npz_audit_report.csv"

    with open(jsonl, "w", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")

    import csv as _csv
    csv_fields = [
        "file","T","coverage_pct","low_vis_frames","vel_p95","vel_max",
        "vel_spike_frames","bone_len_cv_pct","bone_outlier_frames","X_out_of_range",
        "X_has_nan","X_has_inf","timestamps_monotonic","temporal_consistent",
        "transformer_ready","iv3_ready"
    ]
    with open(csv, "w", newline="", encoding="utf-8") as f:
        w = _csv.DictWriter(f, fieldnames=csv_fields)
        w.writeheader()
        for r in records:
            w.writerow({k: r.get(k) for k in csv_fields})

    # Markdown summary
    md = reports_dir / "npz_audit_summary.md"
    total = len(records)
    t_pass = sum(1 for r in records if r.get("transformer_ready"))
    i_pass = sum(1 for r in records if r.get("iv3_ready"))
    with open(md, "w", encoding="utf-8") as f:
        f.write(f"# NPZ Audit Summary ({datetime.now().isoformat(timespec='seconds')})\n\n")
        f.write(f"- Files audited: **{total}**\n")
        f.write(f"- Transformer ready: **{t_pass}/{total}**\n")
        f.write(f"- IV3-GRU ready: **{i_pass}/{total}**\n\n")
        f.write("## Top issues (counts)\n")
        issues = {}
        for r in records:
            for tag in r.get("transformer_issues", []) + r.get("iv3_issues", []) + r.get("legacy_issues", []):
                issues[tag] = issues.get(tag, 0) + 1
        for k, v in sorted(issues.items(), key=lambda kv: (-kv[1], kv[0]))[:20]:
            f.write(f"- {k}: {v}\n")

    # Logging is console-only, so there is no log directory to report.
    logger.info(f"Reports written:\n- {jsonl}\n- {csv}\n- {md}")


In [None]:
# --- Cell 8: Run Audit (Notebook-Friendly Defaults) ---

# Input folder of .npz files
NPZ_DIR = ROOT / "data" / "processed" / "keypoints_all"
# Output folder for reports (and animations, if enabled)
OUT_DIR = ROOT / "data" / "reports"

print("NPZ_DIR:", NPZ_DIR)
print("OUT_DIR:", OUT_DIR)

# Run on a folder (set make_videos=False if ffmpeg is not available)
run_audit(npz_dir=NPZ_DIR, npz_file=None, out_root=OUT_DIR, make_videos=False)

# Example: run on a single file
# run_audit(npz_dir=None, npz_file=NPZ_DIR / "15.npz", out_root=OUT_DIR, make_videos=True)
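
Cell 8 suggests turning videos off when ffmpeg is not available. One way to make that automatic, assuming ffmpeg would be found through `PATH`, is to probe for the executable with the standard library:

```python
import shutil

# Enable videos only when an ffmpeg executable is on PATH.
make_videos = shutil.which("ffmpeg") is not None
print("make_videos =", make_videos)
```

The resulting flag can then be passed as `run_audit(..., make_videos=make_videos)`.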
