# 🎬 Viral Short Video Pipeline — Production Grade v5.5 (Kaggle)
> **Role:** AI Production Engineer & Video Pipeline Architect (editorial-grade)  
> **Hard Locks:** ✅ 9:16 FIT + BLACK LETTERBOX • ✅ 1080×1920 • ✅ 30 FPS • ✅ Audio 160k • ✅ NO CROP/ZOOM • ✅ NO FACE • ✅ NO SUBTITLE/SRT

---

## 📌 Table of Contents

- 🧰 **Stage 0: Config & Environment**
- 📥 **Stage 1: Ingest Video**
- 🔊 **Stage 2: Audio Analysis (Global)**
- 🖼️ **Stage 3: Visual Sampling (Thumbnails Only)**
- 🎞️ **Stage 4: Shot Detection (Optional)**
- 🚫 **Stage 5: No Face Processing (Hard Lock)**
- 🧩 **Stage 6: Segment Proposal (Candidate Mining)**
- 🧪 **Stage 7: Feature Extraction**
- 🗣️ **Stage 8: ASR (QUALITY Mode + Cache + Heartbeat)**
- 🧮 **Stage 9: Scoring (Audit-able, Percentile-Based)**
- 🧭 **Stage 10: Deterministic Selection + Diversity + Timeline Sanity**
- ✂️ **Stage 11: Cut Rules (Snap to Word/Silence + Avoid Shot-Cut)**
- 📦 **Stage 12: Export (9:16 FIT + Letterbox)**
- 🧾 **Stage 13: Manifest (JSON/CSV + Caption/Hashtag)**
- ✅ **Stage 14: Acceptance Tests & Summary**

---

## 📂 Locked I/O Contract

### Input (LOCKED)
- Video MP4/MKV: `/kaggle/input/<dataset>/*`

### Output (LOCKED)
- `/kaggle/working/outputs/clips/*.mp4`  
- `/kaggle/working/outputs/thumbnails/*.jpg`  
- `/kaggle/working/manifest.json`  
- `/kaggle/working/manifest.csv`

### Run Folder (LOCKED)
```
/kaggle/working/
  outputs/
    clips/
    thumbnails/
  runs/
    run_<timestamp>/
      cache/
      logs/
      artifacts/
        audio.wav
        silence_segments.json
        speech_blocks.json
        transcript.json
        candidates.json
        selected.json
      manifest.json
      manifest.csv
```


In [1]:

# =========================
# Stage 0: Config & Env
# =========================

import os, sys, json, math, time, re, csv, subprocess, logging
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Dict, Any, Tuple, Optional

# ---------- Hard Locks ----------
# Policy flags for the pipeline. In the code visible here they are only
# reported in the startup log line, not enforced programmatically.
LOCK_NO_FACE = True
LOCK_NO_CROP_ZOOM = True
LOCK_NO_SUBTITLES = True

# ---------- Export Contract ----------
# Target frame size (pixels), frame rate and audio bitrate for exported clips.
EXPORT_W = 1080
EXPORT_H = 1920
EXPORT_FPS = 30
AUDIO_BITRATE = "160k"
EXPORT_MODE = "FIT_PAD_BLACK"  # FIT + BLACK LETTERBOX

# ---------- Clip Contract ----------
# Allowed clip duration range (seconds) and the length of the opening "hook"
# window used by feature extraction.
MIN_CLIP_SEC = 18.0
MAX_CLIP_SEC = 60.0
HOOK_WINDOW_SEC = 5.0

# ---------- Diversity / Selection ----------
MAX_FINAL_CLIPS = 6
MIN_GAP_SEC = 30.0
SEGMENT_DURATION_SEC = 600.0  # 10 minutes
MAX_PER_SEGMENT = 2

# ---------- ASR ----------
ASR_LANGUAGE = "id"
ASR_ENABLED = True
ASR_TOP_PERCENT = 0.40  # transcribe top 40% for scoring; selected-only ASR for snapping if needed
# Maximum audio length per ASR chunk and overlap between consecutive chunks
# (both are the defaults of split_ranges in Stage 8).
MAX_ASR_BLOCK_SEC = 28.0
MAX_ASR_BLOCK_WALL_SEC = 45.0
ASR_BLOCK_OVERLAP_SEC = 0.25

# ---------- Trigger words (markers) ----------
# Indonesian interjections / reaction words counted as markers.
TRIGGER_WORDS = [
    "anjir","anjay","gila","serius","beneran","parah","lucuu","ngakak","ketawa",
    "kok","loh","hah","apaan","buset","astaga","waduh","wkwk","wkwkwk","yaampun",
    "kaget","plot","twist","tapi","ternyata","eh","coba","sumpah"
]

# ---------- Tool Binaries ----------
# Resolved through PATH when passed to subprocess.
FFMPEG_BIN = "ffmpeg"
FFPROBE_BIN = "ffprobe"

# ---------- Run folder (timezone-aware UTC) ----------
# Each execution of this cell gets its own run_<timestamp> directory.
RUN_TIMESTAMP = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
WORKDIR = Path("/kaggle/working")
RUN_DIR = WORKDIR / "runs" / f"run_{RUN_TIMESTAMP}"
CACHE_DIR = RUN_DIR / "cache"
LOG_DIR = RUN_DIR / "logs"
ART_DIR = RUN_DIR / "artifacts"

# LOCKED: per-run outputs (user request)
OUT_DIR = RUN_DIR  # keep run artifacts self-contained
CLIPS_DIR = OUT_DIR / "clips"
THUMBS_DIR = OUT_DIR / "thumbnails"

# Optional convenience mirror (not required for correctness)
PUBLIC_OUT_DIR = WORKDIR / "outputs"
PUBLIC_CLIPS_DIR = PUBLIC_OUT_DIR / "clips"
PUBLIC_THUMBS_DIR = PUBLIC_OUT_DIR / "thumbnails"

# Create every directory up front so later stages can write without checks.
for p in [CACHE_DIR, LOG_DIR, ART_DIR, CLIPS_DIR, THUMBS_DIR, PUBLIC_CLIPS_DIR, PUBLIC_THUMBS_DIR]:
    p.mkdir(parents=True, exist_ok=True)

# ---------- Logging ----------
LOG_FILE = LOG_DIR / "pipeline.log"

logger = logging.getLogger("viralshort")
logger.setLevel(logging.INFO)
# Drop handlers left over from a previous execution of this cell so that
# messages are not emitted twice.
logger.handlers.clear()

fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")

# File handler: persistent log inside the run folder.
fh = logging.FileHandler(LOG_FILE, encoding="utf-8")
fh.setLevel(logging.INFO)
fh.setFormatter(fmt)

# Stream handler: mirror the same lines to stdout (notebook cell output).
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.INFO)
sh.setFormatter(fmt)

logger.addHandler(fh)
logger.addHandler(sh)

def log_flush():
    """Best-effort flush of every handler on the pipeline logger, then stdout.

    Flush errors are deliberately ignored so that logging housekeeping can
    never abort a pipeline stage.
    """
    for handler in logger.handlers:
        try:
            handler.flush()
        except Exception:
            continue
    try:
        sys.stdout.flush()
    except Exception:
        return

class StageTimer:
    """Context manager that logs START / END / FAIL lines for a pipeline stage.

    Attributes:
        stage_id: Numeric stage identifier, rendered zero-padded to two digits.
        name: Human-readable stage name used in the log lines.
        t0: Monotonic timestamp taken on entry (None before entry).
        dt: Elapsed seconds computed on exit (None before exit).
    """

    def __init__(self, stage_id: int, name: str):
        self.stage_id = stage_id
        self.name = name
        self.t0 = None
        self.dt = None

    def __enter__(self):
        # Monotonic clock: unaffected by wall-clock adjustments.
        self.t0 = time.monotonic()
        tag = f"[STAGE {self.stage_id:02d}]"
        logger.info(f"{tag} START - {self.name}")
        log_flush()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.dt = time.monotonic() - self.t0
        tag = f"[STAGE {self.stage_id:02d}]"
        elapsed = f"({self.dt:.2f}s)"
        if not exc:
            logger.info(f"{tag} END   - {self.name} {elapsed}")
        else:
            logger.error(f"{tag} FAIL  - {self.name} {elapsed}: {exc}")
        log_flush()
        # Returning False lets any exception propagate to the caller.
        return False

