# AI Silence Cutter (Notebook)

This notebook bundles the whole project in one place. Run each cell top to bottom.

Requirements:
- `pip install google-genai`
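- ffmpeg on PATH if you want to render edited video. ffprobe is also looked up on PATH to detect an audio track; if it is missing, the render is video-only.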

API key:
- Set the `GEMINI_API_KEY` environment variable, or fill in the `GEMINI_API_KEY` variable in the next cell. The environment variable takes precedence.


## Problem Definition & Objective
This project addresses a common editing task: removing long or awkward pauses from spoken video while keeping pauses that add meaning. The objective is to automate a first-pass trim using only the transcript timestamps, then apply a small LLM decision step to avoid cutting meaningful pauses.

Inputs:
- A video file (for optional rendering).
- A transcript with timestamps (SRT, VTT, or simple start/end lines).

Outputs:
- A cut plan (JSON) describing all candidate gaps and decisions.
- A CSV of keep segments.
- An optional edited video created from those keep segments.


## Selected project track
Applied AI tooling for media editing. The system uses deterministic time-gap rules for candidate detection and a compact LLM prompt for subjective decisions. This keeps the workflow explainable, easy to test, and fast enough for real editing tasks.


## Clear problem statement
Given a video and a timestamped transcript, detect pauses between captions, decide whether each pause should be CUT or KEEP based on nearby transcript context, and produce an edit plan and edited output.

Success criteria:
- Gaps above a configurable threshold are evaluated.
- Decisions are returned in strict JSON for traceability.
- Keep segments are computed and can be rendered into a final video.


## Real-world relevance and motivation
Editors of lectures, tutorials, interviews, and internal training content often spend time trimming dead air. This is repetitive work that slows turnaround. Automating the first pass reduces manual effort while still allowing a human to review decisions, saving time without removing editorial control.


## Data Understanding & Preparation
The transcript is the primary data source. Supported formats:
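- SRT: numbered blocks with `HH:MM:SS,mmm --> HH:MM:SS,mmm` timing lines.
- VTT: a `WEBVTT` header followed by cues with `HH:MM:SS.mmm` timestamps.
- Plain: `start end text` per line, where `start` and `end` are seconds or `HH:MM:SS` timestamps.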

Preparation steps:
1) Parse timestamps into seconds.
2) Extract caption entries `{start_sec, end_sec, text}`.
3) Sort entries in time order.
4) Compute gaps between consecutive captions.
5) Build context windows of N captions before and after each gap.
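
The preparation steps above can be sketched in a few lines. This is a minimal illustration rather than the notebook's own parser: the helper names `to_seconds` and `gaps_with_context` and the sample captions are hypothetical, and the timestamps are assumed to always include hours.

```python
from typing import Dict, List


def to_seconds(ts: str) -> float:
    """Convert an HH:MM:SS,mmm or HH:MM:SS.mmm timestamp into seconds."""
    hours, minutes, seconds = ts.replace(",", ".").split(":")
    return int(hours) * 3600 + int(minutes) * 60 + float(seconds)


def gaps_with_context(captions: List[Dict], n: int = 1) -> List[Dict]:
    """Sort captions, then pair each inter-caption gap with n captions on each side."""
    ordered = sorted(captions, key=lambda c: (c["start_sec"], c["end_sec"]))
    gaps = []
    for i in range(len(ordered) - 1):
        gaps.append(
            {
                "gap_sec": ordered[i + 1]["start_sec"] - ordered[i]["end_sec"],
                "before": ordered[max(0, i - n + 1) : i + 1],
                "after": ordered[i + 1 : i + 1 + n],
            }
        )
    return gaps


# Hypothetical sample data, deliberately out of order.
sample = [
    {"start_sec": to_seconds("00:00:05,000"), "end_sec": to_seconds("00:00:07,500"), "text": "Second line."},
    {"start_sec": to_seconds("00:00:01,000"), "end_sec": to_seconds("00:00:03,000"), "text": "First line."},
]
for gap in gaps_with_context(sample):
    print(f"{gap['gap_sec']:.3f}", gap["before"][0]["text"], "->", gap["after"][0]["text"])
```

Sorting before computing gaps matters because a gap is only meaningful between captions that are adjacent in time.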


## Model / System Design
Hybrid pipeline with two stages:

1) Deterministic gap detection
- gap = next.start_sec - current.end_sec
- if gap >= MIN_GAP_TO_CONSIDER, mark as candidate

2) LLM decision step (Gemini)
- Provide gap duration and nearby transcript context.
- Require strict JSON output with decision and reason.
- Validate JSON and retry on failure.
- Batch up to 10 candidates per request.

This keeps the system controllable (via thresholds and context size) while leveraging the LLM for subjective judgments.
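
The validate-and-retry part of the decision step can be illustrated without calling any API. In the sketch below, `validate` and `decide_with_retry` are hypothetical names, and the model is replaced by a stub callable that returns canned strings; it is one possible shape for the idea, not the notebook's exact implementation.

```python
import json
from typing import Callable, List, Optional


def validate(raw: str, expected_ids: List[str]) -> Optional[List[dict]]:
    """Return decisions only if raw is a JSON list covering every expected id."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, list):
        return None
    decisions = {
        item["id"]: item
        for item in payload
        if isinstance(item, dict)
        and item.get("id") in expected_ids
        and item.get("decision") in ("CUT", "KEEP")
    }
    if set(decisions) != set(expected_ids):
        return None
    return [decisions[gap_id] for gap_id in expected_ids]


def decide_with_retry(ask: Callable[[], str], expected_ids: List[str], max_retries: int = 2):
    """Call ask() until its output validates or the retry budget is spent."""
    for _ in range(max_retries + 1):
        result = validate(ask(), expected_ids)
        if result is not None:
            return result
    raise RuntimeError("No valid JSON decision list after retries.")


# Stub standing in for the model: the first reply is malformed, the second is valid.
replies = iter(["not json", '[{"id": "gap_0", "decision": "CUT", "reason": "dead air"}]'])
print(decide_with_retry(lambda: next(replies), ["gap_0"]))
```

Rejecting a response unless it covers every expected id means a partially answered batch is retried as a whole, which keeps the audit trail complete.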


## Core Implementation
Key components implemented in the notebook and project:

- Transcript parsing: detect format, normalize timestamps, and extract caption text.
- Gap detection: build candidate gaps with context_before and context_after.
- Gemini wrapper: call `gemini-3-flash-preview`, parse and validate JSON.
- Cut plan: remove CUT gaps and merge keep segments.
- Rendering: optional ffmpeg trim/concat for final video output.
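
As a rough illustration of the cut-plan step, the sketch below turns a list of CUT gaps into keep segments. The function name `keep_segments` and the sample numbers are hypothetical, and the clip is assumed to start at 0 seconds.

```python
from typing import List, Tuple


def keep_segments(total_sec: float, cut_gaps: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """Return the time ranges that remain after removing each (start, end) cut gap."""
    segments = []
    cursor = 0.0
    for gap_start, gap_end in sorted(cut_gaps):
        # Keep everything between the previous cut and the start of this one.
        if gap_start > cursor:
            segments.append((cursor, gap_start))
        cursor = max(cursor, gap_end)
    # Keep whatever remains after the last cut.
    if total_sec > cursor:
        segments.append((cursor, total_sec))
    return segments


# Hypothetical 20-second clip with two pauses marked CUT.
print(keep_segments(20.0, [(3.0, 5.0), (11.0, 12.5)]))
# prints [(0.0, 3.0), (5.0, 11.0), (12.5, 20.0)]
```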

Configurable filters:
- `min_gap`: smallest pause to evaluate.
- `context`: number of captions before and after each gap.
- `batch_size`: number of candidates per Gemini request.


## Evaluation & Analysis
Evaluation focuses on practical edit quality:

- Quantitative metrics: number of gaps found, CUT vs KEEP counts, and duration reduction.
- Qualitative review: spot-check edited segments for pacing and meaning.
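
The quantitative metrics can be computed directly from the decisions and keep segments. The following is a sketch under assumed inputs: `summarize` is a hypothetical helper, and the sample decisions and segments are made up for illustration.

```python
from typing import Dict, List, Tuple


def summarize(decisions: List[str], total_sec: float, keep: List[Tuple[float, float]]) -> Dict[str, float]:
    """Count CUT/KEEP decisions and report how much running time was removed."""
    edited_sec = sum(end - start for start, end in keep)
    removed_sec = total_sec - edited_sec
    return {
        "gaps_found": len(decisions),
        "cut_count": decisions.count("CUT"),
        "keep_count": decisions.count("KEEP"),
        "edited_sec": round(edited_sec, 3),
        "reduction_pct": round(100 * removed_sec / total_sec, 1) if total_sec else 0.0,
    }


# Hypothetical run: three gaps, two of them cut from a 20-second clip.
print(summarize(["CUT", "KEEP", "CUT"], 20.0, [(0.0, 3.0), (5.0, 11.0), (12.5, 20.0)]))
```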

Typical adjustments:
- Lower `min_gap` for more aggressive trimming.
- Increase `context` to reduce over-cutting of meaningful pauses.
- Raise `min_keep` (or `merge_gap`) in `compute_keep_segments` to merge short keep segments and avoid awkward micro-clips.


## Ethical Considerations & Responsible AI
- Privacy: only transcript snippets are sent to Gemini; video stays local.
- Consent: avoid uploading private or sensitive transcripts without permission.
- Transparency: decisions and reasons are logged in JSON for review.
- Human oversight: treat edits as a first pass and confirm context-sensitive cuts.


## Conclusion & Future Scope
This notebook demonstrates an end-to-end silence cutter that combines deterministic timing rules with LLM context judgments. It produces a clear audit trail and optional rendered output.

Future improvements:
- Combine transcript gaps with audio-level silence detection.
- Add word-level timestamps for finer trimming.
- Provide editor exports (EDL/XML) for NLEs.
- Add a review UI to override decisions before rendering.


In [None]:
import os
import re
import json
import csv
import time
import shutil
import subprocess
from typing import List, Dict, Tuple

# Optional: set this if you do not want to use environment variables.
GEMINI_API_KEY = ""

def load_api_key():
    key = os.environ.get("GEMINI_API_KEY", "").strip()
    if not key:
        key = GEMINI_API_KEY.strip()
    if not key:
        raise RuntimeError("Missing GEMINI_API_KEY. Set env var or fill GEMINI_API_KEY variable.")
    return key


## Transcript parsing (SRT/VTT/plain)


In [None]:
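# Hours are optional because WebVTT allows MM:SS.mmm cue timings.
_TS = r"(?:\d{2,}:)?\d{2}:\d{2}[.,]\d{3}"
TIME_LINE_RE = re.compile(
    rf"(?P<start>{_TS})\s*-->\s*(?P<end>{_TS})"
)


def parse_timestamp(ts: str) -> float:
    """Convert HH:MM:SS or MM:SS (with optional fractional seconds) into seconds."""
    ts = ts.strip().replace(",", ".")
    parts = ts.split(":")
    if len(parts) == 2:
        # No hours field: treat as MM:SS.
        parts = ["0"] + parts
    if len(parts) != 3:
        raise ValueError(f"Invalid timestamp: {ts}")
    hours = int(parts[0])
    minutes = int(parts[1])
    seconds = float(parts[2])
    return hours * 3600 + minutes * 60 + seconds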


def parse_time_token(token: str) -> float:
    token = token.strip()
    if ":" in token:
        return parse_timestamp(token)
    return float(token)


def detect_format(lines: List[str]) -> str:
    for line in lines:
        if line.strip().upper() == "WEBVTT":
            return "vtt"
    for line in lines:
        if TIME_LINE_RE.search(line):
            return "srt_vtt"
    return "plain"


def parse_srt_vtt(lines: List[str]) -> List[Dict[str, object]]:
    entries = []
    i = 0
    total = len(lines)
    while i < total:
        line = lines[i].strip()
        if not line:
            i += 1
            continue
        if line.isdigit() and i + 1 < total and "-->" in lines[i + 1]:
            i += 1
            line = lines[i].strip()
        match = TIME_LINE_RE.match(line)
        if match:
            start = parse_timestamp(match.group("start"))
            end = parse_timestamp(match.group("end"))
            i += 1
            text_lines = []
            while i < total and lines[i].strip():
                text_lines.append(lines[i].strip())
                i += 1
            text = " ".join(text_lines).strip()
            entries.append({"start_sec": start, "end_sec": end, "text": text})
        else:
            i += 1
    return entries


def parse_plain(lines: List[str]) -> List[Dict[str, object]]:
    entries = []
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue
        parts = stripped.split()
        if len(parts) < 3:
            continue
        try:
            start = parse_time_token(parts[0])
            end = parse_time_token(parts[1])
        except ValueError:
            continue
        text = " ".join(parts[2:]).strip()
        entries.append({"start_sec": start, "end_sec": end, "text": text})
    return entries


def parse_transcript(path: str) -> List[Dict[str, object]]:
    with open(path, "r", encoding="utf-8") as handle:
        content = handle.read()
    lines = content.splitlines()
    fmt = detect_format(lines)
    if fmt in ("vtt", "srt_vtt"):
        entries = parse_srt_vtt(lines)
    else:
        entries = parse_plain(lines)
    entries.sort(key=lambda item: (item["start_sec"], item["end_sec"]))
    return entries


## Gap detection


In [None]:
def detect_gaps(captions: List[Dict[str, object]], min_gap: float = 0.8, context: int = 2):
    candidates = []
    if not captions:
        return candidates
    for i in range(len(captions) - 1):
        current = captions[i]
        next_cap = captions[i + 1]
        gap = float(next_cap["start_sec"]) - float(current["end_sec"])
        if gap >= min_gap:
            before_start = max(0, i - context + 1)
            after_end = min(len(captions), i + 1 + context)
            context_before = captions[before_start : i + 1]
            context_after = captions[i + 1 : after_end]
            candidates.append(
                {
                    "id": f"gap_{i}",
                    "gap_start": float(current["end_sec"]),
                    "gap_end": float(next_cap["start_sec"]),
                    "gap_duration": gap,
                    "context_before": context_before,
                    "context_after": context_after,
                }
            )
    return candidates


## Gemini decision helper


In [None]:
from google import genai


def gemini_generate_text(prompt: str) -> str:
    # Pass the key explicitly so the fallback variable works without mutating os.environ.
    client = genai.Client(api_key=load_api_key())
    response = client.models.generate_content(
        model="gemini-3-flash-preview", contents=prompt
    )
    return response.text or ""


def _format_context(items):
    lines = []
    for item in items:
        start = f"{float(item['start_sec']):.3f}"
        end = f"{float(item['end_sec']):.3f}"
        text = str(item["text"]).replace("\n", " ").strip()
        lines.append(f"{start}-{end} {text}")
    return lines


def _build_prompt(candidates):
    lines = []
    lines.append(
        "Decide whether to CUT or KEEP each pause in a lecture video. "
        "Keep pauses that add meaning (emphasis, transition, reflection). "
        "Cut filler silence."
    )
    lines.append(
        'Respond with JSON only: [{"id":"...","decision":"CUT|KEEP","reason":"short"}]'
    )
    lines.append("Candidates:")
    for cand in candidates:
        lines.append(f"ID: {cand['id']}")
        lines.append(
            f"gap_start: {cand['gap_start']:.3f}, gap_end: {cand['gap_end']:.3f}, "
            f"gap_duration: {cand['gap_duration']:.3f}"
        )
        before_lines = _format_context(cand["context_before"])
        after_lines = _format_context(cand["context_after"])
        lines.append("context_before:")
        lines.extend(before_lines or ["(none)"])
        lines.append("context_after:")
        lines.extend(after_lines or ["(none)"])
    return "\n".join(lines)


def _extract_json(text: str):
    text = text.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    match = re.search(r"(\[.*\]|\{.*\})", text, re.S)
    if match:
        try:
            return json.loads(match.group(1))
        except json.JSONDecodeError:
            return None
    return None


def _validate_response(payload, expected_ids):
    if not isinstance(payload, list):
        return None
    by_id = {}
    for item in payload:
        if not isinstance(item, dict):
            continue
        gap_id = item.get("id")
        decision = item.get("decision")
        reason = item.get("reason", "")
        if gap_id in expected_ids and decision in ("CUT", "KEEP"):
            by_id[gap_id] = {"id": gap_id, "decision": decision, "reason": str(reason)}
    if set(by_id.keys()) != set(expected_ids):
        return None
    return list(by_id.values())


def decide_gaps(candidates, batch_size: int = 10, max_retries: int = 2):
    results = []
    for start in range(0, len(candidates), batch_size):
        batch = candidates[start : start + batch_size]
        expected_ids = [item["id"] for item in batch]
        prompt = _build_prompt(batch)
        batch_result = None
        for attempt in range(max_retries + 1):
            if attempt:
                time.sleep(0.5)  # brief pause between retries only
            response_text = gemini_generate_text(prompt)
            payload = _extract_json(response_text)
            batch_result = _validate_response(payload, expected_ids)
            if batch_result is not None:
                break
        if batch_result is None:
            raise RuntimeError("Gemini did not return a valid decision list after retries.")
        results.extend(batch_result)
    return results


## Cut plan (keep segments)


In [None]:
def _merge_by_gap(segments: List[List[float]], gap_threshold: float):
    if not segments:
        return []
    merged = [segments[0][:]]
    for start, end in segments[1:]:
        if start - merged[-1][1] <= gap_threshold:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return merged


def _enforce_min_length(segments: List[List[float]], min_len: float):
    if not segments:
        return []
    i = 0
    while i < len(segments):
        start, end = segments[i]
        if end - start >= min_len or len(segments) == 1:
            i += 1
            continue
        if i == 0 and len(segments) > 1:
            segments[1][0] = start
            segments.pop(0)
            continue
        if i > 0:
            segments[i - 1][1] = end
            segments.pop(i)
            continue
        i += 1
    return segments


def compute_keep_segments(captions, candidates, decisions, merge_gap: float = 0.1, min_keep: float = 0.25):
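    # Duration is approximated by the last caption's end time, so any video
    # after the final caption is not included in the keep segments.
    if captions:
        total_duration = max(float(item["end_sec"]) for item in captions)
    else:
        total_duration = 0.0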

    decision_map = {item["id"]: item for item in decisions}
    segments = []
    cursor = 0.0

    for cand in candidates:
        decision = decision_map.get(cand["id"], {}).get("decision", "KEEP")
        if decision == "CUT":
            if cand["gap_start"] > cursor:
                segments.append([cursor, cand["gap_start"]])
            cursor = max(cursor, cand["gap_end"])

    if total_duration > cursor:
        segments.append([cursor, total_duration])

    segments = _merge_by_gap(segments, merge_gap)
    segments = _enforce_min_length(segments, min_keep)

    return segments, total_duration


## Render edited video with ffmpeg (optional)


In [None]:
def _has_audio(input_path: str) -> bool:
    ffprobe = shutil.which("ffprobe")
    if not ffprobe:
        return False
    command = [
        ffprobe,
        "-v",
        "error",
        "-select_streams",
        "a",
        "-show_entries",
        "stream=index",
        "-of",
        "csv=p=0",
        input_path,
    ]
    result = subprocess.run(command, capture_output=True, text=True, check=False)
    return bool(result.stdout.strip())


def render_video(input_path: str, segments: List[List[float]], output_path: str):
    ffmpeg = shutil.which("ffmpeg")
    if not ffmpeg:
        raise RuntimeError("ffmpeg not found in PATH.")
    if not segments:
        raise ValueError("No segments to render.")

    has_audio = _has_audio(input_path)
    filter_parts = []
    concat_inputs = []

    for idx, (start, end) in enumerate(segments):
        v_label = f"v{idx}"
        filter_parts.append(
            f"[0:v]trim=start={start}:end={end},setpts=PTS-STARTPTS[{v_label}]"
        )
        concat_inputs.append(f"[{v_label}]")
        if has_audio:
            a_label = f"a{idx}"
            filter_parts.append(
                f"[0:a]atrim=start={start}:end={end},asetpts=PTS-STARTPTS[{a_label}]"
            )
            concat_inputs.append(f"[{a_label}]")

    if has_audio:
        concat_filter = "".join(concat_inputs) + f"concat=n={len(segments)}:v=1:a=1[v][a]"
        filter_parts.append(concat_filter)
        command = [
            ffmpeg,
            "-y",
            "-i",
            input_path,
            "-filter_complex",
            ";".join(filter_parts),
            "-map",
            "[v]",
            "-map",
            "[a]",
            "-movflags",
            "+faststart",
            output_path,
        ]
    else:
        concat_filter = "".join(concat_inputs) + f"concat=n={len(segments)}:v=1:a=0[v]"
        filter_parts.append(concat_filter)
        command = [
            ffmpeg,
            "-y",
            "-i",
            input_path,
            "-filter_complex",
            ";".join(filter_parts),
            "-map",
            "[v]",
            "-movflags",
            "+faststart",
            output_path,
        ]

    subprocess.run(command, check=True)


## End-to-end run helper


In [None]:
def run_silence_cutter(video_path: str, transcript_path: str, outdir: str = "outputs", min_gap: float = 0.8, context: int = 2, batch_size: int = 10):
    os.makedirs(outdir, exist_ok=True)
    captions = parse_transcript(transcript_path)
    candidates = detect_gaps(captions, min_gap=min_gap, context=context)

    decisions = []
    if candidates:
        decisions = decide_gaps(candidates, batch_size=batch_size, max_retries=2)

    decisions_by_id = {item["id"]: item for item in decisions}
    for cand in candidates:
        decision = decisions_by_id.get(cand["id"], {"decision": "KEEP", "reason": ""})
        cand["decision"] = decision["decision"]
        cand["reason"] = decision.get("reason", "")

    keep_segments, total_duration = compute_keep_segments(captions, candidates, decisions)
    estimated_duration = sum(end - start for start, end in keep_segments)

    cut_plan = {
        "video": video_path,
        "transcript": transcript_path,
        "min_gap": min_gap,
        "context": context,
        "total_duration_sec": round(total_duration, 3),
        "estimated_edited_duration_sec": round(estimated_duration, 3),
        "candidates": candidates,
        "keep_segments": [
            {
                "start_sec": round(start, 3),
                "end_sec": round(end, 3),
                "duration_sec": round(end - start, 3),
            }
            for start, end in keep_segments
        ],
    }

    cut_plan_path = os.path.join(outdir, "cut_plan.json")
    keep_csv_path = os.path.join(outdir, "keep_segments.csv")
    with open(cut_plan_path, "w", encoding="utf-8") as handle:
        json.dump(cut_plan, handle, indent=2)
    with open(keep_csv_path, "w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["start_sec", "end_sec", "duration_sec"])
        for start, end in keep_segments:
            writer.writerow([f"{start:.3f}", f"{end:.3f}", f"{(end - start):.3f}"])

    edited_path = os.path.join(outdir, "edited.mp4")
    render_error = None
    rendered = False
    if keep_segments:
        try:
            render_video(video_path, keep_segments, edited_path)
            rendered = True
        except Exception as exc:
            render_error = str(exc)

    summary = {
        "gaps_found": len(candidates),
        "cut_count": sum(1 for c in candidates if c.get("decision") == "CUT"),
        "keep_count": sum(1 for c in candidates if c.get("decision") == "KEEP"),
        "total_duration_sec": round(total_duration, 2),
        "estimated_duration_sec": round(estimated_duration, 2),
        # Report the file only if this run rendered it; a stale edited.mp4 from an earlier run does not count.
        "edited_path": edited_path if rendered else None,
        "render_error": render_error,
    }

    return summary


## Example usage


In [None]:
# Example:
# summary = run_silence_cutter("path/to/video.mp4", "path/to/transcript.srt", outdir="outputs")
# summary
