# Movie Clip Evidence Collector (YouTube)

This notebook builds an **end-to-end** tool to:
- Identify videos from specified YouTube channels
- Collect metadata using the **YouTube Data API v3**
- Preserve evidence artifacts (ToS-safe default):  
  - API JSON (`video.json`)  
  - Watch-page HTML snapshot (`watch.html`)  
  - Thumbnail (`thumbnail.jpg`)  
  - Integrity manifest with SHA-256 hashes (`manifest.json`)  
- Output a formatted **Excel report** listing videos and evidence paths

## Important note
This notebook defaults to **ToS-safe evidence preservation** (metadata + snapshots + hashes) and does **not** download video streams. Full media capture is designed as an **optional plug-in** that should only be enabled when you have explicit authorization or a platform-approved mechanism.


## Prerequisites
You need a Google Cloud project with the **YouTube Data API v3** enabled and an API key created for it.



In [3]:
!pip install -q google-api-python-client pandas openpyxl requests pyyaml beautifulsoup4


## Configuration

We store the configuration in a plain Python dict.
(It can be moved to a `config.yaml` file later if desired.)


In [5]:
CONFIG = {
    "api_key": "YOUR_YOUTUBE_API_KEY",
    "channels": [
        "@ScreenRantPlus",
        "@ScreenRant",
        "@MOVIECLIPS",
        "@JoBloMovieClips",
        "@movieclipstrailers",
        "@rottentomatoestrailers",
        "@WatchMojo",
        "@topmovieclips5056",
        "@FilmIsNow",
        "@KinoCheckInternational",
        "@IGNMovieTrailers",
        "@FandangoMovieclips",
    ],
    "max_videos_per_channel": 50,
    "output_dir": "evidence",
    "report_dir": "reports",
    "db_path": "state.db",
}


## Imports and Utilities

In [7]:
import os
import re
import json
import time
import hashlib
import sqlite3
import requests
import pandas as pd
from datetime import datetime, timezone

from googleapiclient.discovery import build
from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment
from openpyxl.utils import get_column_letter


### Hashing & directory helpers

We hash each artifact to support **integrity** checks and a basic **chain of custody**.


In [9]:
def ensure_dirs(*paths: str) -> None:
    for p in paths:
        os.makedirs(p, exist_ok=True)

def sha256_file(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()

def iso8601_duration_to_seconds(dur: str):
    """Convert an ISO 8601 duration such as 'PT1H2M3S' or 'P1DT2H' to seconds."""
    if not dur:
        return None
    # fullmatch rejects trailing garbage; the optional day part covers very long videos
    m = re.fullmatch(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", dur)
    if not m:
        return None
    days, hours, minutes, seconds = (int(g) if g else 0 for g in m.groups())
    return days * 86400 + hours * 3600 + minutes * 60 + seconds


## SQLite State

We store processed videos in SQLite to:
- avoid duplicates
- support incremental runs
- track last_seen timestamps


In [11]:
def init_db(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute(
        'CREATE TABLE IF NOT EXISTS videos ('
        'video_id TEXT PRIMARY KEY,'
        'channel_id TEXT,'
        'channel_handle TEXT,'
        'title TEXT,'
        'published_at TEXT,'
        'url TEXT,'
        'risk_score REAL,'
        'evidence_dir TEXT,'
        'first_seen_at TEXT,'
        'last_seen_at TEXT'
        ')'
    )
    conn.commit()
    return conn

def upsert_video(conn: sqlite3.Connection, row: tuple) -> None:
    conn.execute(
        'INSERT INTO videos(video_id, channel_id, channel_handle, title, published_at, url, risk_score, evidence_dir, first_seen_at, last_seen_at) '
        'VALUES(?,?,?,?,?,?,?,?,?,?) '
        'ON CONFLICT(video_id) DO UPDATE SET '
        'last_seen_at=excluded.last_seen_at, '
        'risk_score=excluded.risk_score, '
        'evidence_dir=excluded.evidence_dir'
    , row)
    conn.commit()
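
To make the incremental-run behavior concrete, here is a minimal sketch of the same `ON CONFLICT` upsert pattern on a simplified stand-in table (three columns only, in-memory database). The table layout and sample values are assumptions for illustration, not the notebook's full schema. It shows that a repeat sighting of a `video_id` updates `last_seen_at` while `first_seen_at` keeps its original value.

```python
import sqlite3

# Simplified stand-in for the notebook's `videos` table (illustrative only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE videos ("
    "video_id TEXT PRIMARY KEY, first_seen_at TEXT, last_seen_at TEXT)"
)

UPSERT = (
    "INSERT INTO videos(video_id, first_seen_at, last_seen_at) VALUES(?,?,?) "
    "ON CONFLICT(video_id) DO UPDATE SET last_seen_at=excluded.last_seen_at"
)

# First run: the row does not exist yet, so it is inserted as given.
conn.execute(UPSERT, ("abc123", "2026-01-01T00:00:00+00:00", "2026-01-01T00:00:00+00:00"))
# Second run: same video_id, so only last_seen_at is overwritten.
conn.execute(UPSERT, ("abc123", "2026-02-01T00:00:00+00:00", "2026-02-01T00:00:00+00:00"))

rows = conn.execute(
    "SELECT video_id, first_seen_at, last_seen_at FROM videos"
).fetchall()
print(rows)
```

Because `first_seen_at` is not listed in the `DO UPDATE SET` clause, the value from the first run survives later runs.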


## Detection Heuristics (Risk Scoring)

The risk score combines three signals:
- Keywords in title/description (clip, scene, ending, etc.)
- Short-ish durations
- Commercial indicators (sponsor, affiliate, promo)

Later, this can be upgraded to perceptual-hash matching against rights-owner reference frames.
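
As a rough illustration of the perceptual-hash idea, here is a minimal average-hash sketch. It assumes frames have already been decoded and downscaled to small grayscale grids (lists of lists of 0-255 values); the decoding step, the function names, and the sample grids are all hypothetical and not part of this notebook.

```python
def average_hash(pixels):
    """Return a bit string: '1' where a pixel is brighter than the grid's mean."""
    flat = [p for row in pixels for p in row]
    mean = sum(flat) / len(flat)
    return "".join("1" if p > mean else "0" for p in flat)

def hamming_distance(a, b):
    """Count the positions at which two equal-length bit strings differ."""
    if len(a) != len(b):
        raise ValueError("hashes must have equal length")
    return sum(x != y for x, y in zip(a, b))

# Tiny 2x2 grids stand in for real downscaled frames (e.g. 8x8).
reference = [[10, 200], [220, 30]]
brighter = [[25, 215], [235, 45]]    # same picture, uniformly brighter
inverted = [[200, 10], [30, 220]]    # different light/dark layout

ref_hash = average_hash(reference)
print(hamming_distance(ref_hash, average_hash(brighter)))
print(hamming_distance(ref_hash, average_hash(inverted)))
```

A small Hamming distance suggests the candidate frame resembles the reference. Where to set the match threshold would need tuning on real data.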


In [13]:
# Single-word keywords, matched against whole tokens (so "clips" does not match "clip")
KEYWORDS = ["clip", "scene", "ending", "trailer", "deleted", "1080p", "4k"]

def compute_risk_score(title, description, duration_seconds):
    text = f"{title}\n{description}".lower()

    # Tokenize so single-word keywords only match whole words
    words = set(re.findall(r"[a-z0-9]+", text))

    score = 0.0
    for k in KEYWORDS:
        if k in words:
            score += 2.0

    # phrase keywords
    if "full scene" in text:
        score += 2.5
    if "best moments" in text:
        score += 2.0

    if duration_seconds is not None:
        if 10 <= duration_seconds <= 600:
            score += 3.0
        elif duration_seconds <= 1200:
            score += 1.0

    if any(x in text for x in ["sponsor", "affiliate", "promo"]):
        score += 1.5

    return score


## Resolve channels and list videos

We:
1. Build a YouTube API client using the API key
2. Resolve channel handles (like `@ScreenRantPlus`) into `channelId`
3. List recent videos via the `search.list` endpoint
4. Fetch details via `videos.list`


In [15]:
def build_youtube(api_key: str):
    return build("youtube", "v3", developerKey=api_key)

def resolve_channel_id(youtube, handle_or_query: str):
    req = youtube.search().list(part="snippet", q=handle_or_query, type="channel", maxResults=1)
    resp = req.execute()
    items = resp.get("items", [])
    if not items:
        return None
    return items[0]["snippet"]["channelId"]

def list_channel_videos(youtube, channel_id: str, max_results: int = 50):
    videos = []
    page_token = None
    while True:
        remaining = max_results - len(videos)
        if remaining <= 0:
            break
        req = youtube.search().list(
            part="snippet",
            channelId=channel_id,
            maxResults=min(50, remaining),
            order="date",
            type="video",
            pageToken=page_token
        )
        resp = req.execute()
        for it in resp.get("items", []):
            videos.append(it["id"]["videoId"])
            if len(videos) >= max_results:
                return videos
        page_token = resp.get("nextPageToken")
        if not page_token:
            break
    return videos

def fetch_video_details(youtube, video_ids):
    out = []
    for i in range(0, len(video_ids), 50):
        chunk = video_ids[i:i+50]
        req = youtube.videos().list(
            part="snippet,contentDetails,statistics",
            id=",".join(chunk),
            maxResults=50
        )
        resp = req.execute()
        out.extend(resp.get("items", []))
    return out
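
`resolve_channel_id` above relies on a free-text search, which returns the best match rather than an exact handle lookup, and `search.list` is one of the more expensive calls in the API's quota table (100 units per call, versus 1 for `channels.list`). The API's `channels.list` method also accepts a `forHandle` parameter. The sketch below shows how an exact lookup might look. `resolve_channel_id_by_handle` is a hypothetical helper, not part of this notebook, and it assumes `youtube` is the client returned by `build_youtube`.

```python
def resolve_channel_id_by_handle(youtube, handle: str):
    """Look up a channelId from a handle such as '@ScreenRant'.

    Hypothetical helper. Assumes `youtube` is a YouTube Data API v3
    client built with googleapiclient.discovery.build.
    """
    # channels.list with forHandle performs an exact handle lookup
    resp = youtube.channels().list(part="id", forHandle=handle).execute()
    items = resp.get("items", [])
    # For channel resources, "id" is the channel ID string itself
    return items[0]["id"] if items else None
```

If this approach is adopted, falling back to the search-based resolver when the lookup returns `None` keeps non-handle queries working.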


## Evidence Preservation (ToS-safe default)

For each video, we save:
- `video.json` (API response)
- `watch.html` (watch-page snapshot)
- `thumbnail.jpg` (best available)
- `manifest.json` (hashes, sizes, timestamps)


In [17]:
def preserve_evidence(video: dict, evidence_dir: str):
    ensure_dirs(evidence_dir)

    captured_at_utc = datetime.now(timezone.utc).isoformat()

    # Save API JSON
    api_json_path = os.path.join(evidence_dir, "video.json")
    with open(api_json_path, "w", encoding="utf-8") as f:
        json.dump(video, f, indent=2, ensure_ascii=False)

    # Save watch page HTML snapshot
    video_id = video["id"]
    url = f"https://www.youtube.com/watch?v={video_id}"
    html_path = os.path.join(evidence_dir, "watch.html")
    r = requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0"})
    r.raise_for_status()
    # Write the raw response bytes so the hash reflects exactly what was served
    with open(html_path, "wb") as f:
        f.write(r.content)

    # Download thumbnail
    thumbs = video.get("snippet", {}).get("thumbnails", {})
    thumb_url = None
    for key in ["maxres", "standard", "high", "medium", "default"]:
        if key in thumbs:
            thumb_url = thumbs[key]["url"]
            break

    thumb_path = None
    if thumb_url:
        thumb_path = os.path.join(evidence_dir, "thumbnail.jpg")
        img = requests.get(thumb_url, timeout=30)
        img.raise_for_status()
        with open(thumb_path, "wb") as f:
            f.write(img.content)

    # Manifest with hashes + timestamps
    manifest = {
        "captured_at_utc": captured_at_utc,
        "video_url": url,
        "artifacts": []
    }
    for p in [api_json_path, html_path, thumb_path]:
        if p and os.path.exists(p):
            manifest["artifacts"].append({
                "path": os.path.basename(p),
                "sha256": sha256_file(p),
                "bytes": os.path.getsize(p)
            })

    manifest_path = os.path.join(evidence_dir, "manifest.json")
    with open(manifest_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2, ensure_ascii=False)

    return {
        "video_url": url,
        "evidence_dir": evidence_dir,
        "manifest_path": manifest_path,
        "captured_at_utc": captured_at_utc
    }
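
The manifest is only useful if it can be re-checked later. Below is a minimal sketch of such a check, assuming the `manifest.json` layout written by `preserve_evidence` (an `artifacts` list with `path`, `sha256`, and `bytes`). `verify_manifest` is a hypothetical helper, not part of this notebook.

```python
import hashlib
import json
import os

def verify_manifest(evidence_dir: str) -> list:
    """Return the names of artifacts that are missing or no longer match."""
    manifest_path = os.path.join(evidence_dir, "manifest.json")
    with open(manifest_path, encoding="utf-8") as f:
        manifest = json.load(f)

    problems = []
    for art in manifest.get("artifacts", []):
        path = os.path.join(evidence_dir, art["path"])
        if not os.path.exists(path):
            problems.append(art["path"])
            continue
        # Recompute the SHA-256 in chunks, as sha256_file does
        h = hashlib.sha256()
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                h.update(chunk)
        # Flag the artifact if either the hash or the byte size has changed
        if h.hexdigest() != art["sha256"] or os.path.getsize(path) != art["bytes"]:
            problems.append(art["path"])
    return problems
```

An empty list means every recorded artifact is present and unchanged. Note that the manifest itself is not hashed, so this check does not detect edits to `manifest.json`.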


## Excel Reporting

Each row of the report describes one video:
- channel handle + channelId
- videoId + title + publish time
- duration + stats
- risk score
- evidence folder + manifest path


In [19]:
def write_excel(report_rows, out_path: str):
    df = pd.DataFrame(report_rows)
    ensure_dirs(os.path.dirname(out_path))
    df.to_excel(out_path, index=False)

    wb = load_workbook(out_path)
    ws = wb.active
    ws.freeze_panes = "A2"

    header_font = Font(bold=True)
    for cell in ws[1]:
        cell.font = header_font
        cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)

    # Auto width (approx)
    for col in range(1, ws.max_column + 1):
        col_letter = get_column_letter(col)
        max_len = 10
        for row in range(1, ws.max_row + 1):
            v = ws.cell(row=row, column=col).value
            if v is not None:
                max_len = max(max_len, min(80, len(str(v))))
        ws.column_dimensions[col_letter].width = max_len + 2

    wb.save(out_path)


## The Pipeline

It will:
- Resolve channels
- Fetch latest videos
- Preserve evidence into `CONFIG["output_dir"]`
- Save report into `CONFIG["report_dir"]`


In [21]:
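def to_int(x):
    """Convert an API count (a string, or None when absent) to int, else None."""
    try:
        return int(x)
    except (TypeError, ValueError):
        return None

def run_pipeline(cfg: dict):
    # Reject a missing key or the unedited placeholder from CONFIG
    api_key = cfg.get("api_key")
    if not api_key or api_key == "YOUR_YOUTUBE_API_KEY":
        raise ValueError("Please set CONFIG['api_key'] to your YouTube Data API key.")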

    ensure_dirs(cfg["output_dir"], cfg["report_dir"])
    conn = init_db(cfg["db_path"])
    youtube = build_youtube(cfg["api_key"])

    report = []
    now_utc = datetime.now(timezone.utc).isoformat()

    for handle in cfg["channels"]:
        channel_id = resolve_channel_id(youtube, handle)
        if not channel_id:
            print(f"[WARN] Could not resolve channel: {handle}")
            continue

        video_ids = list_channel_videos(youtube, channel_id, cfg.get("max_videos_per_channel", 50))
        if not video_ids:
            print(f"[INFO] No videos found for {handle}")
            continue

        videos = fetch_video_details(youtube, video_ids)

        for v in videos:
            sn = v.get("snippet", {})
            cd = v.get("contentDetails", {})
            st = v.get("statistics", {})

            view_count = to_int(st.get("viewCount"))
            like_count = to_int(st.get("likeCount"))
            comment_count = to_int(st.get("commentCount"))

            dur_s = iso8601_duration_to_seconds(cd.get("duration"))
            score = compute_risk_score(sn.get("title", ""), sn.get("description", ""), dur_s)

            vid = v["id"]
            evidence_dir = os.path.join(cfg["output_dir"], f"channel_{channel_id}", vid)
            ev = preserve_evidence(v, evidence_dir)

            url = ev["video_url"]
            row = (
                vid, channel_id, handle, sn.get("title",""),
                sn.get("publishedAt",""), url, score,
                evidence_dir, now_utc, now_utc
            )
            upsert_video(conn, row)

            report.append({
                "channel_handle": handle,
                "channel_id": channel_id,
                "video_id": vid,
                "title": sn.get("title",""),
                "published_at": sn.get("publishedAt",""),
                "duration_seconds": dur_s,
                "view_count": view_count,
                "like_count": like_count,
                "comment_count": comment_count,
                "risk_score": score,
                "video_url": url,
                "evidence_dir": evidence_dir,
                "manifest_path": ev["manifest_path"],
                "captured_at_utc": ev["captured_at_utc"],
            })

            time.sleep(0.1)

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_xlsx = os.path.join(cfg["report_dir"], f"report_{ts}.xlsx")
    write_excel(report, out_xlsx)

    print(f"[OK] Videos processed: {len(report)}")
    print(f"[OK] Report written: {out_xlsx}")
    print(f"[OK] Evidence root: {cfg['output_dir']}")

    return out_xlsx, cfg["output_dir"]

out_xlsx, evidence_root = run_pipeline(CONFIG)


[OK] Videos processed: 284
[OK] Report written: reports/report_20260202_202530.xlsx
[OK] Evidence root: evidence