def write_json(path: Path, obj: Any):
    """Serialize obj as pretty-printed UTF-8 JSON at path and log the location.

    Missing parent directories are created first. Non-ASCII characters are
    written verbatim (ensure_ascii=False).
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(obj, handle, ensure_ascii=False, indent=2)
    logger.info(f"Wrote: {path}")

def read_json(path: Path) -> Any:
    """Return the parsed JSON document stored at path (read as UTF-8)."""
    with open(path, mode="r", encoding="utf-8") as handle:
        text = handle.read()
    return json.loads(text)

# Record the run folder and the hard-lock flags at startup, then flush so the
# lines appear immediately in both the log file and the cell output.
logger.info(f"RUN_DIR: {RUN_DIR}")
logger.info("Hard Locks: NO_FACE=%s NO_CROP_ZOOM=%s NO_SUBTITLES=%s", LOCK_NO_FACE, LOCK_NO_CROP_ZOOM, LOCK_NO_SUBTITLES)
log_flush()


2026-01-31 02:25:09,270 | INFO | RUN_DIR: \kaggle\working\runs\run_20260130_192509
2026-01-31 02:25:09,271 | INFO | Hard Locks: NO_FACE=True NO_CROP_ZOOM=True NO_SUBTITLES=True


## 📥 Stage 1: Ingest Video

In [2]:

# =========================
# Stage 1: Ingest Video
# =========================
from glob import glob

with StageTimer(1, "Ingest Video"):
    input_root = Path("/kaggle/input")
    if not input_root.exists():
        raise RuntimeError("Missing /kaggle/input (this notebook is for Kaggle).")

    # Pick the first supported video file in sorted path order (deterministic).
    # Matching is case-insensitive so that e.g. "CLIP.MP4" is found too, and
    # directories that merely carry a video-like suffix are ignored.
    video_exts = {".mp4", ".mkv", ".mov", ".webm", ".m4v"}
    candidates = [
        p for p in input_root.rglob("*")
        if p.is_file() and p.suffix.lower() in video_exts
    ]
    if not candidates:
        raise RuntimeError("No video found under /kaggle/input/<dataset>/*")
    VIDEO_PATH = sorted(candidates)[0]
    logger.info(f"Selected video: {VIDEO_PATH}")

    # Probe container duration and the codec type of every stream.
    cmd = [
        FFPROBE_BIN, "-v", "error", "-show_entries",
        "format=duration:stream=codec_type", "-of", "json", str(VIDEO_PATH)
    ]
    meta = json.loads(subprocess.check_output(cmd).decode("utf-8"))

    # ffprobe omits "duration" when it cannot determine it; fail with a clear
    # message instead of a bare KeyError / ValueError.
    raw_duration = (meta.get("format") or {}).get("duration")
    try:
        duration = float(raw_duration)
    except (TypeError, ValueError) as err:
        raise RuntimeError(
            f"ffprobe reported no usable duration for {VIDEO_PATH}: {raw_duration!r}"
        ) from err
    if not math.isfinite(duration) or duration <= 0.0:
        raise RuntimeError(f"Invalid video duration for {VIDEO_PATH}: {duration}")
    ANALYZED_DURATION = duration
    logger.info(f"Duration: {ANALYZED_DURATION:.2f}s")

    # Stage 2 extracts and analyses the audio track, so a file without audio
    # is rejected here rather than failing later inside the ffmpeg call.
    stream_types = [st.get("codec_type") for st in (meta.get("streams") or [])]
    HAS_AUDIO = "audio" in stream_types
    if not HAS_AUDIO:
        raise RuntimeError(f"No audio stream found in {VIDEO_PATH}; audio analysis requires one.")

    # Persist the probe result for auditing.
    write_json(
        ART_DIR / "video_meta.json",
        {
            "video_path": str(VIDEO_PATH),
            "duration_sec": ANALYZED_DURATION,
            "run_timestamp_utc": RUN_TIMESTAMP,
            "has_audio": HAS_AUDIO,
        },
    )


2026-01-31 02:25:09,289 | INFO | [STAGE 01] START - Ingest Video
2026-01-31 02:25:09,292 | ERROR | [STAGE 01] FAIL  - Ingest Video (0.00s): Missing /kaggle/input (this notebook is for Kaggle).


RuntimeError: Missing /kaggle/input (this notebook is for Kaggle).

## 🔊 Stage 2: Audio Analysis (Global)

In [None]:
# =========================
# Stage 2: Audio Analysis (Global)
# =========================
import numpy as np
import re

# Timestamp as it can appear in the silencedetect log: optional sign,
# optional fraction, optional exponent (e.g. "-0.00133", "12", "1.2e-05").
_SILENCE_FLOAT = r"(-?[0-9]+(?:\.[0-9]*)?(?:[eE][-+]?[0-9]+)?)"
_SILENCE_START_RE = re.compile(r"silence_start:\s*" + _SILENCE_FLOAT)
_SILENCE_END_RE = re.compile(r"silence_end:\s*" + _SILENCE_FLOAT)


def _parse_silencedetect_log(log_text):
    """Extract [start, end] silence pairs from ffmpeg silencedetect log text.

    Start timestamps are clamped to 0.0 because a leading silence can be
    reported with a slightly negative start. Each start is paired with the
    first not-yet-used end that is >= that start; a start without a matching
    end is dropped.
    """
    starts, ends = [], []
    for line in log_text.splitlines():
        m = _SILENCE_START_RE.search(line)
        if m:
            starts.append(max(0.0, float(m.group(1))))
        m = _SILENCE_END_RE.search(line)
        if m:
            ends.append(float(m.group(1)))

    segments = []
    j = 0
    for s in starts:
        # Skip ends that belong to an earlier (unparsed or already used) start.
        while j < len(ends) and ends[j] < s:
            j += 1
        if j < len(ends):
            segments.append([s, ends[j]])
            j += 1
    return segments


with StageTimer(2, "Audio Extract + Global Analysis"):

    AUDIO_WAV = str(ART_DIR / "audio.wav")

    # Extract mono 16k wav for analysis/ASR
    cmd = [
        FFMPEG_BIN, "-y", "-hide_banner", "-loglevel", "error",
        "-i", str(VIDEO_PATH),
        "-vn", "-ac", "1", "-ar", "16000", "-f", "wav", AUDIO_WAV
    ]
    subprocess.run(cmd, check=True)
    logger.info(f"Audio extracted: {AUDIO_WAV}")

    # -------------------------
    # Silence detect (ffmpeg) - cached
    # -------------------------
    sil_path = ART_DIR / "silence_segments.json"
    if sil_path.exists():
        SILENCE_SEGMENTS = read_json(sil_path)
        logger.info("Loaded cached silence_segments.json")
    else:
        # silencedetect reports through the log (stderr), so loglevel must
        # stay at "info" for the lines to be emitted.
        silence_cmd = [
            FFMPEG_BIN, "-hide_banner", "-loglevel", "info",
            "-i", AUDIO_WAV,
            "-af", "silencedetect=noise=-35dB:d=0.35",
            "-f", "null", "-"
        ]
        proc = subprocess.run(
            silence_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )

        if proc.returncode != 0:
            logger.warning("silencedetect failed; continuing with empty silence list")
            SILENCE_SEGMENTS = []
        else:
            SILENCE_SEGMENTS = _parse_silencedetect_log(proc.stderr)

        write_json(sil_path, SILENCE_SEGMENTS)
        logger.info("Wrote silence_segments.json")

    logger.info(f"Silence segments: {len(SILENCE_SEGMENTS)}")

    # -------------------------
    # Energy curve (RMS timeline) - always written
    # -------------------------
    ENERGY_CURVE = []  # required: global variable used by later stages (Stage 6 fallback)

    try:
        import soundfile as sf
        y, sr = sf.read(AUDIO_WAV)
    except Exception as e:
        logger.warning(f"soundfile read failed; falling back to scipy.io.wavfile ({e})")
        from scipy.io import wavfile
        sr, y = wavfile.read(AUDIO_WAV)

        # convert int PCM to float32
        if hasattr(y, "dtype") and y.dtype != np.float32:
            if np.issubdtype(y.dtype, np.integer):
                y = y.astype(np.float32) / float(np.iinfo(y.dtype).max)
            else:
                y = y.astype(np.float32)

    # Down-mix to mono if the reader returned multiple channels.
    if y.ndim > 1:
        y = y.mean(axis=1)

    # Guard against a zero step/window for degenerate sample rates.
    hop = max(1, int(0.02 * sr))  # 20ms
    win = max(1, int(0.04 * sr))  # 40ms

    # "+ 1" so the last full window (starting at len(y) - win) is included.
    for i in range(0, len(y) - win + 1, hop):
        frame = y[i:i + win]
        val = float(np.sqrt(np.mean(frame * frame)) + 1e-12)
        t = float(i / sr)
        ENERGY_CURVE.append({"time": t, "rms": val})

    # write to artifacts for audit + Stage 6 load
    write_json(ART_DIR / "energy_curve.json", ENERGY_CURVE)
    logger.info(f"Energy points: {len(ENERGY_CURVE)} (saved to energy_curve.json)")


2026-01-30 12:04:23,520 | INFO | [STAGE 02] START - Audio Extract + Global Analysis
2026-01-30 12:04:34,253 | INFO | Audio extracted: /kaggle/working/runs/run_20260130_120422/artifacts/audio.wav
2026-01-30 12:04:35,249 | INFO | Wrote: /kaggle/working/runs/run_20260130_120422/artifacts/silence_segments.json
2026-01-30 12:04:35,250 | INFO | Wrote silence_segments.json
2026-01-30 12:04:35,250 | INFO | Silence segments: 435
2026-01-30 12:04:39,753 | INFO | Wrote: /kaggle/working/runs/run_20260130_120422/artifacts/energy_curve.json
2026-01-30 12:04:39,754 | INFO | Energy points: 232359 (saved to energy_curve.json)
2026-01-30 12:04:39,755 | INFO | [STAGE 02] END   - Audio Extract + Global Analysis (16.24s)


In [None]:
# =========================
# Stage 2.5: Speech Blocks (silence complement + VAD fallback)
# =========================
import numpy as np


def _speech_blocks_from_silence(total_dur, silence_segments, pad=0.05):
    sil = sorted([[max(0.0, s - pad), min(total_dur, e + pad)] for s, e in (silence_segments or [])], key=lambda x: x[0])
    out = []
    cur = 0.0
    for s, e in sil:
        if s > cur:
            out.append([cur, s])
        cur = max(cur, e)
    if cur < total_dur:
        out.append([cur, total_dur])
    # remove tiny blocks
    out = [[s, e] for s, e in out if (e - s) >= 0.6]
    return out


def _speech_blocks_from_energy(energy_curve, total_dur, thr_pct=60, min_len=0.8, max_gap=0.35):
    if not energy_curve:
        return []
    vals = [float(p.get("rms", 0.0)) for p in energy_curve]
    if not vals:
        return []
    thr = float(np.percentile(vals, thr_pct))
    blocks = []
    in_speech = False
    s = None
    last_t = None
    for p in energy_curve:
        t = float(p.get("time", 0.0))
        rms = float(p.get("rms", 0.0))
        if rms >= thr:
            if not in_speech:
                in_speech = True
                s = t
            last_t = t
        else:
            if in_speech and last_t is not None:
                if (t - last_t) > max_gap:
                    e = last_t
                    if (e - s) >= min_len:
                        blocks.append([s, min(e, total_dur)])
                    in_speech = False
                    s = None
                    last_t = None
    if in_speech and s is not None and last_t is not None:
        if (last_t - s) >= min_len:
            blocks.append([s, min(last_t, total_dur)])
    return blocks


with StageTimer(2, "Speech Blocks (silence complement + VAD fallback)"):
    total = float(ANALYZED_DURATION)

    # Primary source: everything that is not (padded) silence.
    speech_blocks = _speech_blocks_from_silence(total, SILENCE_SEGMENTS, pad=0.05)

    # Fallback: threshold the RMS curve when the silence complement is empty.
    if len(speech_blocks) == 0:
        logger.warning("No speech blocks from silence; using energy VAD fallback")
        speech_blocks = _speech_blocks_from_energy(ENERGY_CURVE, total)

    SPEECH_BLOCKS = speech_blocks
    write_json(ART_DIR / "speech_blocks.json", SPEECH_BLOCKS)
    logger.info(f"Speech blocks: {len(SPEECH_BLOCKS)}")

    # Later stages need at least one block; stop here if both sources failed.
    if len(SPEECH_BLOCKS) == 0:
        raise RuntimeError("SPEECH_BLOCKS missing/empty without fallback")


## 🖼️ Stage 3: Visual Sampling (Thumbnails Only)

In [None]:

# =========================
# Stage 3: Visual Sampling (Thumbnails Only)
# =========================
import random

with StageTimer(3, "Thumbnails Sampling (No AI)"):
    # Fixed seed; the timestamps below are computed arithmetically, so the
    # sampling is deterministic either way.
    random.seed(1337)

    # Between 8 and 24 thumbnails, roughly one per 30 seconds of video.
    n_thumbs = min(24, max(8, int(ANALYZED_DURATION // 30)))

    # Evenly spaced interior timestamps, capped just before the end of the
    # video; the set removes duplicates created by the cap.
    stamp_set = set()
    for i in range(n_thumbs):
        stamp_set.add(min(ANALYZED_DURATION-0.1, (i+1)*ANALYZED_DURATION/(n_thumbs+1)))
    ts = sorted(stamp_set)

    thumb_paths = []
    for idx, stamp in enumerate(ts, start=1):
        outp = THUMBS_DIR / f"sample_{idx:02d}_{stamp:.2f}.jpg"
        # Seek before the input, then grab a single high-quality frame.
        cmd = [
            FFMPEG_BIN, "-y", "-hide_banner", "-loglevel", "error",
            "-ss", str(stamp), "-i", str(VIDEO_PATH),
            "-vframes", "1", "-q:v", "2", str(outp)
        ]
        subprocess.run(cmd, check=True)
        thumb_paths.append(str(outp))

    write_json(ART_DIR / "thumbnail_samples.json", {"timestamps": ts, "paths": thumb_paths})
    logger.info(f"Sample thumbnails: {len(thumb_paths)}")


2026-01-30 12:04:39,766 | INFO | [STAGE 03] START - Thumbnails Sampling (No AI)
2026-01-30 12:04:50,340 | INFO | Wrote: /kaggle/working/runs/run_20260130_120422/artifacts/thumbnail_samples.json
2026-01-30 12:04:50,341 | INFO | Sample thumbnails: 24
2026-01-30 12:04:50,342 | INFO | [STAGE 03] END   - Thumbnails Sampling (No AI) (10.58s)


## 🎞️ Stage 4: Shot Detection (Optional)

In [None]:

# =========================
# Stage 4: Shot Detection (Optional)
# =========================
with StageTimer(4, "Shot Detection (Optional)"):
    SHOT_CUTS = []
    shot_path = ART_DIR / "shot_cuts.json"

    if not shot_path.exists():
        # No cached result: try PySceneDetect, but treat it as optional.
        try:
            from scenedetect import open_video, SceneManager
            from scenedetect.detectors import ContentDetector

            video = open_video(str(VIDEO_PATH))
            scene_manager = SceneManager()
            scene_manager.add_detector(ContentDetector(threshold=27.0))
            scene_manager.detect_scenes(video, show_progress=False)
            scene_list = scene_manager.get_scene_list()

            # A cut is the start of every scene except the first one.
            for scene_start, _scene_end in scene_list[1:]:
                SHOT_CUTS.append(float(scene_start.get_seconds()))

            write_json(shot_path, SHOT_CUTS)
            logger.info(f"Shot cuts: {len(SHOT_CUTS)}")
        except Exception as e:
            # Missing library or detection failure: continue without cuts.
            logger.warning(f"Shot detection unavailable; continuing without. ({e})")
            SHOT_CUTS = []
    else:
        SHOT_CUTS = read_json(shot_path)
        logger.info("Loaded cached shot cuts")


2026-01-30 12:04:50,351 | INFO | [STAGE 04] START - Shot Detection (Optional)
2026-01-30 12:04:50,356 | INFO | [STAGE 04] END   - Shot Detection (Optional) (0.00s)


## 🚫 Stage 5: No Face Processing (Hard Lock)

In [None]:

# =========================
# Stage 5: No Face Processing (Hard Lock)
# =========================
with StageTimer(5, "Hard Lock: NO FACE / NO CROP / NO ZOOM"):
    # Tokens that must never appear in this pipeline (face libraries and
    # crop/zoom filters). The list is declarative only: nothing in this stage
    # scans files or inspects imports against it.
    banned_tokens = [
        "face_recognition",
        "mediapipe",
        "dlib",
        "mtcnn",
        "retinaface",
        "insightface",
        "face",
        "facenet",
        "tracking",
        "reframe",
        "zoompan",
        "crop=",
        "scale=ih*9/16",
        "cropdetect",
    ]

    # Informational audit lines only.
    for notice in (
        "‚úÖ This pipeline does not perform face detection/tracking/crop/zoom. Export uses FIT+PAD only.",
        "‚úÖ This pipeline does not generate SRT/subtitles or burn-in captions.",
    ):
        logger.info(notice)


2026-01-30 12:04:50,383 | INFO | [STAGE 05] START - Hard Lock: NO FACE / NO CROP / NO ZOOM
2026-01-30 12:04:50,386 | INFO | ‚úÖ This pipeline does not perform face detection/tracking/crop/zoom. Export uses FIT+PAD only.
2026-01-30 12:04:50,387 | INFO | ‚úÖ This pipeline does not generate SRT/subtitles or burn-in captions.
2026-01-30 12:04:50,388 | INFO | [STAGE 05] END   - Hard Lock: NO FACE / NO CROP / NO ZOOM (0.00s)


## 🧩 Stage 6: Segment Proposal (Candidate Mining)

In [None]:
# =========================
# Stage 6: Segment Proposal (Candidate Mining) ‚Äî FINAL (Flexible Duration End-Seeking)
# =========================
import numpy as np

def complement_intervals(total_end: float, silence_segments: List[List[float]], pad: float=0.0) -> List[List[float]]:
    """Return the parts of [0, total_end] not covered by the given segments.

    Every segment is first widened by pad on both sides and clamped to the
    timeline. The result is a list of [start, end] gaps in ascending order.
    """
    widened = sorted(
        ([max(0.0, s - pad), min(total_end, e + pad)] for s, e in (silence_segments or [])),
        key=lambda iv: iv[0],
    )

    gaps = []
    cursor = 0.0
    for lo, hi in widened:
        if lo > cursor:
            gaps.append([cursor, lo])
        # Advance past this segment (segments may overlap or nest).
        cursor = max(cursor, hi)
    if cursor < total_end:
        gaps.append([cursor, total_end])
    return gaps

def get_peak_time(energy_curve, start, end):
    """Return (time, rms) of the loudest point with start <= time <= end.

    The first point wins on ties. Returns (None, None) when no point falls
    inside the window.
    """
    in_window = []
    for point in energy_curve:
        if start <= point["time"] <= end:
            in_window.append(point)
    if len(in_window) == 0:
        return None, None

    loudest = max(in_window, key=lambda point: point["rms"])
    peak_time = float(loudest["time"])
    peak_rms = float(loudest["rms"])
    return peak_time, peak_rms

# -------------------------------------------------
# FLEXIBLE DURATION (OPTIONAL) - end seeking
# -------------------------------------------------
# All values below can be overridden through environment variables.
# Flexible end-seeking is on unless USE_FLEXIBLE_DUR is set to something
# other than "1".
USE_FLEXIBLE_DUR = os.getenv("USE_FLEXIBLE_DUR", "1") == "1"

# Hard duration bounds; default to the clip contract from Stage 0.
MIN_DUR_SEC = float(os.getenv("MIN_DUR_SEC", str(MIN_CLIP_SEC)))
MAX_DUR_SEC = float(os.getenv("MAX_DUR_SEC", str(MAX_CLIP_SEC)))

# The ideal range is only a preference (not mandatory).
IDEAL_DUR_MIN = float(os.getenv("IDEAL_DUR_MIN", "25"))
IDEAL_DUR_MAX = float(os.getenv("IDEAL_DUR_MAX", "45"))

# Search window for a natural end point around the target / hard cap.
END_LOOKBACK  = float(os.getenv("END_LOOKBACK", "3.0"))
END_LOOKAHEAD = float(os.getenv("END_LOOKAHEAD", "2.0"))

def _best_silence_start_in_window(lo: float, hi: float, target: float, silence_segments: List[List[float]]):
    best = None
    best_d = None
    for s, e in (silence_segments or []):
        s = float(s)
        if lo <= s <= hi:
            d = abs(s - target)
            if best_d is None or d < best_d:
                best_d = d
                best = s
    return best

def _best_block_end_in_window(lo: float, hi: float, target: float, speech_blocks: List[List[float]]):
    best = None
    best_d = None
    for s, e in (speech_blocks or []):
        e = float(e)
        if lo <= e <= hi:
            d = abs(e - target)
            if best_d is None or d < best_d:
                best_d = d
                best = e
    return best

def pick_flexible_end(start: float, hard_end: float,
                      silence_segments: List[List[float]],
                      speech_blocks: List[List[float]]):
    """
    Choose a natural clip end between the minimum duration and hard_end.

    Returns an (end_time, reason) tuple:
    - "end:video_short":  hard_end is at (or within 0.05s of) the minimum end,
      so there is nothing to search; hard_end is returned.
    - "end:silence_seek": a silence start near the preferred target was found.
    - "end:block_seek":   otherwise, a speech-block end near the target.
    - "end:hardcap":      no natural end point in the window; hard_end is used.

    The target is the midpoint of the ideal duration range (a preference
    only), capped at hard_end.
    """
    start = float(start)
    hard_end = float(hard_end)

    earliest_end = min(start + MIN_DUR_SEC, hard_end)
    if hard_end <= earliest_end + 0.05:
        return hard_end, "end:video_short"

    # Preferred end: middle of the ideal range, never past hard_end.
    ideal_mid = (IDEAL_DUR_MIN + IDEAL_DUR_MAX) * 0.5
    target = min(start + ideal_mid, hard_end)

    # Search window around the target.
    win_lo = max(earliest_end, target - END_LOOKBACK)
    win_hi = min(hard_end, target + END_LOOKAHEAD)

    # Try the end-point sources in priority order; the first hit wins.
    seekers = (
        (_best_silence_start_in_window, silence_segments, "end:silence_seek"),
        (_best_block_end_in_window, speech_blocks, "end:block_seek"),
    )
    for finder, segments, reason in seekers:
        hit = finder(win_lo, win_hi, target, segments)
        if hit is not None:
            return float(max(hit, earliest_end)), reason

    # Last resort: the hard cap.
    return hard_end, "end:hardcap"

# =====================================================
# RMS PEAK THRESHOLD SAFE-INIT (ANTI NameError)
# =====================================================
def _compute_rms_percentile(energy_curve, q=97):
    if not energy_curve:
        return None
    vals = []
    for p in energy_curve:
        if isinstance(p, dict) and "rms" in p:
            try:
                vals.append(float(p["rms"]))
            except Exception:
                pass
    if not vals:
        return None
    try:
        return float(np.percentile(vals, q))
    except Exception:
        return None

# Resolve the peak threshold safely: prefer a pre-existing global P97 when it
# converts to float, otherwise derive the 97th percentile from ENERGY_CURVE.
P97_VAL = None
if "P97" in globals():
    try:
        P97_VAL = float(P97)
    except Exception:
        P97_VAL = None
if P97_VAL is None:
    P97_VAL = _compute_rms_percentile(ENERGY_CURVE, q=97)

# Peak-centric mining is only possible when a threshold could be resolved.
ENABLE_PEAK_WINDOWS = P97_VAL is not None

with StageTimer(6, "Segment Proposal (Candidate Mining)"):

    # Candidate dicts collected from both mining strategies; cid is a running
    # counter used to build unique "cand_NNNN" ids.
    CANDIDATES = []
    cid = 0

    # =====================================================
    # A) SPEECH-BLOCK WINDOWS (sliding)
    # =====================================================
    speech_blocks = SPEECH_BLOCKS if "SPEECH_BLOCKS" in globals() else []
    for bs, be in speech_blocks:
        bs = float(bs); be = float(be)
        dur = be - bs
        # Blocks shorter than the minimum clip length cannot yield a clip.
        if dur < MIN_CLIP_SEC:
            continue

        # Base window: clamped to MAX, but the actual end may still move via
        # flexible end-seeking below.
        win = min(MAX_DUR_SEC, max(MIN_DUR_SEC, dur))
        step = 6.0
        t = bs
        # Slide the window start through the block in fixed 6-second steps
        # while a minimum-length clip still fits before the block end.
        while t + MIN_CLIP_SEC <= be:
            hard_end = min(be, t + win)

            if USE_FLEXIBLE_DUR:
                end, end_reason = pick_flexible_end(
                    start=t,
                    hard_end=hard_end,
                    silence_segments=SILENCE_SEGMENTS if "SILENCE_SEGMENTS" in globals() else [],
                    speech_blocks=speech_blocks
                )
            else:
                end, end_reason = hard_end, "end:fixed"

            if (end - t) >= MIN_CLIP_SEC:
                cid += 1
                CANDIDATES.append({
                    "id": f"cand_{cid:04d}",
                    "type": "speech_block",
                    "start": float(t),
                    "end": float(end),
                    "duration": float(end - t),
                    "end_reason": end_reason,   # audit-friendly (non-core)
                })
            t += step

    # =====================================================
    # B) PEAK-CENTRIC WINDOWS (UPGRADE)
    # =====================================================
    if not ENABLE_PEAK_WINDOWS:
        logger.warning("[STAGE 06] Peak detection disabled (P97 unavailable) ‚Äî skipping peak-centric mining")
    else:
        # Points at or above the P97 threshold, thinned by a stride so that
        # roughly 200 of them remain.
        peaks = [p for p in ENERGY_CURVE if float(p["rms"]) >= float(P97_VAL)]
        peaks = peaks[::max(1, len(peaks) // 200)] if peaks else []

        for p in peaks:
            peak_time = float(p["time"])
            # Window layout (seconds): 40s window with the seed peak placed
            # 15s after the window start.
            ideal = 40.0
            setup = 6.0
            target_peak_offset = 15.0
            min_peak_offset = 8.0
            max_peak_offset = 25.0

            wstart = peak_time - target_peak_offset
            wend_hard = wstart + ideal

            # clamp bounds
            wstart = max(0.0, float(wstart))
            wend_hard = min(float(ANALYZED_DURATION), float(wend_hard))

            if wend_hard - wstart < MIN_CLIP_SEC:
                continue

            # Loudest point inside the clamped window; it may differ from the
            # seed peak that the window was built around.
            peak_abs, _ = get_peak_time(ENERGY_CURVE, wstart, wend_hard)
            if peak_abs is None:
                continue

            # Keep the window only when its loudest point sits 8-25s after
            # the start and leaves room for the setup phase.
            peak_offset = peak_abs - wstart
            if not (min_peak_offset <= peak_offset <= max_peak_offset):
                continue
            if peak_offset < (setup + 1.0):
                continue

            # flexible end seeking for peak windows too
            hard_end = min(wstart + MAX_DUR_SEC, wend_hard)
            if USE_FLEXIBLE_DUR:
                wend, end_reason = pick_flexible_end(
                    start=wstart,
                    hard_end=hard_end,
                    silence_segments=SILENCE_SEGMENTS if "SILENCE_SEGMENTS" in globals() else [],
                    speech_blocks=speech_blocks
                )
            else:
                wend, end_reason = hard_end, "end:fixed"

            if wend - wstart < MIN_CLIP_SEC:
                continue

            cid += 1
            CANDIDATES.append({
                "id": f"cand_{cid:04d}",
                "type": "peak",
                "start": float(wstart),
                "end": float(wend),
                "duration": float(wend - wstart),
                "peak_time_abs": float(peak_abs),
                "peak_offset_in_clip": float(peak_offset),
                "end_reason": end_reason,  # audit-friendly (non-core)
            })

    # =====================================================
    # FINAL CLAMP & SAVE
    # =====================================================
    # Drop candidates below the minimum length and truncate any that exceed
    # the maximum, recording the truncation in end_reason.
    final = []
    for c in CANDIDATES:
        d = float(c["end"] - c["start"])
        if d < MIN_CLIP_SEC:
            continue
        if d > MAX_DUR_SEC:
            c["end"] = float(c["start"] + MAX_DUR_SEC)
            c["duration"] = float(c["end"] - c["start"])
            c["end_reason"] = (c.get("end_reason","") + "|clamp:max").strip("|")
        final.append(c)

    # Stable ordering: by start, then end, then id.
    CANDIDATES = sorted(final, key=lambda x: (x["start"], x["end"], x["id"]))
    write_json(ART_DIR / "candidates.json", CANDIDATES)

    logger.info(f"Candidates: {len(CANDIDATES)}")
    log_flush()


2026-01-30 12:04:50,422 | INFO | [STAGE 06] START - Segment Proposal (Candidate Mining)
2026-01-30 12:04:50,425 | ERROR | [STAGE 06] FAIL  - Segment Proposal (Candidate Mining) (0.00s): name 'P97' is not defined


NameError: name 'P97' is not defined

## 🧪 Stage 7: Feature Extraction

In [None]:

# =========================
# Stage 7: Feature Extraction
# =========================
import numpy as np
def percentile_value(sorted_vals, q):
    """Return the nearest-rank value at fraction q (0..1) of an ascending list.

    An empty list yields 0.0; q <= 0 yields the first element and q >= 1 the
    last. In between, the index is round(q * (n - 1)) clamped to the list.
    """
    if not sorted_vals:
        return 0.0

    if q <= 0:
        pick = 0
    elif q >= 1:
        pick = -1
    else:
        last = len(sorted_vals) - 1
        pick = int(round(q * last))
        pick = max(0, min(last, pick))
    return float(sorted_vals[pick])

def window_points(energy_curve, start, end):
    """Return the curve points whose "time" lies in [start, end], in order."""
    selected = []
    for point in energy_curve:
        if start <= point["time"] <= end:
            selected.append(point)
    return selected

def clamp01(x):
    """Convert x to float and clamp it to the closed interval [0.0, 1.0]."""
    value = float(x)
    capped = value if value < 1.0 else 1.0
    return capped if capped > 0.0 else 0.0

def compute_features(candidate: Dict[str,Any]) -> Dict[str,Any]:
    """Compute audio-energy and timing features for one candidate clip.

    Reads the module-level ENERGY_CURVE, SILENCE_SEGMENTS, SHOT_CUTS and
    HOOK_WINDOW_SEC.

    Args:
        candidate: Dict with numeric "start" and "end" (seconds).

    Returns:
        Dict of features: energy statistics over the clip, hook / early / late
        energy, peak time and offset, spike rate, silence ratio, distance from
        the clip boundaries to the nearest shot cut (9999.0 when no cuts are
        known), ASR placeholders, and the start/end used for scoring.
    """
    s = candidate["start"]
    e = candidate["end"]
    dur = e - s

    # Scan the full curve once; every sub-window below lies inside [s, e], so
    # it can be filtered from this subset instead of rescanning the curve.
    in_clip = window_points(ENERGY_CURVE, s, e)
    pts = in_clip if in_clip else [{"time": s, "rms": 0.0}]
    rms = np.array([p["rms"] for p in pts], dtype=np.float32)
    mean = float(rms.mean())
    peak = float(rms.max())
    std = float(rms.std())
    ptm = float(peak / (mean + 1e-9))

    # hook energy: first HOOK_WINDOW_SEC seconds of the clip
    hook_pts = window_points(in_clip, s, min(e, s + HOOK_WINDOW_SEC))
    hook_rms = np.array([p["rms"] for p in hook_pts], dtype=np.float32) if hook_pts else np.array([0.0],dtype=np.float32)
    hook_energy = float(hook_rms.mean())

    # early/late energy (first and last quarter of the clip); fall back to the
    # clip mean when a quarter contains no points
    early_pts = window_points(in_clip, s, min(e, s + dur*0.25))
    late_pts  = window_points(in_clip, max(s, e - dur*0.25), e)
    early_energy = float(np.mean([p["rms"] for p in early_pts])) if early_pts else mean
    late_energy  = float(np.mean([p["rms"] for p in late_pts])) if late_pts else mean

    # peak time & offset
    peak_pt = max(pts, key=lambda x: x["rms"])
    peak_time_abs = float(peak_pt["time"])
    peak_offset_in_clip = float(peak_time_abs - s)

    # spike rate: local maxima at or above the global P90, per second of clip
    all_rms_sorted = sorted([p["rms"] for p in ENERGY_CURVE] or [0.0])
    P90 = percentile_value(all_rms_sorted, 0.90)
    spikes = 0
    for i in range(1, len(pts)-1):
        a,b,c = pts[i-1], pts[i], pts[i+1]
        if b["rms"] >= P90 and b["rms"] >= a["rms"] and b["rms"] >= c["rms"]:
            spikes += 1
    spike_rate = float(spikes / (dur + 1e-6))

    # silence ratio: total overlap with silence segments divided by duration
    sil = 0.0
    for (ss,se) in SILENCE_SEGMENTS:
        inter = max(0.0, min(e,se) - max(s,ss))
        sil += inter
    silence_ratio = float(sil / (dur + 1e-6))

    # Distance from either clip boundary to the nearest shot cut. Only real
    # cuts are measured: including the boundary itself in the list would make
    # the distance 0.0 for every candidate.
    if SHOT_CUTS:
        near_cut = float(min(abs(cut - s) for cut in SHOT_CUTS))
        near_cut_end = float(min(abs(cut - e) for cut in SHOT_CUTS))
        near_cut_dist = float(min(near_cut, near_cut_end))
    else:
        near_cut_dist = 9999.0

    return {
        "mean_energy": mean,
        "peak_energy": peak,
        "energy_stddev": std,
        "peak_to_mean": ptm,
        "hook_energy": hook_energy,
        "early_energy": early_energy,
        "late_energy": late_energy,
        "peak_time_abs": peak_time_abs,
        "peak_offset_in_clip": peak_offset_in_clip,
        "spike_rate": spike_rate,
        "silence_ratio": silence_ratio,
        "near_cut_dist": near_cut_dist,
        # ASR-dependent placeholders (filled later)
        "words_per_sec": 2.0,
        "trigger_count": 0,
        "markers_abs": [],
        # for scoring logic
        "start_abs_for_scoring": s,
        "end_abs_for_scoring": e,
    }

with StageTimer(7, "Feature Extraction"):
    # Attach a "features" dict to every candidate in place.
    for cand in CANDIDATES:
        cand["features"] = compute_features(cand)
    logger.info("Features computed for all candidates.")



## 🗣️ Stage 8: ASR (QUALITY Mode + Cache + Heartbeat)

In [None]:
# =========================
# Stage 8: ASR (Top-N per bucket + Cache + Fail-safe)
# =========================

# Number of equal time buckets the analysed duration is divided into
# (see _bucket_index).
TIME_BUCKETS = 5
# The settings below can be overridden through environment variables.
ASR_TOPN_PER_BUCKET = int(os.getenv("ASR_TOPN_PER_BUCKET", "4"))
ASR_MODEL_NAME = os.getenv("ASR_MODEL_NAME", "tiny")
ASR_BEAM_SIZE = int(os.getenv("ASR_BEAM_SIZE", "1"))
# Light mode is enabled unless ASR_LIGHT_MODE is set to something other than "1".
ASR_LIGHT_MODE = os.getenv("ASR_LIGHT_MODE", "1") == "1"
# Length (seconds) of the light window and its offset from the clip start
# (see _light_window).
ASR_LIGHT_SEC = float(os.getenv("ASR_LIGHT_SEC", "10.0"))
ASR_LIGHT_OFFSET = float(os.getenv("ASR_LIGHT_OFFSET", "2.0"))


def split_ranges(start, end, max_len=MAX_ASR_BLOCK_SEC, overlap=ASR_BLOCK_OVERLAP_SEC):
    """Split [start, end] into consecutive (t0, t1) chunks of at most max_len.

    Each chunk after the first begins overlap seconds before the previous one
    ended (no overlap when overlap <= 0). Nothing is produced when fewer than
    0.1 seconds remain.
    """
    pieces = []
    cursor = start
    while cursor < end - 0.1:
        stop = min(end, cursor + max_len)
        pieces.append((cursor, stop))
        if stop >= end:
            return pieces
        if overlap > 0:
            cursor = stop - overlap
        else:
            cursor = stop
    return pieces


def safe_unlink(path: Path):
    """Best-effort delete of ``path``.

    A missing file is a no-op, and any error raised while checking or
    deleting is deliberately swallowed so cleanup can never abort the caller.
    """
    try:
        if not path.exists():
            return
        path.unlink()
    except Exception:
        # Intentional: cleanup of scratch files is never allowed to fail the pipeline.
        pass


def _bucket_index(t):
    """Return the timeline bucket for absolute time ``t`` (seconds).

    The analysed duration is divided into TIME_BUCKETS equal slices; the
    result is clamped so out-of-range times land in the first or last bucket.
    A non-positive ANALYZED_DURATION always maps to bucket 0.
    """
    if ANALYZED_DURATION <= 0:
        return 0
    raw = int((t / ANALYZED_DURATION) * TIME_BUCKETS)
    return min(max(raw, 0), TIME_BUCKETS - 1)


def _percentile_rank(values, x):
    if not values:
        return 0.0
    vs = sorted(values)
    lo, hi = 0, len(vs)
    while lo < hi:
        mid = (lo + hi) // 2
        if vs[mid] <= x:
            lo = mid + 1
        else:
            hi = mid
    return lo / len(vs)


def _pre_score(c, hook_vals, ptm_vals, spk_vals, sil_vals):
    """Cheap audio-only score used to pick which candidates get transcribed.

    Each feature is converted to its percentile rank within the supplied
    value pool; hook energy, peak-to-mean and spike rate add to the score,
    silence ratio subtracts from it.
    """
    feats = c.get("features", {})

    def rank(pool, key):
        # Missing features are ranked as 0.0.
        return _percentile_rank(pool, feats.get(key, 0.0))

    hook_rank = rank(hook_vals, "hook_energy")
    ptm_rank = rank(ptm_vals, "peak_to_mean")
    spike_rank = rank(spk_vals, "spike_rate")
    silence_rank = rank(sil_vals, "silence_ratio")
    return (0.40 * hook_rank) + (0.30 * ptm_rank) + (0.20 * spike_rank) - (0.20 * silence_rank)


def _light_window(start, end):
    """Pick the sub-window of [start, end] that light-mode ASR transcribes.

    Spans no longer than ASR_LIGHT_SEC are returned unchanged. Otherwise the
    window begins ASR_LIGHT_OFFSET seconds into the span and lasts up to
    ASR_LIGHT_SEC; if that leaves less than 4 s, a window centred on the
    span is used instead.
    """
    span = max(0.0, end - start)
    if span <= ASR_LIGHT_SEC:
        return start, end

    win_start = min(end - 0.1, start + ASR_LIGHT_OFFSET)
    win_end = min(end, win_start + ASR_LIGHT_SEC)
    if win_end - win_start >= 4.0:
        return win_start, win_end

    # Offset window too short: fall back to a window around the midpoint.
    centre = start + span * 0.5
    win_start = max(start, centre - ASR_LIGHT_SEC * 0.5)
    win_end = min(end, win_start + ASR_LIGHT_SEC)
    return win_start, win_end


with StageTimer(8, "ASR (top-N per bucket, cache, fail-safe)"):
    TRANSCRIPTS = {}
    transcript_cache_path = ART_DIR / "transcript.json"

    # Load cache (best-effort: a corrupt cache must never abort the stage).
    if transcript_cache_path.exists():
        try:
            TRANSCRIPTS = read_json(transcript_cache_path) or {}
            logger.info(f"Loaded transcript cache: {transcript_cache_path} ({len(TRANSCRIPTS)} items)")
        except Exception as e:
            logger.warning(f"Transcript cache load failed; starting empty. ({e})")
            TRANSCRIPTS = {}
    if not isinstance(TRANSCRIPTS, dict):
        # Everything below indexes the cache by candidate id, so anything
        # other than a JSON object is unusable.
        logger.warning("Transcript cache is not a JSON object; starting empty.")
        TRANSCRIPTS = {}

    # Prepare ASR model
    ASR_AVAILABLE = False
    model = None
    if ASR_ENABLED:
        try:
            try:
                from faster_whisper import WhisperModel
            except Exception as e:
                logger.warning(f"faster-whisper not available ({e}). Trying to install...")
                # Install with the interpreter running this kernel so the
                # package lands in the environment we import from.
                subprocess.run([sys.executable, "-m", "pip", "install", "-q", "faster-whisper"], check=True)
                from faster_whisper import WhisperModel

            model = WhisperModel(ASR_MODEL_NAME, device="cpu", compute_type="int8")
            ASR_AVAILABLE = True
            logger.info(f"ASR model ready: faster-whisper {ASR_MODEL_NAME} (cpu int8)")
        except Exception as e:
            logger.warning(f"ASR unavailable; continuing without transcript. ({e})")
            ASR_AVAILABLE = False

    def transcribe_candidate_abs(candidate: Dict[str,Any], mode: str = "full") -> Dict[str,Any]:
        """Transcribe one candidate and return text plus absolute-time words.

        Args:
            candidate: Candidate dict with "id", "start" and "end" (seconds).
            mode: "light" transcribes only the window chosen by
                ``_light_window``; any other value transcribes the whole span.

        Returns:
            Dict with keys "text", "words" (each with absolute "start"/"end"),
            "markers_abs" (start times of trigger words), "words_per_sec",
            "trigger_count", "word_count" and "mode". All fields are empty/zero
            when ASR is unavailable.

        Raises:
            subprocess.CalledProcessError: If ffmpeg fails to cut a block.
            Exception: Whatever the ASR model raises while transcribing.
        """
        if not ASR_AVAILABLE or model is None:
            return {"text":"", "words":[], "markers_abs":[], "words_per_sec":0.0, "trigger_count":0, "word_count":0, "mode": mode}

        cid = candidate["id"]
        start = float(candidate["start"])
        end = float(candidate["end"])

        if mode == "light":
            ls, le = _light_window(start, end)
            ranges = split_ranges(ls, le)
        else:
            ranges = split_ranges(start, end)

        all_words = []
        full_text = []
        markers_abs = []

        # NOTE(review): with a positive block overlap, words inside the
        # overlap can be emitted by both neighbouring blocks; not deduplicated.
        for bi, (bs, be) in enumerate(ranges, 1):
            block_wav = CACHE_DIR / f"asrblock_{cid}_{bi:02d}.wav"
            try:
                subprocess.run([
                    FFMPEG_BIN, "-y", "-loglevel", "error",
                    "-ss", str(bs), "-t", str(be - bs),
                    "-i", str(ART_DIR / "audio.wav"),
                    "-ac", "1", "-ar", "16000",
                    str(block_wav)
                ], check=True)

                t0 = time.monotonic()
                logger.info(f"ASR [{cid}] block {bi}/{len(ranges)} START {bs:.2f}-{be:.2f} (mode={mode})")
                log_flush()

                # word_timestamps=True is required: without it faster-whisper
                # leaves seg.words empty, so word counts, trigger markers and
                # word-level snapping would silently never work.
                segments, info = model.transcribe(
                    str(block_wav),
                    language=ASR_LANGUAGE,
                    beam_size=ASR_BEAM_SIZE,
                    word_timestamps=True,
                )
                for seg in segments:
                    if seg.text:
                        full_text.append(seg.text)
                    for w in (seg.words or []):
                        # Word times are relative to the block; shift to absolute.
                        all_words.append({
                            "word": w.word,
                            "start": float(w.start + bs),
                            "end": float(w.end + bs)
                        })

                dt = time.monotonic() - t0
                logger.info(f"ASR [{cid}] block {bi} DONE in {dt:.2f}s")
                log_flush()
            finally:
                # Remove the scratch wav even when ffmpeg/ASR raised.
                safe_unlink(block_wav)

        # markers from trigger words
        trigger_set = set([t.lower() for t in (TRIGGER_WORDS if "TRIGGER_WORDS" in globals() else [])])
        for w in all_words:
            raw_tok = (w.get("word") or "").strip().lower()
            # ASR tokens usually carry adjacent punctuation ("gila," / "hah?");
            # compare the bare word too so those still count as triggers.
            bare_tok = re.sub(r"^\W+|\W+$", "", raw_tok)
            if raw_tok in trigger_set or bare_tok in trigger_set:
                markers_abs.append(float(w.get("start", 0.0)))

        text = " ".join(full_text).strip()
        word_count = len(all_words)
        if mode == "full" or not ranges:
            # An empty range list (span shorter than ~0.1 s) has no window to
            # measure, so fall back to the candidate span.
            dur = max(0.1, float(end - start))
        else:
            dur = max(0.1, float(ranges[-1][1] - ranges[0][0]))
        wps = float(word_count / dur)
        trigger_count = len(markers_abs)

        return {
            "text": text,
            "words": all_words,
            "markers_abs": markers_abs,
            "words_per_sec": wps,
            "trigger_count": trigger_count,
            "word_count": word_count,
            "mode": mode,
        }

    # Build per-bucket pools and pick top-N
    hook_vals = [c.get("features", {}).get("hook_energy", 0.0) for c in CANDIDATES]
    ptm_vals = [c.get("features", {}).get("peak_to_mean", 0.0) for c in CANDIDATES]
    spk_vals = [c.get("features", {}).get("spike_rate", 0.0) for c in CANDIDATES]
    sil_vals = [c.get("features", {}).get("silence_ratio", 0.0) for c in CANDIDATES]

    bucket_map = {b: [] for b in range(TIME_BUCKETS)}
    for c in CANDIDATES:
        b = _bucket_index(float(c.get("start", 0.0)))
        bucket_map[b].append(c)

    top_for_asr = []
    for b, arr in bucket_map.items():
        arr_sorted = sorted(arr, key=lambda c: _pre_score(c, hook_vals, ptm_vals, spk_vals, sil_vals), reverse=True)
        top_for_asr.extend(arr_sorted[:ASR_TOPN_PER_BUCKET])

    # Transcribe top-N per bucket
    mode = "light" if ASR_LIGHT_MODE else "full"
    if not ASR_AVAILABLE:
        logger.warning("SEMANTIC_FALLBACK: ASR unavailable")
    else:
        logger.info(f"ASR top-N per bucket: {len(top_for_asr)} candidates (mode={mode})")

    for c in top_for_asr:
        cid = c["id"]
        existing = TRANSCRIPTS.get(cid) or {}
        if not isinstance(existing, dict):
            existing = {}
        # Reuse a cached transcript when it is at least as complete as what we
        # would produce now ("full" always qualifies, "light" only in light
        # mode). If ASR cannot run at all, any cached text beats an empty one.
        cache_hit = bool(existing.get("text")) and (
            existing.get("mode") in ("full", mode) or not ASR_AVAILABLE
        )
        if cache_hit:
            out = existing
        else:
            try:
                out = transcribe_candidate_abs(c, mode=mode)
            except Exception as e:
                logger.warning(f"ASR failed for {cid}: {e}")
                out = {"text":"", "words":[], "markers_abs":[], "words_per_sec":0.0, "trigger_count":0, "word_count":0, "mode": mode}
            TRANSCRIPTS[cid] = out

        # Enrich candidate features from transcript
        f = c.get("features", {})
        f["words_per_sec"] = float(out.get("words_per_sec", 0.0))
        f["trigger_count"] = int(out.get("trigger_count", 0))
        f["markers_abs"] = list(out.get("markers_abs", []))
        f["word_count"] = int(out.get("word_count", 0))
        f["has_text"] = True if (out.get("text") or "").strip() else False
        c["features"] = f

    # Save transcript cache
    try:
        write_json(transcript_cache_path, TRANSCRIPTS)
        logger.info(f"Saved transcript cache (entries={len(TRANSCRIPTS)})")
    except Exception as e:
        logger.warning(f"Failed to save transcript cache: {e}")

    # Expose globals
    globals()["TRANSCRIPTS"] = TRANSCRIPTS
    globals()["ASR_AVAILABLE"] = ASR_AVAILABLE
    globals()["transcribe_candidate_abs"] = transcribe_candidate_abs


## 🧮 Stage 9: Scoring (Audit-able, Percentile-Based)

In [None]:
# =========================
# Stage 9: Scoring (Bucket-normalized, semantic-aware)
# =========================

# Number of equal-width timeline buckets used for per-bucket score normalization below.
# NOTE(review): the same literal is re-assigned in the Stage 8 and Stage 10 cells; keep the values in sync.
TIME_BUCKETS = 5


def token_set(text: str):
    """Return the set of lower-cased ASCII alphanumeric tokens found in ``text``.

    ``None`` or an empty string yields an empty set.
    """
    lowered = text.lower() if text else ""
    tokens = re.findall(r"[a-zA-Z0-9]+", lowered)
    return set(tokens)


def clip100(x: float) -> float:
    """Clamp ``x`` to the closed range [0, 100] and return it as a float."""
    capped = x if x < 100.0 else 100.0
    floored = capped if capped > 0.0 else 0.0
    return float(floored)


def percentile_rank(values, x):
    """Return the share of ``values`` that are <= ``x`` (0.0 for an empty pool)."""
    if not values:
        return 0.0
    at_or_below = sum(1 for v in values if v <= x)
    return at_or_below / len(values)


def bucket_index(t):
    """Map absolute time ``t`` (seconds) to a bucket in [0, TIME_BUCKETS - 1].

    Returns 0 when ANALYZED_DURATION is not positive; otherwise the
    proportional bucket, clamped at both ends.
    """
    if ANALYZED_DURATION <= 0:
        return 0
    fraction = t / ANALYZED_DURATION
    slot = int(fraction * TIME_BUCKETS)
    return min(max(slot, 0), TIME_BUCKETS - 1)


def finishability_score(end_t: float) -> float:
    """Score (0-100) how cleanly a clip ending at ``end_t`` finishes.

    100 if a silence segment starts within 1.6 s after ``end_t``; otherwise
    85 or 55 if the nearest speech-block end at/after ``end_t`` is within
    1.0 s or 2.5 s; otherwise 20. Missing globals are treated as empty.
    """
    env = globals()

    silences = (env["SILENCE_SEGMENTS"] or []) if "SILENCE_SEGMENTS" in env else []
    for sil_start, _sil_end in silences:
        if end_t <= float(sil_start) <= (end_t + 1.6):
            return 100.0

    blocks = (env["SPEECH_BLOCKS"] or []) if "SPEECH_BLOCKS" in env else []
    # Distance from end_t to every speech-block end that is not before it.
    gaps = [float(blk_end) - end_t for _blk_start, blk_end in blocks if float(blk_end) >= end_t]
    if gaps:
        nearest = min(gaps)
        if nearest <= 1.0:
            return 85.0
        if nearest <= 2.5:
            return 55.0
    return 20.0


# Stage 9 driver: scores every candidate on meaning / hook / clarity / energy,
# combines them into "viral_score", attaches an audit trail ("editorial_reason")
# and writes the full ranking to OUT_DIR / "ranking.csv".
# All feature values are turned into percentile ranks *within the candidate's
# own time bucket* before weighting, so buckets are never compared on raw scale.
with StageTimer(9, "Scoring (bucket-normalized)"):
    # Build bucket pools
    buckets = {b: [] for b in range(TIME_BUCKETS)}
    for c in CANDIDATES:
        b = bucket_index(float(c.get("start", 0.0)))
        buckets[b].append(c)

    # Build per-bucket distributions
    def build_vals(key, transform=lambda x: x):
        """Collect feature `key` (default 0.0) for every candidate, grouped by bucket."""
        vals = {b: [] for b in buckets}
        for b, arr in buckets.items():
            for c in arr:
                v = transform(c.get("features", {}).get(key, 0.0))
                vals[b].append(float(v))
        return vals

    hook_vals = build_vals("hook_energy")
    ptm_vals = build_vals("peak_to_mean")
    std_vals = build_vals("energy_stddev")
    spk_vals = build_vals("spike_rate")
    wps_vals = build_vals("words_per_sec")
    sil_vals = build_vals("silence_ratio")
    trig_vals = build_vals("trigger_count")

    # Novelty per bucket
    # Candidates are visited in descending hook-energy order; each one's novelty is
    # 100 * (1 - max Jaccard similarity) of its transcript tokens against the first
    # 8 token sets visited in the bucket. Candidates without tokens get a neutral 50.
    novelty_vals = {b: [] for b in buckets}
    for b, arr in buckets.items():
        seen = []
        arr_sorted = sorted(arr, key=lambda c: -c.get("features", {}).get("hook_energy", 0.0))
        for c in arr_sorted:
            txt = ""
            if "TRANSCRIPTS" in globals() and isinstance(TRANSCRIPTS, dict):
                txt = TRANSCRIPTS.get(c.get("id"), {}).get("text", "")
            tokens = token_set(txt)
            max_sim = 0.0
            for prev in seen[:8]:
                if tokens or prev:
                    max_sim = max(max_sim, len(tokens & prev) / max(1, len(tokens | prev)))
            novelty = clip100((1.0 - max_sim) * 100) if tokens else 50.0
            # Stash novelty on the candidate now; the final scores dict below copies it over.
            c.setdefault("scores", {})
            c["scores"]["novelty"] = novelty
            novelty_vals[b].append(float(novelty))
            seen.append(tokens)

    # Semantic availability
    # True as soon as any candidate has a non-blank transcript text.
    SEMANTIC_AVAILABLE = False
    if "TRANSCRIPTS" in globals() and isinstance(TRANSCRIPTS, dict):
        SEMANTIC_AVAILABLE = any((TRANSCRIPTS.get(c.get("id"), {}).get("text", "") or "").strip() for c in CANDIDATES)

    if not SEMANTIC_AVAILABLE:
        logger.warning("SEMANTIC_FALLBACK: no transcripts for candidates")

    # Scoring per candidate
    for c in CANDIDATES:
        f = c.get("features", {})
        b = bucket_index(float(c.get("start", 0.0)))

        # Percentile rank of each feature inside the candidate's bucket (0..1).
        p_hook = percentile_rank(hook_vals[b], f.get("hook_energy", 0.0))
        p_ptm = percentile_rank(ptm_vals[b], f.get("peak_to_mean", 0.0))
        p_std = percentile_rank(std_vals[b], f.get("energy_stddev", 0.0))
        p_spk = percentile_rank(spk_vals[b], f.get("spike_rate", 0.0))
        p_wps = percentile_rank(wps_vals[b], f.get("words_per_sec", 0.0))
        p_trig = percentile_rank(trig_vals[b], f.get("trigger_count", 0.0))
        p_sil = percentile_rank(sil_vals[b], f.get("silence_ratio", 0.0))
        p_nov = percentile_rank(novelty_vals[b], c.get("scores", {}).get("novelty", 50.0))

        # Meaning score
        # Three modes: "semantic" (this candidate has text), "no_text" (transcripts exist
        # but not for this candidate -> flat 20), "fallback" (no transcripts anywhere ->
        # audio-only proxy).
        if SEMANTIC_AVAILABLE and f.get("has_text", False):
            meaning = clip100(100 * (0.45 * p_trig + 0.35 * p_wps + 0.20 * p_nov))
            semantic_mode = "semantic"
        elif SEMANTIC_AVAILABLE:
            meaning = 20.0
            semantic_mode = "no_text"
        else:
            meaning = clip100(100 * (0.60 * p_hook + 0.40 * p_spk))
            semantic_mode = "fallback"

        # Hook score
        # early_marker is 1.0 when any trigger-word marker falls inside the first
        # HOOK_WINDOW_SEC seconds of the candidate.
        early_marker = 0.0
        start = f.get("start_abs_for_scoring")
        if start is not None:
            ms = f.get("markers_abs", []) or []
            early_marker = 1.0 if any(start <= m <= start + HOOK_WINDOW_SEC for m in ms) else 0.0
        hook = clip100(100 * (0.60 * p_hook + 0.25 * early_marker + 0.15 * p_wps))

        # Energy score (not dominant)
        energy = clip100(100 * (0.40 * p_ptm + 0.35 * p_std + 0.25 * p_spk))

        # Clarity score
        # Rewards a clean ending (finishability), little silence and a healthy speech rate.
        fin = finishability_score(float(c.get("end", 0.0)))
        fin_n = fin / 100.0
        clarity = clip100(100 * (0.55 * fin_n + 0.30 * (1.0 - p_sil) + 0.15 * p_wps))

        # Final blend: meaning carries less weight when no transcript exists anywhere.
        if SEMANTIC_AVAILABLE:
            viral = (0.35 * meaning) + (0.25 * hook) + (0.20 * clarity) + (0.20 * energy)
        else:
            viral = (0.20 * meaning) + (0.30 * hook) + (0.25 * clarity) + (0.25 * energy)

        # Replaces the temporary scores dict created in the novelty pass (novelty is carried over).
        c["scores"] = {
            "meaning": meaning,
            "hook": hook,
            "clarity": clarity,
            "energy": energy,
            "novelty": c.get("scores", {}).get("novelty", 50.0),
            "finishability": fin,
            "viral_score": clip100(viral),
            "semantic_mode": semantic_mode,
        }

    # Editorial reason (audit)
    def editorial_reason(c):
        """Return human-readable one-liners summarising the candidate's component scores."""
        s = c.get("scores", {})
        return [
            f"Meaning {s.get('meaning',0):.1f} ({s.get('semantic_mode','')})",
            f"Hook {s.get('hook',0):.1f}",
            f"Clarity {s.get('clarity',0):.1f}",
            f"Energy {s.get('energy',0):.1f}",
            f"Novelty {s.get('novelty',0):.1f}",
        ]

    for c in CANDIDATES:
        c["editorial_reason"] = editorial_reason(c)

    # Observability: write full ranking.csv (all candidates)
    # Rows are ordered by viral_score descending, ties broken by earlier start.
    ranking_csv = OUT_DIR / "ranking.csv"
    with open(ranking_csv, "w", newline="", encoding="utf-8") as f:
        fieldnames = ["id","start","end","duration","bucket","meaning","hook","clarity","energy","novelty","finishability","viral_score","semantic_mode"]
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for c in sorted(CANDIDATES, key=lambda x: (-x.get("scores", {}).get("viral_score", 0.0), x.get("start", 0.0))):
            s = c.get("scores", {})
            w.writerow({
                "id": c.get("id"),
                "start": float(c.get("start", 0.0)),
                "end": float(c.get("end", 0.0)),
                "duration": float(c.get("duration", 0.0)),
                "bucket": bucket_index(float(c.get("start", 0.0))),
                "meaning": float(s.get("meaning", 0.0)),
                "hook": float(s.get("hook", 0.0)),
                "clarity": float(s.get("clarity", 0.0)),
                "energy": float(s.get("energy", 0.0)),
                "novelty": float(s.get("novelty", 0.0)),
                "finishability": float(s.get("finishability", 0.0)),
                "viral_score": float(s.get("viral_score", 0.0)),
                "semantic_mode": s.get("semantic_mode", "")
            })

    logger.info(f"Wrote ranking.csv (all candidates): {ranking_csv}")
    log_flush()


## 📊 Log Rapi: Tabel Ranking (Bahasa Indonesia)

Cell ini membuat output seperti tabel (mirip screenshot):
- Menyimpan `/kaggle/working/output/ranking.csv`
- Menampilkan Top 12 preview
- Kolom pakai Bahasa Indonesia


In [None]:
# =========================
# Ranking Table (Rapi, Bahasa Indonesia)
# =========================
import pandas as pd

# Rank every candidate: highest viral score first, then earliest start, then id
# (the id tie-break keeps the order deterministic).
ranked = sorted(CANDIDATES, key=lambda x: (-x["scores"]["viral_score"], x["start"], x["id"]))

rows = []
for i, c in enumerate(ranked, 1):
    f = c.get("features", {})
    s = c.get("scores", {})
    rows.append({
        "peringkat": i,
        "id_segmen": c.get("id"),
        "tipe_segmen": c.get("type", ""),
        "mulai_detik": round(float(c.get("start", 0.0)), 3),
        "akhir_detik": round(float(c.get("end", 0.0)), 3),
        "durasi_detik": round(float(c.get("duration", 0.0)), 3),

        "skor_viral_total": round(float(s.get("viral_score", 0.0)), 3),

        "skor_hook": round(float(s.get("hook", 0.0)), 3),
        # Legacy columns, kept so the table schema stays backward-compatible.
        # The Stage 9 scorer does not write these keys, so they read 0.0
        # unless another stage populates them.
        "skor_emosi": round(float(s.get("emotion", 0.0)), 3),
        "skor_kepadatan_info": round(float(s.get("density", 0.0)), 3),
        "skor_alur_cerita": round(float(s.get("arc", 0.0)), 3),
        "skor_keamanan_audio": round(float(s.get("safety", 0.0)), 3),
        "skor_posisi_puncak": round(float(s.get("peak_placement", 0.0)), 3),
        "skor_kebaruan": round(float(s.get("novelty", 0.0)), 3),

        # Component scores actually produced by Stage 9; without these the
        # table shows the total but none of the parts it was built from.
        "skor_makna": round(float(s.get("meaning", 0.0)), 3),
        "skor_kejelasan": round(float(s.get("clarity", 0.0)), 3),
        "skor_energi": round(float(s.get("energy", 0.0)), 3),
        "skor_penyelesaian": round(float(s.get("finishability", 0.0)), 3),
        "mode_semantik": s.get("semantic_mode", ""),

        # raw features for auditing (optional)
        "energi_hook": round(float(f.get("hook_energy", 0.0)), 6),
        "tingkat_chaos": round(float(f.get("spike_rate", 0.0)), 6),
        "rasio_puncak_vs_rata2": round(float(f.get("peak_to_mean", 0.0)), 6),
        "kata_per_detik": round(float(f.get("words_per_sec", 0.0)), 6),
        "rasio_diam": round(float(f.get("silence_ratio", 0.0)), 6),
        "jarak_dari_cut_kamera": round(float(f.get("near_cut_dist", 9999.0)), 3),

        # Only the first audit line is shown to keep the table narrow.
        "alasan_editorial": (c.get("editorial_reason", [""])[0] if c.get("editorial_reason") else "")
    })

df_ranking = pd.DataFrame(rows)

# Saved next to the other outputs.
# NOTE(review): this is the same path Stage 9 writes its English-schema
# ranking.csv to, so this cell overwrites that file.
rank_csv = OUT_DIR / "ranking.csv"
df_ranking.to_csv(rank_csv, index=False, encoding="utf-8")
logger.info(f"✅ Ranking saved: {rank_csv}")

TOP_N = 12
print(f"Top {TOP_N} preview:")
display(df_ranking.head(TOP_N))


## 🧭 Stage 10: Deterministic Selection + Diversity + Timeline Sanity

In [None]:
# =========================
# Stage 10: Selection (bucket quota + fairness)
# =========================

# Number of equal-width timeline buckets used for the per-bucket selection quotas below.
# NOTE(review): the same literal is re-assigned in the Stage 8 and Stage 9 cells; keep the values in sync.
TIME_BUCKETS = 5


def overlaps(a_start, a_end, b_start, b_end, gap=MIN_GAP_SEC):
    """Return True when spans A and B overlap or lie closer than ``gap`` seconds apart."""
    if a_end + gap <= b_start:
        # A finishes at least `gap` before B begins.
        return False
    if b_end + gap <= a_start:
        # B finishes at least `gap` before A begins.
        return False
    return True


def bucket_index(t):
    """Return the timeline bucket (0 .. TIME_BUCKETS - 1) that time ``t`` falls in.

    A non-positive ANALYZED_DURATION maps everything to bucket 0; times
    outside the analysed range are clamped to the first/last bucket.
    """
    if ANALYZED_DURATION <= 0:
        return 0
    position = int((t / ANALYZED_DURATION) * TIME_BUCKETS)
    position = 0 if position < 0 else position
    return TIME_BUCKETS - 1 if position >= TIME_BUCKETS else position


def percentile(values, q):
    """Nearest-rank percentile: the element at rounded position q% of the sorted values.

    Returns 0.0 for an empty input; the index is clamped to the valid range.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    last = len(ordered) - 1
    pos = int(round((q / 100.0) * last))
    pos = min(last, pos)
    pos = max(0, pos)
    return float(ordered[pos])


# Stage 10 driver: picks up to MAX_FINAL_CLIPS candidates.
# Phase 1 fills a per-bucket quota so the whole timeline is represented;
# phase 2 tops up globally by viral score. Every accept/reject decision is
# recorded in selection_audit.
with StageTimer(10, "Selection (bucket quota + fairness)"):
    selection_audit = []

    # Bucket pools
    buckets = {b: [] for b in range(TIME_BUCKETS)}
    for c in CANDIDATES:
        b = bucket_index(float(c.get("start", 0.0)))
        buckets[b].append(c)

    # Sort within buckets by viral_score desc
    for b in buckets:
        buckets[b].sort(key=lambda c: (-c.get("scores", {}).get("viral_score", 0.0), c.get("start", 0.0), c.get("id", "")))

    # Per-bucket thresholds (no global absolute)
    bucket_stats = {}
    meaning_min = {}
    score_min = {}
    for b, arr in buckets.items():
        meaning_vals = [c.get("scores", {}).get("meaning", 0.0) for c in arr]
        score_vals = [c.get("scores", {}).get("viral_score", 0.0) for c in arr]
        meaning_min[b] = percentile(meaning_vals, 35) if meaning_vals else 0.0
        score_min[b] = percentile(score_vals, 40) if score_vals else 0.0
        bucket_stats[b] = {
            "count": len(arr),
            "meaning_min": float(meaning_min[b]),
            "score_min": float(score_min[b]),
            "selected": 0,
            "quota": 0,
            "meaning_candidates": int(sum(1 for c in arr if c.get("scores", {}).get("meaning", 0.0) >= meaning_min[b]))
        }

    # Quota: base 1 per bucket, distribute remainder by bucket size
    base_quota = 1
    quotas = {b: 0 for b in buckets}
    for b in buckets:
        quotas[b] = base_quota if buckets[b] else 0

    remaining = MAX_FINAL_CLIPS - sum(quotas.values())
    if remaining > 0:
        order = sorted(buckets.keys(), key=lambda b: len(buckets[b]), reverse=True)
        i = 0
        while remaining > 0 and order:
            quotas[order[i % len(order)]] += 1
            remaining -= 1
            i += 1

    # Late bucket grace: ensure bucket 80-100% has at least 1 if meaning candidates exist
    late_bucket = TIME_BUCKETS - 1
    if bucket_stats[late_bucket]["meaning_candidates"] > 0 and quotas[late_bucket] < 1:
        quotas[late_bucket] = 1

    for b in quotas:
        bucket_stats[b]["quota"] = int(quotas[b])

    selected = []

    def can_add(c):
        """Return (ok, reason): whether `c` clears its bucket thresholds and keeps MIN_GAP_SEC from every pick so far."""
        b = bucket_index(float(c.get("start", 0.0)))
        if c.get("scores", {}).get("meaning", 0.0) < meaning_min[b]:
            return False, "below_meaning_min"
        if c.get("scores", {}).get("viral_score", 0.0) < score_min[b]:
            return False, "below_bucket_score_min"
        for s in selected:
            if overlaps(c["start"], c["end"], s["start"], s["end"], gap=MIN_GAP_SEC):
                return False, f"overlap_or_gap_with_{s['id']}"
        return True, "ok"

    # Phase 1: quota per bucket
    for b in range(TIME_BUCKETS):
        need = quotas[b]
        if need <= 0:
            continue
        # buckets[b] is already sorted best-first.
        for c in buckets[b]:
            # The base quota alone can add up to more than MAX_FINAL_CLIPS
            # (one per non-empty bucket), so enforce the global cap here too.
            if need <= 0 or len(selected) >= MAX_FINAL_CLIPS:
                break
            ok, why = can_add(c)
            if ok:
                selected.append(c)
                bucket_stats[b]["selected"] += 1
                selection_audit.append({"id": c["id"], "decision": "selected", "phase": "quota", "bucket": b, "start": c["start"], "score": c["scores"]["viral_score"], "reason": why, "editorial_reason": c.get("editorial_reason", [])})
                need -= 1
            else:
                selection_audit.append({"id": c["id"], "decision": "rejected", "phase": "quota", "bucket": b, "start": c["start"], "score": c["scores"]["viral_score"], "reason": why, "editorial_reason": c.get("editorial_reason", [])})

    # Phase 2: global fill
    if len(selected) < MAX_FINAL_CLIPS:
        # Membership by candidate id: avoids a deep dict comparison against
        # every selected clip for every candidate.
        selected_ids = {c["id"] for c in selected}
        remaining = sorted([c for c in CANDIDATES if c["id"] not in selected_ids], key=lambda c: (-c.get("scores", {}).get("viral_score", 0.0), c.get("start", 0.0), c.get("id", "")))
        for c in remaining:
            if len(selected) >= MAX_FINAL_CLIPS:
                break
            ok, why = can_add(c)
            b = bucket_index(float(c.get("start", 0.0)))
            if not ok:
                selection_audit.append({"id": c["id"], "decision": "rejected", "phase": "fill", "bucket": b, "start": c["start"], "score": c["scores"]["viral_score"], "reason": why, "editorial_reason": c.get("editorial_reason", [])})
                continue
            selected.append(c)
            bucket_stats[b]["selected"] += 1
            selection_audit.append({"id": c["id"], "decision": "selected", "phase": "fill", "bucket": b, "start": c["start"], "score": c["scores"]["viral_score"], "reason": "ok", "editorial_reason": c.get("editorial_reason", [])})

    # Chronological output
    SELECTED = sorted(selected, key=lambda c: (c["start"], c["id"]))
    write_json(ART_DIR / "selected.json", SELECTED)
    write_json(ART_DIR / "selection_audit.json", selection_audit)
    write_json(OUT_DIR / "selection_audit.json", selection_audit)
    write_json(OUT_DIR / "bucket_stats.json", bucket_stats)

    logger.info("=== SELECTED CLIPS (chronological) ===")
    for j, c in enumerate(SELECTED, 1):
        rs = c.get("scores", {})
        logger.info(f"#{j:02d} {c['id']} | VS={rs.get('viral_score',0):.1f} | {c['start']:.2f}-{c['end']:.2f} ({c['duration']:.1f}s)")
    logger.info(f"Selected clips: {len(SELECTED)}")


## 🧩 NON-CORE: Packaging (Caption + Hashtag) — setelah Selection (tidak mempengaruhi scoring)

- Dijalankan **setelah Stage 10** agar tidak mempengaruhi ranking/selection.
- Memanfaatkan hasil ASR `TRANSCRIPTS` jika tersedia.
- Jika ASR tidak tersedia/kosong, caption akan fallback deterministik.


In [None]:
# =========================
# NON-CORE: Packaging (Caption + Hashtag) — LOG ONLY (after selection)
# =========================

def _dedupe_keep_order(xs):
    out = []
    for x in xs:
        if x and x not in out:
            out.append(x)
    return out

def build_caption_and_hashtags(clip: Dict[str,Any], transcript_text: str):
    """NON-CORE. Build a deterministic caption and hashtag list from the ASR transcript.

    Never used for scoring/selection.

    The transcript is split into sentence-like parts, the part with the
    highest heuristic score (trigger/laugh words, "!"/"?", twist and chaos
    words, length penalty) becomes the quoted snippet, and a caption template
    is chosen from the words it contains. Hashtags are four fixed base tags
    plus topic tags matched against the whole transcript.

    Args:
        clip: Selected clip dict; only ``clip["scores"]["viral_score"]`` is read.
        transcript_text: Transcript text for the clip (may be empty/None).

    Returns:
        ``(caption, hashtags)`` where ``hashtags`` holds at most 8 unique tags.
        An empty transcript yields a fixed fallback caption and the base tags.
    """
    s = clip.get("scores", {}) or {}
    text_raw = (transcript_text or "").strip()
    if not text_raw:
        return "Bagian paling seru di obrolan ini 🎯", ["#shorts","#reels","#tiktok","#podcast"]

    text_lc = text_raw.lower()

    # Sentence split first; if that yields a single part, fall back to clause split.
    parts = re.split(r'(?<=[\.!\?])\s+|\n+', text_raw)
    if len(parts) == 1:
        parts = re.split(r'[;,]\s+', text_raw)
    parts = [p.strip() for p in parts if p.strip()] or [text_raw]

    # list(...) so a tuple/set TRIGGER_WORDS does not break the concatenation.
    trigger = set(list(TRIGGER_WORDS if "TRIGGER_WORDS" in globals() else []) + ["haha","wk"])
    laugh  = set(["wkwk","wkwkwk","ngakak","ketawa","haha","wk"])

    def sent_score(sent: str) -> float:
        """Heuristic "quotability" score of one sentence."""
        lc = sent.lower()
        toks = re.findall(r"[a-z0-9']+", lc)
        trig = sum(1 for t in toks if t in trigger)
        laug = sum(1 for t in toks if t in laugh)
        punct = 2*sent.count("!") + 2*sent.count("?")
        twist = 2 if ("ternyata" in lc or "plot twist" in lc or ("plot" in lc and "twist" in lc)) else 0
        chaos = 1 if any(w in lc for w in ["anjir","anjay","gila","parah","buset","astaga","waduh","yaampun"]) else 0

        L = len(sent)
        # Penalise very short (< 25 chars) and very long (> 110 chars) sentences.
        len_pen = (-1 if L < 25 else 0) + (-1 if L > 110 else 0)
        return trig*3 + laug*3 + punct + twist*2 + chaos + len_pen

    best = max(parts, key=sent_score)

    # If the best part is too short, merge a neighbour so the quote can stand on its own.
    best_words = re.findall(r"[a-z0-9']+", best.lower())
    if len(best_words) < 4 and len(parts) > 1:
        bi = parts.index(best)
        if bi > 0:
            best = (parts[bi-1] + " " + best).strip()
        elif bi < len(parts)-1:
            best = (best + " " + parts[bi+1]).strip()

    snippet = re.sub(r"\s+"," ", best).strip()
    if len(snippet) > 92:
        # Cut at the last word boundary inside the limit.
        snippet = snippet[:92].rsplit(" ", 1)[0] + "…"

    # NOTE: the caption literals below previously contained mis-decoded
    # (mojibake) bytes in place of the emoji, curly quotes and ellipsis.
    lc = best.lower()
    if ("ternyata" in lc) or ("plot twist" in lc) or ("plot" in lc and "twist" in lc):
        caption = f"Plot twist: “{snippet}” 👀"
    elif any(w in lc for w in ["ngakak","ketawa","wkwk","wkwkwk","haha"]):
        caption = f"Ngakak dulu: “{snippet}” 😂"
    elif any(w in lc for w in ["anjir","anjay","gila","parah","buset","astaga","waduh","yaampun"]):
        caption = f"Chaos dikit: “{snippet}” 🔥"
    elif ("?" in best) or any(w in lc for w in ["kok","loh","hah"]):
        caption = f"Kok bisa? “{snippet}” 🤯"
    else:
        caption = f"“{snippet}” 🎯"

    base = ["#shorts","#reels","#tiktok","#podcast"]
    extras = []

    topic_rules = [
        (r"\b(ngakak|ketawa|wkwk|wkwkwk|lucu|komedi|kocak)\b", "#komedi"),
        (r"\b(marah|emosi|ribut|debat|berantem|panas)\b", "#debat"),
        (r"\b(duit|uang|cuan|bisnis|jualan|modal|untung|rugi|investasi|saham|crypto|forex)\b", "#keuangan"),
        (r"\b(politik|pemilu|presiden|menteri|dpr)\b", "#politik"),
        (r"\b(cinta|pacar|mantan|nikah)\b", "#relationship"),
        (r"\b(game|ml|mobile legends|valorant|pubg|ff|free fire)\b", "#gaming"),
        (r"\b(makanan|kuliner|masak|pedas|enak)\b", "#kuliner"),
    ]
    for pat, tag in topic_rules:
        if re.search(pat, text_lc):
            extras.append(tag)

    if re.search(r"\b(anjir|anjay|gila|parah|buset|astaga|waduh|yaampun)\b", text_lc):
        extras.append("#chaos")
    if re.search(r"\b(ternyata|plot|twist)\b", text_lc):
        extras.append("#plottwist")

    if float(s.get("viral_score", 0.0)) >= 80:
        extras.append("#viral")

    tags = _dedupe_keep_order(base + extras)
    return caption, tags[:8]

# Attach packaging to SELECTED (log-only). Never read by scoring/selection.
# Counts selected clips for which no transcript text was available.
missing = 0
for c in SELECTED:
    already_packaged = c.get("caption") and c.get("hashtags")
    if not already_packaged:
        cid = c.get("id")
        t = ""
        if "TRANSCRIPTS" in globals() and isinstance(TRANSCRIPTS, dict):
            # TRANSCRIPTS is keyed by candidate id in this pipeline.
            entry = TRANSCRIPTS.get(cid, {}) or {}
            t = entry.get("text", "") or ""
        missing += 0 if t else 1
        # An empty transcript makes the builder return its fixed fallback caption.
        c["caption"], c["hashtags"] = build_caption_and_hashtags(c, t)

logger.info(f"Packaging attached to SELECTED (log-only). Missing transcript for {missing}/{len(SELECTED)} selected clips.")
log_flush()


## ✅ Log Rapi: Tabel Clip Terpilih (Bahasa Indonesia + Caption/Hashtag)


In [None]:
# =========================
# Clip Terpilih (Rapi, Bahasa Indonesia) — termasuk Caption & Hashtag (preview)
# =========================
import pandas as pd

def _safe_join(xs):
    """Join the truthy items of ``xs`` with single spaces ("" for empty/None input)."""
    if not xs:
        return ""
    return " ".join([str(x) for x in xs if x])

# One row per selected clip, in the order of SELECTED.
rows = []
for rank, c in enumerate(SELECTED, 1):
    s = c.get("scores", {}) or {}
    rows.append({
        "peringkat": rank,
        "id_segmen": c.get("id"),
        "tipe_segmen": c.get("type"),
        "mulai_detik": float(c.get("start", 0.0)),
        "akhir_detik": float(c.get("end", 0.0)),
        "durasi_detik": float(c.get("duration", 0.0)),
        "skor_viral_total": float(s.get("viral_score", 0.0)),
        "skor_hook": float(s.get("hook", 0.0)),
        # Legacy columns, kept so the table schema stays backward-compatible.
        # The Stage 9 scorer does not write these keys, so they read 0.0
        # unless another stage populates them.
        "skor_emosi": float(s.get("emotion", 0.0)),
        "skor_kepadatan_info": float(s.get("density", 0.0)),
        "skor_alur_cerita": float(s.get("arc", 0.0)),
        "skor_keamanan_audio": float(s.get("safety", 0.0)),
        "skor_kebaruan": float(s.get("novelty", 0.0)),
        # Component scores actually produced by Stage 9.
        "skor_makna": float(s.get("meaning", 0.0)),
        "skor_kejelasan": float(s.get("clarity", 0.0)),
        "skor_energi": float(s.get("energy", 0.0)),
        "skor_penyelesaian": float(s.get("finishability", 0.0)),
        "mode_semantik": s.get("semantic_mode", ""),
        "caption": c.get("caption", ""),
        "hashtag": _safe_join(c.get("hashtags", [])) if isinstance(c.get("hashtags"), list) else str(c.get("hashtags") or ""),
        "alasan_editorial": _safe_join(c.get("editorial_reason", [])) if isinstance(c.get("editorial_reason"), list) else str(c.get("editorial_reason") or ""),
    })

df_clip_terpilih = pd.DataFrame(rows)
# Bare expression: rendered as the cell output in the notebook.
df_clip_terpilih


## ✂️ Stage 11: Cut Rules (Snap to Word/Silence + Avoid Shot-Cut)

In [None]:
# =========================
# Stage 11: Cut Rules (Snap to Word/Silence + End-of-Idea Polish)
# =========================

def snap_to_words(start: float, end: float, words: List[Dict[str,Any]], radius: float=1.4):
    """Move the cut points onto nearby word boundaries.

    ``start`` snaps to the latest word start at or before it within
    ``radius``; if none exists, to the nearest word start inside the radius.
    ``end`` snaps to the earliest word end at or after it within ``radius``;
    if none exists, to the nearest word end inside the radius. A cut point
    with no word inside its radius is left unchanged.
    """
    if not words:
        return start, end

    starts_nearby = [w["start"] for w in words if (start - radius) <= w["start"] <= (start + radius)]
    if starts_nearby:
        at_or_before = [ws for ws in starts_nearby if ws <= start]
        if at_or_before:
            new_start = max(at_or_before)
        else:
            new_start = min(starts_nearby, key=lambda ws: abs(ws - start))
        start = new_start

    ends_nearby = [w["end"] for w in words if (end - radius) <= w["end"] <= (end + radius)]
    if ends_nearby:
        at_or_after = [we for we in ends_nearby if we >= end]
        if at_or_after:
            new_end = min(at_or_after)
        else:
            new_end = min(ends_nearby, key=lambda we: abs(we - end))
        end = new_end

    return start, end


def snap_to_silence_edges(start: float, end: float, silence_segments: List[List[float]], radius: float=1.0):
    """Snap the cut points outward onto adjacent silence edges.

    ``start`` moves back to the latest silence end that lies at or before it
    within ``radius``; ``end`` moves forward to the earliest silence start
    that lies at or after it within ``radius``. Without a match the original
    value is returned.
    """
    ends_before = [
        seg_end
        for _seg_start, seg_end in silence_segments
        if seg_end <= start and abs(seg_end - start) <= radius
    ]
    new_start = max(ends_before) if ends_before else start

    starts_after = [
        seg_start
        for seg_start, _seg_end in silence_segments
        if seg_start >= end and abs(seg_start - end) <= radius
    ]
    new_end = min(starts_after) if starts_after else end

    return new_start, new_end


def avoid_shot_cut(t: float, shot_cuts: List[float], min_dist=0.30, shift=0.35, direction=+1):
    """Move a cut point away from detected shot cuts.

    If ``t`` is closer than ``min_dist`` to any shot cut, it is moved by
    ``direction * shift``. A single shift can land even nearer to a cut that
    lies on the far side of ``t`` (or next to another cut), so the shift is
    repeated until the point is clear of every cut. The number of repeats is
    bounded, which makes this a best effort for unusual parameter choices.

    Args:
        t: Candidate cut time.
        shot_cuts: Shot-cut timestamps; an empty value disables the check.
        min_dist: Minimum allowed distance between ``t`` and any shot cut.
        shift: Size of one step away from the cut.
        direction: Sign of the step (+1 moves later, -1 moves earlier).

    Returns:
        ``t`` unchanged when no cut is too close, otherwise the shifted time.
    """
    if not shot_cuts:
        return t

    def _too_close(x):
        return any(abs(c - x) < min_dist for c in shot_cuts)

    if not _too_close(t):
        return t

    step = direction * shift
    moved = t + step
    if step == 0:
        # Nothing can be gained by repeating a zero-length step.
        return moved

    # With the defaults (shift > min_dist) each cut can be hit at most twice.
    for _ in range(2 * len(shot_cuts)):
        if not _too_close(moved):
            break
        moved += step
    return moved

# --- Editorial helpers ---
# Tuning constants read by the Stage 11 snapping loop further down in this cell
# (presumably seconds, going by the _SEC suffixes and the "s" in the reason strings).
# Reach of the next-word-end search, and size of the blind fallback extension,
# applied when a clip end is still mid-speech.
TAIL_EXT_SEC = 1.2
# Reach of the search for a silence start after a clip end that is still mid-speech.
TAIL_MAX_SEC = 2.0
# Largest gap between a preceding silence end and the clip start for the start
# to be moved onto that silence end.
LEAD_SILENCE_TRIM_SEC = 0.8
# Distance range before the clip start that is searched for a silence end (or,
# failing that, a word start) when expanding context.
START_EXPAND_MIN = 2.0
START_EXPAND_MAX = 6.0
# Look-back window handed to _speech_active_near_start.
START_SPEECH_WINDOW = 0.6


def _find_silence_start_after(t: float, silence_segments: List[List[float]], max_after: float):
    best = None
    for s, e in silence_segments:
        if s >= t and s <= (t + max_after):
            if best is None or s < best:
                best = s
    return best


def _find_silence_end_before(t: float, silence_segments: List[List[float]], min_before: float, max_before: float):
    cand = []
    for s, e in silence_segments:
        if e <= t:
            d = t - e
            if min_before <= d <= max_before:
                cand.append((d, e))
    if not cand:
        return None
    cand.sort(key=lambda x: x[0])
    return float(cand[0][1])


def _next_word_end_after(t: float, words: List[Dict[str,Any]], max_after: float):
    cand = []
    for w in words or []:
        if "end" not in w:
            continue
        we = float(w["end"])
        if we >= t and we <= (t + max_after):
            cand.append(we)
    return min(cand) if cand else None


def _prev_word_start_before(t: float, words: List[Dict[str,Any]], min_before: float, max_before: float):
    cand = []
    for w in words or []:
        if "start" not in w:
            continue
        ws = float(w["start"])
        if ws <= t:
            d = t - ws
            if min_before <= d <= max_before:
                cand.append((d, ws))
    if not cand:
        return None
    cand.sort(key=lambda x: x[0])
    return float(cand[0][1])


def _speech_active_near_end(t: float, words: List[Dict[str,Any]], window: float = 0.6):
    if not words:
        return False
    lo = t - window
    hi = t + 0.2
    for w in words:
        if "end" not in w:
            continue
        we = float(w["end"])
        if lo <= we <= hi:
            return True
    return False


def _speech_active_near_start(t: float, words: List[Dict[str,Any]], window: float = 0.6):
    if not words:
        return False
    lo = t - window
    hi = t + 0.2
    for w in words:
        if "start" not in w:
            continue
        ws = float(w["start"])
        if lo <= ws <= hi:
            return True
    return False


def _add_reason(c: Dict[str,Any], msg: str):
    er = c.get("editorial_reason", [])
    if not isinstance(er, list):
        er = [str(er)]
    if msg not in er:
        er.append(msg)
    c["editorial_reason"] = er


# Stage 11 driver: refine the start/end of every clip in SELECTED in place, then
# write the snapped selection and the transcript cache under ART_DIR.
# Reads notebook globals that are not defined in this cell (SELECTED, SHOT_CUTS,
# SILENCE_SEGMENTS, MIN_CLIP_SEC, MAX_CLIP_SEC, ANALYZED_DURATION, ART_DIR,
# StageTimer, logger, log_flush, write_json).
with StageTimer(11, "Snap boundaries + end-of-idea polish"):
    # _SNAP_DONE is set at the very end of a completed run, so executing this
    # cell again only logs and returns.
    if globals().get("_SNAP_DONE"):
        logger.info("Stage 11 already completed; skipping duplicate run")
        log_flush()
    else:
        TRANSCRIPTS = globals().get("TRANSCRIPTS", {})
        if not isinstance(TRANSCRIPTS, dict):
            TRANSCRIPTS = {}

        # Selected-only ASR full if missing or light
        # ASR counts as available only when the flag is truthy AND the
        # transcribe_candidate_abs function exists in globals().
        ASR_AVAILABLE = bool(globals().get("ASR_AVAILABLE", False)) and ("transcribe_candidate_abs" in globals())
        if ASR_AVAILABLE:
            def transcribe_candidate_abs_local(candidate):
                return globals()["transcribe_candidate_abs"](candidate, mode="full")
        else:
            # Stub returning an empty transcript so the loop below needs no special case.
            def transcribe_candidate_abs_local(candidate):
                return {"text":"", "words":[], "markers_abs":[], "words_per_sec":0.0, "trigger_count":0, "word_count":0}

        # Pass 1: (re)transcribe every selected clip whose cached transcript is
        # missing or whose "mode" is not "full"; the outcome is tagged on the clip.
        for c in SELECTED:
            cid = c.get("id")
            tinfo = TRANSCRIPTS.get(cid, {}) if isinstance(TRANSCRIPTS, dict) else {}
            if cid and (cid not in TRANSCRIPTS or tinfo.get("mode") != "full"):
                try:
                    logger.info(f"ASR (selected-only full) {cid} for snapping")
                    out = transcribe_candidate_abs_local(c)
                    TRANSCRIPTS[cid] = out
                    if out.get("words"):
                        _add_reason(c, "asr:selected_full")
                    else:
                        _add_reason(c, "asr:selected_full_empty")
                except Exception as e:
                    # Best effort: a failed transcription only tags the clip; snapping still runs.
                    logger.warning(f"Selected-only ASR failed for {cid}: {e}")
                    _add_reason(c, "asr:selected_full_failed")

        # Apply snapping + editorial polish
        # Pass 2: s/e are the working bounds, s2/e2 the snapped bounds. Every
        # `continue` below leaves c["start"]/c["end"] as they were on entry.
        for c in SELECTED:
            cid = c.get("id")
            s = float(c["start"]); e = float(c["end"])

            # avoid cutting right on a shot-cut
            # s_before/e_before keep the bounds on entry; the deltas printed in
            # the reason strings further down are measured against them.
            s_before, e_before = s, e
            s = avoid_shot_cut(s, SHOT_CUTS, direction=+1)
            if s != s_before:
                _add_reason(c, f"avoid_shot_cut:start(+{s - s_before:.2f}s)")

            # Both boundaries are shifted later (direction=+1).
            e = avoid_shot_cut(e, SHOT_CUTS, direction=+1)
            if e != e_before:
                _add_reason(c, f"avoid_shot_cut:end(+{e - e_before:.2f}s)")

            words = (TRANSCRIPTS.get(cid, {}) or {}).get("words", []) or []

            # trim leading silence if any
            # Moves the start back onto the end of the first listed silence that
            # finishes at most LEAD_SILENCE_TRIM_SEC before it.
            for (ss,se) in SILENCE_SEGMENTS:
                if se <= s and (s - se) <= LEAD_SILENCE_TRIM_SEC:
                    s = float(se)
                    _add_reason(c, "trim:leading_silence")
                    break

            # context expansion (2-6s) if speech is active at start
            # Prefers a silence end START_EXPAND_MIN..START_EXPAND_MAX before the
            # start; falls back to a word start in the same range.
            if _speech_active_near_start(s, words, window=START_SPEECH_WINDOW):
                sil_prev = _find_silence_end_before(s, SILENCE_SEGMENTS, START_EXPAND_MIN, START_EXPAND_MAX)
                if sil_prev is not None:
                    s = float(sil_prev)
                    _add_reason(c, f"context:expand_silence(-{s_before - s:.2f}s)")
                else:
                    ws_prev = _prev_word_start_before(s, words, START_EXPAND_MIN, START_EXPAND_MAX)
                    if ws_prev is not None:
                        s = float(ws_prev)
                        _add_reason(c, f"context:expand_word(-{s_before - s:.2f}s)")

            # snap to word boundaries
            s2, e2 = snap_to_words(s, e, words, radius=1.4)
            _add_reason(c, "snap:word" if words else "snap:nowords")

            # if snapping made it too short, try silence edges
            # The fallback restarts from the pre-snap bounds (s, e), not (s2, e2).
            if (e2 - s2) < MIN_CLIP_SEC:
                s2, e2 = snap_to_silence_edges(s, e, SILENCE_SEGMENTS, radius=1.0)
                _add_reason(c, "snap:silence_fallback")

            # clamp to video bounds
            s2 = max(0.0, float(s2))
            e2 = min(float(ANALYZED_DURATION), float(e2))
            if (e2 - s2) < MIN_CLIP_SEC:
                logger.warning(f"Snap too short for {cid}; keeping original bounds")
                _add_reason(c, "snap:too_short_keep_original")
                continue

            # --- End-of-idea polish (even if dur <= max) ---
            # hard_cap: latest end allowed by both the analysed duration and MAX_CLIP_SEC.
            hard_cap = min(float(ANALYZED_DURATION), float(s2 + MAX_CLIP_SEC))
            still_speaking = _speech_active_near_end(e2, words, window=0.6)

            # Mid-speech end: extend to the next silence start, else to the next
            # word end, else by a fixed amount; never past hard_cap.
            if still_speaking and e2 < hard_cap:
                sil = _find_silence_start_after(e2, SILENCE_SEGMENTS, max_after=min(TAIL_MAX_SEC, hard_cap - e2))
                if sil is not None:
                    e2 = min(float(sil), hard_cap)
                    _add_reason(c, f"end:seek_silence(+{e2 - e_before:.2f}s)")
                else:
                    wend = _next_word_end_after(e2, words, max_after=min(TAIL_EXT_SEC, hard_cap - e2))
                    if wend is not None:
                        e2 = min(float(wend), hard_cap)
                        _add_reason(c, f"end:seek_word(+{e2 - e_before:.2f}s)")
                    else:
                        e2 = min(float(e2 + min(TAIL_EXT_SEC, hard_cap - e2)), hard_cap)
                        _add_reason(c, f"end:extend(+{e2 - e_before:.2f}s)")
            elif not still_speaking:
                # trim meaningless tail if silence starts shortly before end
                # Only the first matching silence is considered (break), and the
                # trim is skipped when it would make the clip shorter than MIN_CLIP_SEC.
                for (ss,se) in SILENCE_SEGMENTS:
                    if ss <= e2 and (e2 - ss) <= 1.2:
                        new_e = float(ss)
                        if (new_e - s2) >= MIN_CLIP_SEC:
                            e2 = new_e
                            _add_reason(c, "end:trim_to_silence")
                        break

            # --- Editorial clamp if over max ---
            dur = (e2 - s2)
            if dur > MAX_CLIP_SEC:
                _add_reason(c, f"clamp:max_sec({MAX_CLIP_SEC:.1f})")
                hard_end = float(s2 + MAX_CLIP_SEC)
                # e_limit is the current (over-long) end, so the clamp can only shorten the clip.
                e_limit = min(float(e2), float(ANALYZED_DURATION))

                still_speaking = _speech_active_near_end(hard_end, words, window=0.6)
                if still_speaking:
                    # The result may exceed hard_end: the end is placed on a
                    # silence/word boundary shortly after it, capped by e_limit.
                    _add_reason(c, "end:still_speaking")
                    sil = _find_silence_start_after(hard_end, SILENCE_SEGMENTS, max_after=TAIL_MAX_SEC)
                    if sil is not None:
                        e2 = min(float(sil), e_limit)
                        _add_reason(c, f"end:silence(+{e2 - hard_end:.2f}s)")
                    else:
                        wend = _next_word_end_after(hard_end, words, max_after=min(TAIL_EXT_SEC, TAIL_MAX_SEC))
                        if wend is not None:
                            e2 = min(float(wend), e_limit)
                            _add_reason(c, f"end:word(+{e2 - hard_end:.2f}s)")
                        else:
                            e2 = min(float(hard_end + min(TAIL_EXT_SEC, TAIL_MAX_SEC)), e_limit)
                            _add_reason(c, f"end:extend(+{e2 - hard_end:.2f}s)")
                else:
                    e2 = min(float(hard_end), e_limit)
                    _add_reason(c, "end:hard_end")

            # final safety
            e2 = min(float(ANALYZED_DURATION), float(e2))
            if e2 <= s2:
                logger.warning(f"Bad bounds after clamp for {cid}; keeping original bounds")
                _add_reason(c, "bounds:bad_keep_original")
                continue
            if (e2 - s2) < MIN_CLIP_SEC:
                logger.warning(f"Too short after clamp for {cid}; keeping original bounds")
                _add_reason(c, "bounds:too_short_keep_original")
                continue

            # Commit the refined bounds on the clip dict.
            c["start"] = float(s2)
            c["end"] = float(e2)
            c["duration"] = float(e2 - s2)

        write_json(ART_DIR / "selected_snapped.json", SELECTED)
        logger.info("Snapping completed (end-of-idea polish enabled).")

        # Persisting the transcript cache is best effort; a failure is only logged.
        try:
            write_json(ART_DIR / "transcript.json", TRANSCRIPTS)
            logger.info(f"Saved transcript cache after snapping (entries={len(TRANSCRIPTS)})")
        except Exception as e:
            logger.warning(f"Failed to save transcript cache after snapping: {e}")

        # Publish results and mark the stage as done for later executions.
        globals()["TRANSCRIPTS"] = TRANSCRIPTS
        globals()["_SNAP_DONE"] = True
        log_flush()


In [None]:
# =========================
# Stage 11: Cut Rules (Snap to Word/Silence + End-of-Idea Polish)
# =========================

def snap_to_words(start: float, end: float, words: List[Dict[str,Any]], radius: float=1.4):
    """Snap clip boundaries onto nearby ASR word boundaries.

    The start moves to the latest word start at or before ``start`` inside
    the search window; if there is none, to the closest word start in the
    window. The end moves to the earliest word end at or after ``end`` inside
    the window; if there is none, to the closest word end in the window.
    A boundary whose window contains no word is returned unchanged.

    Args:
        start: Proposed clip start.
        end: Proposed clip end.
        words: Word dicts with "start" / "end" timestamps. Entries missing
            the key needed for a boundary are ignored (like the other word
            helpers of this stage) instead of raising ``KeyError``.
        radius: Half-width of the search window around each boundary, in the
            same unit as the timestamps.

    Returns:
        Tuple ``(start, end)`` after snapping.
    """
    if not words:
        return start, end

    # Word starts inside [start - radius, start + radius].
    starts = [
        w["start"] for w in words
        if "start" in w and (start - radius) <= w["start"] <= (start + radius)
    ]
    new_start = start
    if starts:
        at_or_before = [ts for ts in starts if ts <= start]
        if at_or_before:
            # Prefer moving backwards so the first word is not clipped.
            new_start = max(at_or_before)
        else:
            new_start = min(starts, key=lambda ts: abs(ts - start))

    # Word ends inside [end - radius, end + radius].
    ends = [
        w["end"] for w in words
        if "end" in w and (end - radius) <= w["end"] <= (end + radius)
    ]
    new_end = end
    if ends:
        at_or_after = [te for te in ends if te >= end]
        if at_or_after:
            # Prefer moving forwards so the last word is not clipped.
            new_end = min(at_or_after)
        else:
            new_end = min(ends, key=lambda te: abs(te - end))

    return new_start, new_end


def snap_to_silence_edges(start: float, end: float, silence_segments: List[List[float]], radius: float=1.0):
    """Snap clip boundaries outward onto adjacent silence edges.

    The start moves to the latest silence end that lies at or before
    ``start`` and within ``radius`` of it; the end moves to the earliest
    silence start that lies at or after ``end`` and within ``radius`` of it.
    A boundary without a matching silence edge is returned unchanged.

    Args:
        start: Proposed clip start.
        end: Proposed clip end.
        silence_segments: Pairs ``(silence_start, silence_end)``.
        radius: Maximum distance between a boundary and the edge it snaps to.

    Returns:
        Tuple ``(start, end)`` after snapping.
    """
    ends_before = [
        seg_end for _seg_start, seg_end in silence_segments
        if seg_end <= start and abs(seg_end - start) <= radius
    ]
    starts_after = [
        seg_start for seg_start, _seg_end in silence_segments
        if seg_start >= end and abs(seg_start - end) <= radius
    ]

    snapped_start = max(ends_before) if ends_before else start
    snapped_end = min(starts_after) if starts_after else end
    return snapped_start, snapped_end


def avoid_shot_cut(t: float, shot_cuts: List[float], min_dist=0.30, shift=0.35, direction=+1):
    """Move a cut point away from detected shot cuts.

    If ``t`` is closer than ``min_dist`` to any shot cut, it is moved by
    ``direction * shift``. A single shift can land even nearer to a cut that
    lies on the far side of ``t`` (or next to another cut), so the shift is
    repeated until the point is clear of every cut. The number of repeats is
    bounded, which makes this a best effort for unusual parameter choices.

    Args:
        t: Candidate cut time.
        shot_cuts: Shot-cut timestamps; an empty value disables the check.
        min_dist: Minimum allowed distance between ``t`` and any shot cut.
        shift: Size of one step away from the cut.
        direction: Sign of the step (+1 moves later, -1 moves earlier).

    Returns:
        ``t`` unchanged when no cut is too close, otherwise the shifted time.
    """
    if not shot_cuts:
        return t

    def _too_close(x):
        return any(abs(c - x) < min_dist for c in shot_cuts)

    if not _too_close(t):
        return t

    step = direction * shift
    moved = t + step
    if step == 0:
        # Nothing can be gained by repeating a zero-length step.
        return moved

    # With the defaults (shift > min_dist) each cut can be hit at most twice.
    for _ in range(2 * len(shot_cuts)):
        if not _too_close(moved):
            break
        moved += step
    return moved

# --- Editorial helpers ---
# Tuning constants read by the Stage 11 snapping loop further down in this cell
# (presumably seconds, going by the _SEC suffixes and the "s" in the reason strings).
# Reach of the next-word-end search, and size of the blind fallback extension,
# applied when a clip end is still mid-speech.
TAIL_EXT_SEC = 1.2
# Reach of the search for a silence start after a clip end that is still mid-speech.
TAIL_MAX_SEC = 2.0
# Largest gap between a preceding silence end and the clip start for the start
# to be moved onto that silence end.
LEAD_SILENCE_TRIM_SEC = 0.8
# Distance range before the clip start that is searched for a silence end (or,
# failing that, a word start) when expanding context.
START_EXPAND_MIN = 2.0
START_EXPAND_MAX = 6.0
# Look-back window handed to _speech_active_near_start.
START_SPEECH_WINDOW = 0.6


def _find_silence_start_after(t: float, silence_segments: List[List[float]], max_after: float):
    best = None
    for s, e in silence_segments:
        if s >= t and s <= (t + max_after):
            if best is None or s < best:
                best = s
    return best


def _find_silence_end_before(t: float, silence_segments: List[List[float]], min_before: float, max_before: float):
    cand = []
    for s, e in silence_segments:
        if e <= t:
            d = t - e
            if min_before <= d <= max_before:
                cand.append((d, e))
    if not cand:
        return None
    cand.sort(key=lambda x: x[0])
    return float(cand[0][1])


def _next_word_end_after(t: float, words: List[Dict[str,Any]], max_after: float):
    cand = []
    for w in words or []:
        if "end" not in w:
            continue
        we = float(w["end"])
        if we >= t and we <= (t + max_after):
            cand.append(we)
    return min(cand) if cand else None


def _prev_word_start_before(t: float, words: List[Dict[str,Any]], min_before: float, max_before: float):
    cand = []
    for w in words or []:
        if "start" not in w:
            continue
        ws = float(w["start"])
        if ws <= t:
            d = t - ws
            if min_before <= d <= max_before:
                cand.append((d, ws))
    if not cand:
        return None
    cand.sort(key=lambda x: x[0])
    return float(cand[0][1])


def _speech_active_near_end(t: float, words: List[Dict[str,Any]], window: float = 0.6):
    if not words:
        return False
    lo = t - window
    hi = t + 0.2
    for w in words:
        if "end" not in w:
            continue
        we = float(w["end"])
        if lo <= we <= hi:
            return True
    return False


def _speech_active_near_start(t: float, words: List[Dict[str,Any]], window: float = 0.6):
    if not words:
        return False
    lo = t - window
    hi = t + 0.2
    for w in words:
        if "start" not in w:
            continue
        ws = float(w["start"])
        if lo <= ws <= hi:
            return True
    return False


def _add_reason(c: Dict[str,Any], msg: str):
    er = c.get("editorial_reason", [])
    if not isinstance(er, list):
        er = [str(er)]
    if msg not in er:
        er.append(msg)
    c["editorial_reason"] = er


# Stage 11 driver: refine the start/end of every clip in SELECTED in place, then
# write the snapped selection and the transcript cache under ART_DIR.
# Reads notebook globals that are not defined in this cell (SELECTED, SHOT_CUTS,
# SILENCE_SEGMENTS, MIN_CLIP_SEC, MAX_CLIP_SEC, ANALYZED_DURATION, ART_DIR,
# StageTimer, logger, log_flush, write_json).
with StageTimer(11, "Snap boundaries + end-of-idea polish"):
    # _SNAP_DONE is set at the very end of a completed run, so executing this
    # cell again only logs and returns.
    if globals().get("_SNAP_DONE"):
        logger.info("Stage 11 already completed; skipping duplicate run")
        log_flush()
    else:
        TRANSCRIPTS = globals().get("TRANSCRIPTS", {})
        if not isinstance(TRANSCRIPTS, dict):
            TRANSCRIPTS = {}

        # Selected-only ASR full if missing or light
        # ASR counts as available only when the flag is truthy AND the
        # transcribe_candidate_abs function exists in globals().
        ASR_AVAILABLE = bool(globals().get("ASR_AVAILABLE", False)) and ("transcribe_candidate_abs" in globals())
        if ASR_AVAILABLE:
            def transcribe_candidate_abs_local(candidate):
                return globals()["transcribe_candidate_abs"](candidate, mode="full")
        else:
            # Stub returning an empty transcript so the loop below needs no special case.
            def transcribe_candidate_abs_local(candidate):
                return {"text":"", "words":[], "markers_abs":[], "words_per_sec":0.0, "trigger_count":0, "word_count":0}

        # Pass 1: (re)transcribe every selected clip whose cached transcript is
        # missing or whose "mode" is not "full"; the outcome is tagged on the clip.
        for c in SELECTED:
            cid = c.get("id")
            tinfo = TRANSCRIPTS.get(cid, {}) if isinstance(TRANSCRIPTS, dict) else {}
            if cid and (cid not in TRANSCRIPTS or tinfo.get("mode") != "full"):
                try:
                    logger.info(f"ASR (selected-only full) {cid} for snapping")
                    out = transcribe_candidate_abs_local(c)
                    TRANSCRIPTS[cid] = out
                    if out.get("words"):
                        _add_reason(c, "asr:selected_full")
                    else:
                        _add_reason(c, "asr:selected_full_empty")
                except Exception as e:
                    # Best effort: a failed transcription only tags the clip; snapping still runs.
                    logger.warning(f"Selected-only ASR failed for {cid}: {e}")
                    _add_reason(c, "asr:selected_full_failed")

        # Apply snapping + editorial polish
        # Pass 2: s/e are the working bounds, s2/e2 the snapped bounds. Every
        # `continue` below leaves c["start"]/c["end"] as they were on entry.
        for c in SELECTED:
            cid = c.get("id")
            s = float(c["start"]); e = float(c["end"])

            # avoid cutting right on a shot-cut
            # s_before/e_before keep the bounds on entry; the deltas printed in
            # the reason strings further down are measured against them.
            s_before, e_before = s, e
            s = avoid_shot_cut(s, SHOT_CUTS, direction=+1)
            if s != s_before:
                _add_reason(c, f"avoid_shot_cut:start(+{s - s_before:.2f}s)")

            # Both boundaries are shifted later (direction=+1).
            e = avoid_shot_cut(e, SHOT_CUTS, direction=+1)
            if e != e_before:
                _add_reason(c, f"avoid_shot_cut:end(+{e - e_before:.2f}s)")

            words = (TRANSCRIPTS.get(cid, {}) or {}).get("words", []) or []

            # trim leading silence if any
            # Moves the start back onto the end of the first listed silence that
            # finishes at most LEAD_SILENCE_TRIM_SEC before it.
            for (ss,se) in SILENCE_SEGMENTS:
                if se <= s and (s - se) <= LEAD_SILENCE_TRIM_SEC:
                    s = float(se)
                    _add_reason(c, "trim:leading_silence")
                    break

            # context expansion (2-6s) if speech is active at start
            # Prefers a silence end START_EXPAND_MIN..START_EXPAND_MAX before the
            # start; falls back to a word start in the same range.
            if _speech_active_near_start(s, words, window=START_SPEECH_WINDOW):
                sil_prev = _find_silence_end_before(s, SILENCE_SEGMENTS, START_EXPAND_MIN, START_EXPAND_MAX)
                if sil_prev is not None:
                    s = float(sil_prev)
                    _add_reason(c, f"context:expand_silence(-{s_before - s:.2f}s)")
                else:
                    ws_prev = _prev_word_start_before(s, words, START_EXPAND_MIN, START_EXPAND_MAX)
                    if ws_prev is not None:
                        s = float(ws_prev)
                        _add_reason(c, f"context:expand_word(-{s_before - s:.2f}s)")

            # snap to word boundaries
            s2, e2 = snap_to_words(s, e, words, radius=1.4)
            _add_reason(c, "snap:word" if words else "snap:nowords")

            # if snapping made it too short, try silence edges
            # The fallback restarts from the pre-snap bounds (s, e), not (s2, e2).
            if (e2 - s2) < MIN_CLIP_SEC:
                s2, e2 = snap_to_silence_edges(s, e, SILENCE_SEGMENTS, radius=1.0)
                _add_reason(c, "snap:silence_fallback")

            # clamp to video bounds
            s2 = max(0.0, float(s2))
            e2 = min(float(ANALYZED_DURATION), float(e2))
            if (e2 - s2) < MIN_CLIP_SEC:
                logger.warning(f"Snap too short for {cid}; keeping original bounds")
                _add_reason(c, "snap:too_short_keep_original")
                continue

            # --- End-of-idea polish (even if dur <= max) ---
            # hard_cap: latest end allowed by both the analysed duration and MAX_CLIP_SEC.
            hard_cap = min(float(ANALYZED_DURATION), float(s2 + MAX_CLIP_SEC))
            still_speaking = _speech_active_near_end(e2, words, window=0.6)

            # Mid-speech end: extend to the next silence start, else to the next
            # word end, else by a fixed amount; never past hard_cap.
            if still_speaking and e2 < hard_cap:
                sil = _find_silence_start_after(e2, SILENCE_SEGMENTS, max_after=min(TAIL_MAX_SEC, hard_cap - e2))
                if sil is not None:
                    e2 = min(float(sil), hard_cap)
                    _add_reason(c, f"end:seek_silence(+{e2 - e_before:.2f}s)")
                else:
                    wend = _next_word_end_after(e2, words, max_after=min(TAIL_EXT_SEC, hard_cap - e2))
                    if wend is not None:
                        e2 = min(float(wend), hard_cap)
                        _add_reason(c, f"end:seek_word(+{e2 - e_before:.2f}s)")
                    else:
                        e2 = min(float(e2 + min(TAIL_EXT_SEC, hard_cap - e2)), hard_cap)
                        _add_reason(c, f"end:extend(+{e2 - e_before:.2f}s)")
            elif not still_speaking:
                # trim meaningless tail if silence starts shortly before end
                # Only the first matching silence is considered (break), and the
                # trim is skipped when it would make the clip shorter than MIN_CLIP_SEC.
                for (ss,se) in SILENCE_SEGMENTS:
                    if ss <= e2 and (e2 - ss) <= 1.2:
                        new_e = float(ss)
                        if (new_e - s2) >= MIN_CLIP_SEC:
                            e2 = new_e
                            _add_reason(c, "end:trim_to_silence")
                        break

            # --- Editorial clamp if over max ---
            dur = (e2 - s2)
            if dur > MAX_CLIP_SEC:
                _add_reason(c, f"clamp:max_sec({MAX_CLIP_SEC:.1f})")
                hard_end = float(s2 + MAX_CLIP_SEC)
                # e_limit is the current (over-long) end, so the clamp can only shorten the clip.
                e_limit = min(float(e2), float(ANALYZED_DURATION))

                still_speaking = _speech_active_near_end(hard_end, words, window=0.6)
                if still_speaking:
                    # The result may exceed hard_end: the end is placed on a
                    # silence/word boundary shortly after it, capped by e_limit.
                    _add_reason(c, "end:still_speaking")
                    sil = _find_silence_start_after(hard_end, SILENCE_SEGMENTS, max_after=TAIL_MAX_SEC)
                    if sil is not None:
                        e2 = min(float(sil), e_limit)
                        _add_reason(c, f"end:silence(+{e2 - hard_end:.2f}s)")
                    else:
                        wend = _next_word_end_after(hard_end, words, max_after=min(TAIL_EXT_SEC, TAIL_MAX_SEC))
                        if wend is not None:
                            e2 = min(float(wend), e_limit)
                            _add_reason(c, f"end:word(+{e2 - hard_end:.2f}s)")
                        else:
                            e2 = min(float(hard_end + min(TAIL_EXT_SEC, TAIL_MAX_SEC)), e_limit)
                            _add_reason(c, f"end:extend(+{e2 - hard_end:.2f}s)")
                else:
                    e2 = min(float(hard_end), e_limit)
                    _add_reason(c, "end:hard_end")

            # final safety
            e2 = min(float(ANALYZED_DURATION), float(e2))
            if e2 <= s2:
                logger.warning(f"Bad bounds after clamp for {cid}; keeping original bounds")
                _add_reason(c, "bounds:bad_keep_original")
                continue
            if (e2 - s2) < MIN_CLIP_SEC:
                logger.warning(f"Too short after clamp for {cid}; keeping original bounds")
                _add_reason(c, "bounds:too_short_keep_original")
                continue

            # Commit the refined bounds on the clip dict.
            c["start"] = float(s2)
            c["end"] = float(e2)
            c["duration"] = float(e2 - s2)

        write_json(ART_DIR / "selected_snapped.json", SELECTED)
        logger.info("Snapping completed (end-of-idea polish enabled).")

        # Persisting the transcript cache is best effort; a failure is only logged.
        try:
            write_json(ART_DIR / "transcript.json", TRANSCRIPTS)
            logger.info(f"Saved transcript cache after snapping (entries={len(TRANSCRIPTS)})")
        except Exception as e:
            logger.warning(f"Failed to save transcript cache after snapping: {e}")

        # Publish results and mark the stage as done for later executions.
        globals()["TRANSCRIPTS"] = TRANSCRIPTS
        globals()["_SNAP_DONE"] = True
        log_flush()


## 📦 Stage 12: Export (9:16 FIT + Letterbox)

In [None]:
# =========================
# Stage 12 (ONE CELL): Finalize + Export (NO ZOOM / PAD) + Thumbs + Ranking
# =========================

import csv
import subprocess
from pathlib import Path
from typing import List

# Export frame size and frame rate. A value already present in globals() (set by
# an earlier cell) takes precedence over the default given here.
EXPORT_W = int(globals().get("EXPORT_W", 1080))
EXPORT_H = int(globals().get("EXPORT_H", 1920))
FPS_EXPORT = int(globals().get("FPS_EXPORT", 30))  # optional

def run_cmd(cmd: List[str], check: bool = True) -> subprocess.CompletedProcess:
    """Run ``cmd`` and capture its output as text.

    stderr is merged into stdout, so ``CompletedProcess.stdout`` carries the
    combined output of the child process.

    Args:
        cmd: Argument vector, executed without a shell.
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.
    """
    capture_options = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.STDOUT,
        "text": True,
    }
    completed = subprocess.run(cmd, check=check, **capture_options)
    return completed

def _portrait_vf_nozoom() -> str:
    """
    NO ZOOM: scale to fit (decrease) + pad to the exact portrait size.
    This is safe for the resolution acceptance test, but leaves black bars.

    Returns the ffmpeg ``-vf`` filter chain built from EXPORT_W / EXPORT_H.
    """
    fit_inside = f"scale={EXPORT_W}:{EXPORT_H}:force_original_aspect_ratio=decrease"
    # Centre the scaled frame on the padded canvas.
    letterbox = f"pad={EXPORT_W}:{EXPORT_H}:(ow-iw)/2:(oh-ih)/2"
    square_pixels = "setsar=1"
    return ",".join([fit_inside, letterbox, square_pixels])

def export_clip(video_path: Path, out_path: Path, start: float, end: float) -> None:
    """Encode the ``[start, end]`` span of ``video_path`` to ``out_path`` with ffmpeg.

    The output is letterboxed via ``_portrait_vf_nozoom()``, forced to
    FPS_EXPORT, encoded as H.264 (yuv420p) with AAC audio at AUDIO_BITRATE.
    The parent directory of ``out_path`` is created when missing.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # -ss/-to precede -i, i.e. they are given as input options.
    input_args = ["-ss", f"{start:.3f}", "-to", f"{end:.3f}", "-i", str(video_path)]
    video_args = [
        "-vf", _portrait_vf_nozoom(),
        "-r", str(FPS_EXPORT),
        "-c:v", "libx264", "-preset", "veryfast", "-crf", "20",
        "-pix_fmt", "yuv420p",
    ]
    audio_args = ["-c:a", "aac", "-b:a", AUDIO_BITRATE]
    container_args = ["-movflags", "+faststart"]

    ffmpeg_cmd = ["ffmpeg", "-y", *input_args, *video_args, *audio_args, *container_args, str(out_path)]
    run_cmd(ffmpeg_cmd, check=True)

def _ffprobe_duration(path: Path) -> float:
    try:
        out = subprocess.check_output([
            FFPROBE_BIN, '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            str(path)
        ]).decode('utf-8').strip()
        return float(out)
    except Exception:
        return -1.0

def _validate_clip_file(path: Path, expected_dur: float, tol: float=1.2) -> None:
    assert path.exists(), f"Missing clip file: {path}"
    size = path.stat().st_size
    assert size > 50_000, f"Clip too small / likely failed: {path} ({size} bytes)"
    dur = _ffprobe_duration(path)
    assert dur > 0, f"ffprobe duration failed: {path}"
    # tolerate keyframe/encoder variance
    assert abs(dur - expected_dur) <= tol, f"Bad duration {dur:.2f}s (expected ~{expected_dur:.2f}s) for {path}"

def export_thumb(video_path: Path, out_path: Path, t: float) -> None:
    """Grab one frame of ``video_path`` at time ``t`` and save it to ``out_path``.

    The frame goes through the same letterbox filter as the clips
    (``_portrait_vf_nozoom()``). The parent directory of ``out_path`` is
    created when missing.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    seek_and_input = ["-ss", f"{t:.3f}", "-i", str(video_path)]
    single_frame = ["-vf", _portrait_vf_nozoom(), "-vframes", "1", "-q:v", "2"]
    ffmpeg_cmd = ["ffmpeg", "-y", *seek_and_input, *single_frame, str(out_path)]
    run_cmd(ffmpeg_cmd, check=True)

# Stage 12 driver: export every clip in SELECTED (in list order, which becomes
# the rank), grab a thumbnail for each, mirror both into the public output
# folders, and write OUT_DIR/selected_ranking.csv.
# Reads notebook globals not defined in this cell (VIDEO_PATH, SELECTED,
# CLIPS_DIR, THUMBS_DIR, OUT_DIR, PUBLIC_CLIPS_DIR, PUBLIC_THUMBS_DIR, ...).
with StageTimer(12, f"Finalize + Export (NO ZOOM pad {EXPORT_W}x{EXPORT_H}@{FPS_EXPORT}) + Thumbs + Ranking"):
    # Fail fast when a prerequisite from an earlier stage is missing.
    assert "VIDEO_PATH" in globals(), "VIDEO_PATH missing"
    assert "SELECTED" in globals() and isinstance(SELECTED, list), "SELECTED missing"
    assert "CLIPS_DIR" in globals() and "THUMBS_DIR" in globals(), "CLIPS_DIR/THUMBS_DIR missing"
    assert "OUT_DIR" in globals(), "OUT_DIR missing"

    CLIPS_DIR.mkdir(parents=True, exist_ok=True)
    THUMBS_DIR.mkdir(parents=True, exist_ok=True)
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    # clean old exports
    for p in CLIPS_DIR.glob("*.mp4"):
        p.unlink()
    for p in THUMBS_DIR.glob("*.jpg"):
        p.unlink()

    ranking_rows = []
    for rank, c in enumerate(SELECTED, 1):
        cid = str(c.get("id"))
        st  = float(c.get("start", 0.0))
        en  = float(c.get("end", st))
        dur = float(c.get("duration", en - st))
        vs  = float((c.get("scores", {}) or {}).get("viral_score", 0.0))

        clip_out  = CLIPS_DIR  / f"clip_{rank:02d}_{cid}.mp4"
        thumb_out = THUMBS_DIR / f"thumb_{rank:02d}_{cid}.jpg"

        # Export and validation failures are not caught: they abort the stage.
        export_clip(VIDEO_PATH, clip_out, st, en)
        _validate_clip_file(clip_out, expected_dur=(en-st))

        # Mirror to public outputs for Kaggle UI convenience
        # Best effort: a mirror failure is only logged.
        # NOTE(review): if PUBLIC_CLIPS_DIR ever resolves to the same directory as
        # CLIPS_DIR, the unlink below removes the clip just exported before it is
        # read back - confirm the two directories differ.
        try:
            PUBLIC_CLIPS_DIR.mkdir(parents=True, exist_ok=True)
            public_clip = PUBLIC_CLIPS_DIR / clip_out.name
            if public_clip.exists():
                public_clip.unlink()
            public_clip.write_bytes(clip_out.read_bytes())
        except Exception as e:
            logger.warning(f"Public clip mirror failed: {e}")

        # Thumbnail time: 10% into the clip, but at most 0.5 after its start.
        t_thumb = st + min(0.5, max(0.0, dur * 0.10))
        export_thumb(VIDEO_PATH, thumb_out, t_thumb)
        # Same best-effort mirror for the thumbnail (same NOTE(review) applies
        # to PUBLIC_THUMBS_DIR vs THUMBS_DIR).
        try:
            PUBLIC_THUMBS_DIR.mkdir(parents=True, exist_ok=True)
            public_thumb = PUBLIC_THUMBS_DIR / thumb_out.name
            if public_thumb.exists():
                public_thumb.unlink()
            public_thumb.write_bytes(thumb_out.read_bytes())
        except Exception as e:
            logger.warning(f"Public thumb mirror failed: {e}")

        # Flatten the reason list into one " | "-separated CSV cell.
        er = c.get("editorial_reason", [])
        er = " | ".join(er) if isinstance(er, list) else str(er)

        ranking_rows.append({
            "rank": int(rank),
            "id": cid,
            "start": float(st),
            "end": float(en),
            "duration": float(dur),
            "viral_score": float(vs),
            "clip_path": str(clip_out),
            "thumbnail_path": str(thumb_out),
            "editorial_reason": er,
        })

    ranking_csv = OUT_DIR / "selected_ranking.csv"
    with open(ranking_csv, "w", newline="", encoding="utf-8") as f:
        fieldnames = ["rank","id","start","end","duration","viral_score","clip_path","thumbnail_path","editorial_reason"]
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(ranking_rows)

    logger.info(f"‚úÖ Stage 12 done. Ranking: {ranking_csv}")
    log_flush()


## 🧾 Stage 13: Manifest (JSON/CSV) + Caption/Hashtag (Log Only)

In [None]:
# =========================
# Cell: Stage 13 - Caption/Hashtag helpers (LOG ONLY, Non-Core)
# =========================

def _dedupe_keep_order(xs):
    out = []
    for x in xs:
        if x and x not in out:
            out.append(x)
    return out

def build_caption_and_hashtags(clip: Dict[str,Any], transcript_text: str):
    """NON-CORE. Build a deterministic caption and hashtag list from the ASR transcript.

    The transcript is split into sentences (or clauses when there is only
    one sentence), each part is scored with keyword/punctuation heuristics,
    and the best part becomes the quoted snippet of the caption. Hashtags are
    the fixed base tags plus topic tags matched against the whole transcript.

    Changes from the previous version:
      * the emoji, curly quotes and ellipsis in the caption strings were
        mis-encoded (mojibake) and are restored to the intended characters;
      * ``TRIGGER_WORDS`` may be any iterable (list, set, tuple) - it used to
        be concatenated with a list, which raised ``TypeError`` otherwise.

    Args:
        clip: Clip dict; only ``clip["scores"]["viral_score"]`` is read.
        transcript_text: Transcript of the clip; may be empty or None.

    Returns:
        Tuple ``(caption, hashtags)`` with at most 8 hashtags.
    """
    s = clip.get("scores", {}) or {}
    text_raw = (transcript_text or "").strip()
    if not text_raw:
        return "Bagian paling seru di obrolan ini 🎯", ["#shorts","#reels","#tiktok","#podcast"]

    text_lc = text_raw.lower()

    # Sentence split first; fall back to clause separators for a single sentence.
    parts = re.split(r'(?<=[\.!\?])\s+|\n+', text_raw)
    if len(parts) == 1:
        parts = re.split(r'[;,]\s+', text_raw)
    parts = [p.strip() for p in parts if p.strip()]
    if not parts:
        parts = [text_raw]

    # TRIGGER_WORDS is optional and defined by another cell, if at all.
    extra_triggers = TRIGGER_WORDS if "TRIGGER_WORDS" in globals() else []
    trigger = set(extra_triggers or ()) | {"haha", "wk"}
    laugh = {"wkwk","wkwkwk","ngakak","ketawa","haha","wk"}
    chaos_words = ("anjir","anjay","gila","parah","buset","astaga","waduh","yaampun")

    def has_twist(lc: str) -> bool:
        return ("ternyata" in lc) or ("plot twist" in lc) or ("plot" in lc and "twist" in lc)

    def sent_score(sent: str) -> float:
        lc = sent.lower()
        toks = re.findall(r"[a-z0-9']+", lc)
        trig = sum(1 for t in toks if t in trigger)
        laug = sum(1 for t in toks if t in laugh)
        punct = 2*sent.count("!") + 2*sent.count("?")
        twist = 2 if has_twist(lc) else 0
        chaos = 1 if any(w in lc for w in chaos_words) else 0

        # Penalise snippets that are too short or too long for a caption.
        L = len(sent)
        len_pen = 0
        if L < 25:
            len_pen -= 1
        if L > 110:
            len_pen -= 1
        return trig*3 + laug*3 + punct + twist*2 + chaos + len_pen

    # max() keeps the first part on ties, which makes the choice deterministic.
    best_i = max(range(len(parts)), key=lambda i: sent_score(parts[i]))
    best = parts[best_i]

    # A very short winner is merged with a neighbour to give it context.
    best_words = re.findall(r"[a-z0-9']+", best.lower())
    if len(best_words) < 4 and len(parts) > 1:
        if best_i > 0:
            best = (parts[best_i-1] + " " + best).strip()
        elif best_i < len(parts)-1:
            best = (best + " " + parts[best_i+1]).strip()

    # Collapse whitespace and cut at a word boundary when over 92 characters.
    snippet = re.sub(r"\s+"," ", best).strip()
    if len(snippet) > 92:
        snippet = snippet[:92].rsplit(" ", 1)[0] + "…"

    lc = best.lower()
    if has_twist(lc):
        caption = f"Plot twist: “{snippet}” 👀"
    elif any(w in lc for w in ["ngakak","ketawa","wkwk","wkwkwk","haha"]):
        caption = f"Ngakak dulu: “{snippet}” 😂"
    elif any(w in lc for w in chaos_words):
        caption = f"Chaos dikit: “{snippet}” 🔥"
    elif ("?" in best) or any(w in lc for w in ["kok","loh","hah"]):
        caption = f"Kok bisa? “{snippet}” 🤯"
    else:
        caption = f"“{snippet}” 🎯"

    base = ["#shorts", "#reels", "#tiktok", "#podcast"]
    topic_rules = [
        (r"\b(ngakak|ketawa|wkwk|wkwkwk|lucu|komedi|kocak)\b", "#komedi"),
        (r"\b(marah|emosi|ribut|debat|berantem|panas)\b", "#debat"),
        (r"\b(duit|uang|cuan|bisnis|jualan|modal|untung|rugi|investasi|saham|crypto|forex)\b", "#keuangan"),
        (r"\b(politik|pemilu|presiden|menteri|dpr)\b", "#politik"),
        (r"\b(cinta|pacar|mantan|nikah)\b", "#relationship"),
        (r"\b(game|ml|mobile legends|valorant|pubg|ff|free fire)\b", "#gaming"),
        (r"\b(makanan|kuliner|masak|pedas|enak)\b", "#kuliner"),
    ]
    extras = [tag for pat, tag in topic_rules if re.search(pat, text_lc)]

    if re.search(r"\b(anjir|anjay|gila|parah|buset|astaga|waduh|yaampun)\b", text_lc):
        extras.append("#chaos")
    if re.search(r"\b(ternyata|plot|twist)\b", text_lc):
        extras.append("#plottwist")

    vs = float(s.get("viral_score", 0.0))
    if vs >= 80:
        extras.append("#viral")

    tags = _dedupe_keep_order(base + extras)
    return caption, tags[:8]

# =========================
# Cell: Stage 13 - Caption/Hashtag helpers (LOG ONLY, Non-Core)
# =========================

def _dedupe_keep_order(xs):
    out = []
    for x in xs:
        if x and x not in out:
            out.append(x)
    return out

def build_caption_and_hashtags(clip: Dict[str,Any], transcript_text: str):
    """Build a deterministic caption and hashtag list from a clip transcript.

    NON-CORE helper: the result is derived purely from the ASR transcript text
    and the clip's ``scores["viral_score"]``; no randomness is involved.

    Args:
        clip: Clip record; only ``clip["scores"]["viral_score"]`` is read.
        transcript_text: Transcript text for the clip (may be empty or None).

    Returns:
        A ``(caption, hashtags)`` tuple. ``hashtags`` holds at most 8 entries.
        When the transcript is blank a fixed fallback caption and the four
        base hashtags are returned.
    """
    score_map = clip.get("scores", {}) or {}
    raw_text = (transcript_text or "").strip()
    if not raw_text:
        return "Bagian paling seru di obrolan ini üéØ", ["#shorts","#reels","#tiktok","#podcast"]

    lowered_text = raw_text.lower()

    # Split into sentences on end punctuation / newlines; if that yields a
    # single piece, fall back to splitting on commas and semicolons.
    sentences = re.split(r'(?<=[\.!\?])\s+|\n+', raw_text)
    if len(sentences) == 1:
        sentences = re.split(r'[;,]\s+', raw_text)
    sentences = [piece for piece in (raw.strip() for raw in sentences) if piece]
    if not sentences:
        sentences = [raw_text]

    # TRIGGER_WORDS is optional; it is only consulted when defined at module level.
    configured_triggers = TRIGGER_WORDS if "TRIGGER_WORDS" in globals() else []
    trigger_vocab = set(configured_triggers + ["haha","wk"])
    laugh_vocab = {"wkwk","wkwkwk","ngakak","ketawa","haha","wk"}
    chaos_words = ("anjir","anjay","gila","parah","buset","astaga","waduh","yaampun")

    def _mentions_twist(lowered: str) -> bool:
        return ("ternyata" in lowered) or ("plot twist" in lowered) or ("plot" in lowered and "twist" in lowered)

    def _rate(sentence: str) -> float:
        """Heuristic 'hook' score for one sentence (higher is better)."""
        lowered = sentence.lower()
        tokens = re.findall(r"[a-z0-9']+", lowered)
        trigger_hits = sum(tok in trigger_vocab for tok in tokens)
        laugh_hits = sum(tok in laugh_vocab for tok in tokens)
        emphasis = 2 * (sentence.count("!") + sentence.count("?"))
        twist_bonus = 4 if _mentions_twist(lowered) else 0
        chaos_bonus = 1 if any(word in lowered for word in chaos_words) else 0

        # Penalise sentences that are very short or very long.
        size = len(sentence)
        length_penalty = -int(size < 25) - int(size > 110)
        return 3 * (trigger_hits + laugh_hits) + emphasis + twist_bonus + chaos_bonus + length_penalty

    # max() keeps the first maximal element, so ties go to the earliest sentence.
    best_i, best = max(enumerate(sentences), key=lambda pair: _rate(pair[1]))

    # A winner with fewer than four word tokens gets a neighbour for context:
    # the previous sentence when there is one, otherwise the next.
    word_count = len(re.findall(r"[a-z0-9']+", best.lower()))
    if word_count < 4 and len(sentences) > 1:
        if best_i > 0:
            best = f"{sentences[best_i - 1]} {best}".strip()
        elif best_i < len(sentences) - 1:
            best = f"{best} {sentences[best_i + 1]}".strip()

    # Collapse whitespace and cap the quoted snippet at 92 characters,
    # cutting back to the last space before appending the ellipsis.
    snippet = re.sub(r"\s+"," ", best).strip()
    if len(snippet) > 92:
        snippet = snippet[:92].rsplit(" ", 1)[0] + "‚Ä¶"

    # Pick the caption template from the tone of the chosen sentence.
    best_lc = best.lower()
    is_laugh = any(word in best_lc for word in ["ngakak","ketawa","wkwk","wkwkwk","haha"])
    is_chaos = any(word in best_lc for word in chaos_words)
    is_question = ("?" in best) or any(word in best_lc for word in ["kok","loh","hah"])
    if _mentions_twist(best_lc):
        caption = f"Plot twist: ‚Äú{snippet}‚Äù üëÄ"
    elif is_laugh:
        caption = f"Ngakak dulu: ‚Äú{snippet}‚Äù üòÇ"
    elif is_chaos:
        caption = f"Chaos dikit: ‚Äú{snippet}‚Äù üî•"
    elif is_question:
        caption = f"Kok bisa? ‚Äú{snippet}‚Äù ü§Ø"
    else:
        caption = f"‚Äú{snippet}‚Äù üéØ"

    # Hashtags: fixed base set, then topic tags matched on the whole transcript.
    base_tags = ["#shorts", "#reels", "#tiktok", "#podcast"]
    topic_rules = [
        (r"\b(ngakak|ketawa|wkwk|wkwkwk|lucu|komedi|kocak)\b", "#komedi"),
        (r"\b(marah|emosi|ribut|debat|berantem|panas)\b", "#debat"),
        (r"\b(duit|uang|cuan|bisnis|jualan|modal|untung|rugi|investasi|saham|crypto|forex)\b", "#keuangan"),
        (r"\b(politik|pemilu|presiden|menteri|dpr)\b", "#politik"),
        (r"\b(cinta|pacar|mantan|nikah)\b", "#relationship"),
        (r"\b(game|ml|mobile legends|valorant|pubg|ff|free fire)\b", "#gaming"),
        (r"\b(makanan|kuliner|masak|pedas|enak)\b", "#kuliner"),
    ]
    extra_tags = [tag for pattern, tag in topic_rules if re.search(pattern, lowered_text)]

    if re.search(r"\b(anjir|anjay|gila|parah|buset|astaga|waduh|yaampun)\b", lowered_text):
        extra_tags.append("#chaos")
    if re.search(r"\b(ternyata|plot|twist)\b", lowered_text):
        extra_tags.append("#plottwist")

    # High-scoring clips additionally get the "#viral" tag.
    viral = float(score_map.get("viral_score", 0.0))
    if viral >= 80:
        extra_tags.append("#viral")

    ordered_tags = _dedupe_keep_order(base_tags + extra_tags)
    return caption, ordered_tags[:8]

# =========================
# Cell: Stage 13 - Run
# Manifest (locked JSON/CSV) + Caption/Hashtag (LOG ONLY, non-core)
# =========================

# Stage 13 driver: build the manifest from SELECTED and write it as JSON and CSV
# to both the run folder and the working-directory root.
with StageTimer(13, "Manifest (locked JSON/CSV) + Caption/Hashtag (log-only)"):
    # Run-level header; "clips" is filled below, one entry per selected clip.
    manifest = {
        "version": "v5.5",
        "video": str(VIDEO_PATH),
        "run_dir": str(RUN_DIR),
        "artifacts_dir": str(ART_DIR),
        "outputs_dir": str(OUT_DIR),
        "clips": []
    }

    # Ranks are 1-based and follow the order of SELECTED.
    for rank, c in enumerate(SELECTED, 1):
        cid = c["id"]
        transcript_text = ""
        # TRANSCRIPTS is optional; without it (or without an entry for this
        # clip) the caption builder receives an empty transcript.
        if "TRANSCRIPTS" in globals() and isinstance(TRANSCRIPTS, dict):
            transcript_text = (TRANSCRIPTS.get(cid, {}) or {}).get("text", "") or ""

        caption, hashtags = build_caption_and_hashtags(c, transcript_text)

        # Attach to the selected clip (for LOG TABLE preview only) — idempotent:
        # existing values are kept when both are already present; if either is
        # missing, both are (re)written. Neither field goes into the manifest item.
        if (not c.get("caption")) or (not c.get("hashtags")):
            c["caption"] = caption
            c["hashtags"] = hashtags

        # Manifest row for this clip. The clip/thumbnail paths are derived from
        # rank and id here — presumably matching the export stage's file naming;
        # verify against that stage.
        item = {
            "rank": int(rank),
            "id": str(cid),
            "start": float(c["start"]),
            "end": float(c["end"]),
            "duration": float(c["duration"]),
            "viral_score": float((c.get("scores", {}) or {}).get("viral_score", 0.0)),
            "clip_path": str(CLIPS_DIR / f"clip_{rank:02d}_{cid}.mp4"),
            "thumbnail_path": str(THUMBS_DIR / f"thumb_{rank:02d}_{cid}.jpg"),
            "editorial_reason": c.get("editorial_reason", []),
        }
        manifest["clips"].append(item)

    # Write manifests both to run folder (locked) and to /kaggle/working root (compat)
    manifest_json_path = RUN_DIR / "manifest.json"
    manifest_csv_path  = RUN_DIR / "manifest.csv"
    write_json(manifest_json_path, manifest)

    root_manifest_json = WORKDIR / "manifest.json"
    root_manifest_csv  = WORKDIR / "manifest.csv"
    write_json(root_manifest_json, manifest)

    def _write_manifest_csv(path: Path):
        """Write manifest["clips"] to ``path`` as CSV, one row per clip."""
        with open(path, "w", newline="", encoding="utf-8") as f:
            fieldnames = [
                "rank","id","start","end","duration","viral_score",
                "clip_path","thumbnail_path","editorial_reason"
            ]
            w = csv.DictWriter(f, fieldnames=fieldnames)
            w.writeheader()
            for it in manifest["clips"]:
                # Copy the item so flattening does not mutate the JSON manifest.
                row = dict(it)
                er = row.get("editorial_reason", [])
                # CSV cells are flat: a list of reasons is joined with " | ".
                row["editorial_reason"] = " | ".join(er) if isinstance(er, list) else str(er)
                w.writerow(row)

    _write_manifest_csv(manifest_csv_path)
    _write_manifest_csv(root_manifest_csv)

    logger.info(f"Wrote: {manifest_json_path}")
    logger.info(f"Wrote: {manifest_csv_path}")
    logger.info(f"Wrote: {root_manifest_json}")
    logger.info(f"Wrote: {root_manifest_csv}")
    log_flush()


## ✅ Stage 14: Acceptance Tests & Summary

In [None]:
# =========================
# Stage 14: Acceptance Tests & Summary
# =========================

# Stage 14 driver: assert that the expected outputs exist and that the first
# exported clip has the locked resolution and carries no subtitle stream.
# Any failed assertion aborts the cell with the given message.
with StageTimer(14, "Acceptance Tests"):

    # -------------------------------------------------
    # 1) Outputs existence
    # -------------------------------------------------
    assert CLIPS_DIR.exists(), "Missing outputs/clips"
    assert THUMBS_DIR.exists(), "Missing outputs/thumbnails"
    assert (RUN_DIR / "manifest.json").exists(), "Missing runs/<run_id>/manifest.json"
    assert (RUN_DIR / "manifest.csv").exists(), "Missing runs/<run_id>/manifest.csv"
    # also keep root manifests for convenience
    assert (WORKDIR / "manifest.json").exists(), "Missing /kaggle/working/manifest.json"
    assert (WORKDIR / "manifest.csv").exists(), "Missing /kaggle/working/manifest.csv"

    # Sorted so that clips[0] (the clip probed below) is deterministic.
    clips = sorted(CLIPS_DIR.glob("*.mp4"))
    thumbs = sorted(THUMBS_DIR.glob("*.jpg"))

    assert len(clips) > 0, "No exported clips"
    assert len(thumbs) > 0, "No exported thumbnails"

    # -------------------------------------------------
    # 2) Export format check (first clip)
    # -------------------------------------------------
    # Query the first video stream of the first clip via ffprobe (JSON output).
    probe = json.loads(
        subprocess.check_output([
            FFPROBE_BIN, "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=width,height,r_frame_rate",
            "-of", "json",
            str(clips[0])
        ]).decode("utf-8")
    )

    st = probe["streams"][0]
    w = int(st["width"])
    h = int(st["height"])

    # NOTE(review): r_frame_rate is requested above but never asserted, and
    # only clips[0] is probed — confirm whether an FPS check / per-clip check
    # is intended.
    assert (w, h) == (EXPORT_W, EXPORT_H), (
        f"Bad resolution {w}x{h}, expected {EXPORT_W}x{EXPORT_H}"
    )

    # -------------------------------------------------
    # 3) Ensure no subtitle stream
    # -------------------------------------------------
    # List the codec_type of every stream in the first clip.
    probe2 = json.loads(
        subprocess.check_output([
            FFPROBE_BIN, "-v", "error",
            "-show_entries", "stream=codec_type",
            "-of", "json",
            str(clips[0])
        ]).decode("utf-8")
    )

    types = [s["codec_type"] for s in probe2.get("streams", [])]
    assert "subtitle" not in types, "Subtitle stream detected (forbidden)."

# =====================================================
# v5.5 — Selection Summary (Audit Log)
# =====================================================
# Best-effort reporting: reads selection_audit.json from the artifacts dir and
# logs counts, the top viral_score and the most common reject reasons. Any
# exception is downgraded to a warning so the summary never fails the run.
try:
    audit_path = ART_DIR / "selection_audit.json"
    if audit_path.exists():
        audit = read_json(audit_path)
        # Audit rows are partitioned by their "decision" field.
        selected_rows = [a for a in audit if a.get("decision") == "selected"]
        rejected_rows = [a for a in audit if a.get("decision") == "rejected"]

        logger.info("=== SELECTION SUMMARY ===")
        logger.info(f"Candidates total: {len(CANDIDATES)}")

        # max() over an empty sequence would raise, hence the guard.
        if CANDIDATES:
            top_score = max(
                c.get("scores", {}).get("viral_score", 0.0)
                for c in CANDIDATES
            )
            logger.info(f"Top viral_score: {top_score:.1f}")

        logger.info(f"Selected: {len(selected_rows)} | Rejected (logged): {len(rejected_rows)}")

        from collections import Counter
        # Tally reject reasons; rows without a "reason" are counted under "".
        rc = Counter([r.get("reason", "") for r in rejected_rows])

        logger.info("Top reject reasons:")
        for k, v in rc.most_common(8):
            logger.info(f" - {k}: {v}")
    else:
        logger.warning("selection_audit.json not found ‚Äî summary skipped")

except Exception as e:
    logger.warning(f"Selection summary unavailable: {e}")

# -------------------------------------------------
# Final success log
# -------------------------------------------------
# clips / thumbs are the sorted file lists collected by the acceptance tests above.
logger.info("‚úÖ Acceptance tests passed.")
logger.info(f"Clips: {len(clips)} | Thumbnails: {len(thumbs)}")
logger.info(f"Outputs dir: {OUT_DIR}")
logger.info(f"Run artifacts: {ART_DIR}")


# --- AGENTS.md invariant checks ---
# Post-run guards. Each violated invariant raises RuntimeError; the outer
# handler logs the message at ERROR level and re-raises so the cell fails.
# The pipeline-log copy at the end is best-effort and never fails the run.
try:
    # novelty constant check: the novelty scores (rounded to 2 decimals) must
    # take more than one distinct value across CANDIDATES.
    # NOTE(review): with zero or one candidate this check always raises —
    # confirm that is the intended behaviour for very short inputs.
    nov_vals = [float(c.get("scores", {}).get("novelty", 0.0)) for c in CANDIDATES]
    nov_unique = {round(v, 2) for v in nov_vals}
    if len(nov_unique) <= 1:
        raise RuntimeError("ERROR: novelty constant; possible missing transcripts")

    # selection_audit reason codes: the audit must exist, be non-empty, and
    # every row must carry a non-empty "reason".
    audit = read_json(ART_DIR / "selection_audit.json") if (ART_DIR / "selection_audit.json").exists() else []
    if not audit or any((not a.get("reason")) for a in audit):
        raise RuntimeError("ERROR: selection_audit missing reason codes")

    # late bucket quota check: only enforced when bucket_stats.json exists.
    # The bucket key is looked up as a string first, then as an int.
    late_bucket = 4
    if (OUT_DIR / "bucket_stats.json").exists():
        bs = read_json(OUT_DIR / "bucket_stats.json")
        b4 = bs.get(str(late_bucket), bs.get(late_bucket, {}))
        if b4.get("meaning_candidates", 0) > 0 and b4.get("selected", 0) < 1:
            raise RuntimeError("ERROR: bucket 80-100% quota missed while meaning candidates exist")

    # copy pipeline log (best effort): a failure here is reported as a warning
    # instead of being silently discarded, but it does not abort the run.
    try:
        plog = OUT_DIR / "pipeline_log.txt"
        if LOG_FILE.exists():
            plog.write_text(LOG_FILE.read_text(encoding="utf-8"), encoding="utf-8")
    except Exception as copy_err:
        logger.warning(f"Could not copy pipeline log to outputs: {copy_err}")

except Exception as e:
    logger.error(str(e))
    raise
