<a href="https://colab.research.google.com/github/zack-dev-cm/trendwatch/blob/main/trendwatch_yt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%writefile trendwatch_shorts_pipeline.py
"""Trend‑Watching for Viral YouTube Shorts (robust v2)
=====================================================
A fully‑worked example that:
* Queries the YouTube Data API v3 for Shorts published in the last *N* days
* Downloads captions – or falls back to OCR on a handful of frames (a Whisper audio‑transcription fallback is planned but not implemented in this file)
* Extracts key metadata & simple virality metrics (views/day, like ratio)
* Performs lightweight topic/hook analysis with OpenAI’s GPT‑4o‑mini
* Saves a CSV ready for downstream MCP indexing (optional FastMCP server included)

Key improvements over v1
-----------------------
* **Reliable media download** – uses `pytube` with graceful fall‑backs (age‑gate, 403s, missing streams)
* **Modular fall‑back logic** – caption → frame‑OCR, trying the cheapest first (Whisper audio is a planned third step, not implemented here)
* **Progress visibility** – rich printouts + `tqdm` on API loops, per‑step success/fail logs
* **Extra metrics** – `duration_sec`, `views_per_day`, `like_ratio`, `elapsed_days`, `virality_score`
* **Configurable** – all knobs at the top; safe defaults; env‑var key loading
* **Single‑file runnable** – python 3.10+, no notebook cells required

Usage
-----
```bash
pip install --upgrade google-api-python-client youtube_transcript_api pytube moviepy fastmcp pillow openai pandas tqdm python-dotenv rich openai-whisper pytubefix
python trendwatch_shorts_pipeline.py --query "AI tools" --days 7 --out out.csv
```
(The `pytubefix` wheel is an actively maintained fork that patches YouTube’s rolling cipher changes.)


from google.colab import userdata
OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
YOUTUBE_API_KEY = userdata.get('YOUTUBE_API_KEY')

!python trendwatch_shorts_pipeline.py --query "AI tools" --days 7 --out out.csv --openai_key $OPENAI_API_KEY --yt_key $YOUTUBE_API_KEY
"""

from __future__ import annotations

import argparse
import base64
import datetime as dt
import html
import io
import json
import os
import pathlib
import sys
import tempfile
import textwrap
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional

import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from PIL import Image
from pytube import YouTube
from rich.console import Console
from rich.progress import Progress
from tqdm import tqdm
from youtube_transcript_api import YouTubeTranscriptApi

# Optional: a more resilient fork that keeps up with YouTube cipher changes
try:
    from pytubefix import YouTube as YTFix  # type: ignore
except ImportError:
    YTFix = None  # Fallback to stock pytube

from openai import OpenAI
from moviepy import VideoFileClip
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
# Defaults for the CLI flags --query, --days and --max.
DEFAULT_QUERY = "YouTube Shorts"
DEFAULT_DAYS_BACK = 10
DEFAULT_MAX_RESULTS = 50
# Virality filter applied in run_pipeline: a video is kept only if it has at
# least MIN_VIEWS views and a like_ratio of at least LIKE_RATIO_THRESHOLD.
MIN_VIEWS = 100_000
LIKE_RATIO_THRESHOLD = 0.9
FRAME_SAMPLES = 3 # default number of frames sampled per video for OCR; kept small to limit o3 spend
# OpenAI model used by ocr_frames (image input).
VISION_MODEL = "o3"
# OpenAI model used by analyze_text (topic / hook extraction).
TEXT_MODEL = "gpt-4o-mini"

# Shared rich console used for all status output in this module.
console = Console()


@dataclass
class Keys:
    """Pair of API keys as returned by ``load_keys``."""

    youtube: str  # value of YOUTUBE_API_KEY
    openai: str  # value of OPENAI_API_KEY


# ---------------------------------------------------------------------------
# UTILITIES
# ---------------------------------------------------------------------------

def load_keys() -> Keys:
    """Load the YouTube and OpenAI API keys.

    Lookup order:

    1. Colab secrets through ``google.colab.userdata`` (best effort).
    2. The process environment, after loading a local ``.env`` file when
       ``python-dotenv`` is installed. This step only fills in keys that
       step 1 did not provide.

    Returns:
        Keys: both keys, guaranteed non-empty.

    Raises:
        SystemExit: with status 1 when either key is still missing.
    """
    yt: Optional[str] = None
    oa: Optional[str] = None
    try:
        from google.colab import userdata

        yt = userdata.get("YOUTUBE_API_KEY")
        oa = userdata.get("OPENAI_API_KEY")
    except Exception:
        # Not running on Colab, or the secret lookup failed: fall through to
        # the environment. `Exception` (not a bare `except`) so Ctrl-C and
        # SystemExit are never swallowed here.
        pass

    if not yt or not oa:
        try:
            from dotenv import load_dotenv

            load_dotenv()
        except ImportError:
            # python-dotenv is optional; plain environment variables still work.
            pass
        yt = yt or os.getenv("YOUTUBE_API_KEY")
        oa = oa or os.getenv("OPENAI_API_KEY")

    if not yt or not oa:
        console.print("[bold red]❌ Missing API keys – set YOUTUBE_API_KEY & OPENAI_API_KEY")
        sys.exit(1)
    return Keys(yt, oa)


# ---------------------------------------------------------------------------
# YOUTUBE HELPERS
# ---------------------------------------------------------------------------

def yt_service(y_key: str):
    """Create a YouTube Data API v3 client that authenticates with ``y_key``."""
    service = build("youtube", "v3", developerKey=y_key)
    return service


def search_shorts(yt, q: str, days_back: int, max_items: int) -> List[str]:
    """Return up to ``max_items`` video IDs matching ``q``, ordered by view count.

    The search is restricted to videos published within the last ``days_back``
    days and to ``videoDuration="short"``. That API filter is coarser than the
    Shorts format itself (it admits videos up to a few minutes long), so
    callers that need a strict length limit should check ``duration_sec``
    after fetching details.

    Args:
        yt: YouTube Data API client (as built by ``yt_service``).
        q: Free-text search query.
        days_back: Only include videos published in the last N days.
        max_items: Maximum number of IDs to return; ``<= 0`` yields ``[]``
            without calling the API.

    Returns:
        List of video ID strings, at most ``max_items`` long.
    """
    # Timezone-aware "now" (utcnow() is deprecated) rendered as an RFC 3339
    # timestamp with whole seconds and a literal Z suffix.
    cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=days_back)
    published_after = cutoff.strftime("%Y-%m-%dT%H:%M:%SZ")

    vids: List[str] = []
    next_tok = None
    while len(vids) < max_items:
        resp = (
            yt.search()
            .list(
                q=q,
                type="video",
                videoDuration="short",
                part="id",
                maxResults=min(50, max_items - len(vids)),
                publishedAfter=published_after,
                order="viewCount",
                pageToken=next_tok,
            )
            .execute()
        )
        # Skip anything that does not carry a videoId instead of raising KeyError.
        for item in resp.get("items", []):
            video_id = item.get("id", {}).get("videoId")
            if video_id:
                vids.append(video_id)
        next_tok = resp.get("nextPageToken")
        if not next_tok:
            break
    return vids[:max_items]


def fetch_details(yt, ids: List[str]) -> pd.DataFrame:
    """Fetch snippet, statistics and duration for ``ids`` as a DataFrame.

    IDs are requested in chunks of 50 per ``videos().list`` call.

    Args:
        yt: YouTube Data API client (as built by ``yt_service``).
        ids: Video IDs to look up; may be empty.

    Returns:
        DataFrame with one row per video returned by the API and the columns
        ``video_id, title, description, publish_dt, channel, views, likes,
        comments, duration_sec, elapsed_days, views_per_day, like_ratio``.
        The columns are present even when no rows come back, so downstream
        filters work on an empty result.
    """
    columns = [
        "video_id",
        "title",
        "description",
        "publish_dt",
        "channel",
        "views",
        "likes",
        "comments",
        "duration_sec",
        "elapsed_days",
        "views_per_day",
        "like_ratio",
    ]
    rows: List[Dict[str, Any]] = []
    # One reference time for the whole batch keeps elapsed_days consistent.
    now_utc = dt.datetime.now(dt.timezone.utc)

    for start in range(0, len(ids), 50):
        chunk = ids[start : start + 50]
        data = (
            yt.videos()
            .list(id=",".join(chunk), part="snippet,statistics,contentDetails")
            .execute()
        )
        for item in data.get("items", []):
            stats = item.get("statistics", {})
            snip = item["snippet"]
            dur_iso = item["contentDetails"]["duration"]  # e.g., PT58S
            duration_sec = iso8601_duration_to_seconds(dur_iso)
            publish_dt = dt.datetime.fromisoformat(snip["publishedAt"].replace("Z", "+00:00"))
            # Clamp to at least one day: avoids division by zero for same-day
            # uploads and negative rates if publishedAt is ahead of our clock.
            elapsed_days = max((now_utc - publish_dt).days, 1)
            views = int(stats.get("viewCount", 0))
            likes = int(stats.get("likeCount", 0))
            # NOTE(review): dislikeCount appears to be absent from public API
            # responses, in which case this is 0 and like_ratio collapses to
            # ~1.0 (likes > 0) or 0.0 (no likes) – confirm before relying on
            # LIKE_RATIO_THRESHOLD as a quality signal.
            dislikes = int(stats.get("dislikeCount", 0))
            rows.append(
                {
                    "video_id": item["id"],
                    "title": snip["title"],
                    "description": snip.get("description", ""),
                    "publish_dt": publish_dt.isoformat(),
                    "channel": snip.get("channelTitle", ""),
                    "views": views,
                    "likes": likes,
                    "comments": int(stats.get("commentCount", 0)),
                    "duration_sec": duration_sec,
                    "elapsed_days": elapsed_days,
                    "views_per_day": views / elapsed_days,
                    # Epsilon keeps the ratio defined when both counts are 0.
                    "like_ratio": likes / (likes + dislikes + 1e-6),
                }
            )
    return pd.DataFrame(rows, columns=columns)


def iso8601_duration_to_seconds(d: str) -> int:
    """Convert an ISO 8601 duration string to whole seconds.

    Accepts the ``P#DT#H#M#S`` shape (e.g. ``PT58S``, ``PT1H2M3S``,
    ``P1DT30M``); every component is optional. Week/month/year components and
    fractional values are not supported.

    Args:
        d: Duration string; ``None`` or an empty string is treated as no match.

    Returns:
        Total number of seconds, or 0 when ``d`` does not start with a
        recognisable duration.
    """
    import re

    # Prefix match (not fullmatch) so trailing text is tolerated as before.
    m = re.match(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", d or "")
    if not m:
        return 0
    days, hours, minutes, seconds = (int(g) if g else 0 for g in m.groups())
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds


# ---------------------------------------------------------------------------
# CAPTIONS / OCR / WHISPER PIPELINE
# ---------------------------------------------------------------------------

def try_captions(video_id: str) -> Optional[str]:
    """Fetch an English transcript via ``youtube_transcript_api`` (no API quota).

    Works with both library generations: the instance-based ``fetch`` API of
    the 1.x releases and the older static ``get_transcript`` helper. Without
    this, a library upgrade that drops the static helper would be swallowed by
    the ``except`` below and every video would silently fall through to the
    slower, paid fall-backs.

    Args:
        video_id: YouTube video ID.

    Returns:
        Transcript lines joined with newlines, or ``None`` when no transcript
        could be retrieved (best effort – any error yields ``None``).
    """
    languages = ["en", "en-US", "en-GB"]
    try:
        if hasattr(YouTubeTranscriptApi, "fetch"):
            # Newer API: iterate snippet objects exposing `.text`.
            fetched = YouTubeTranscriptApi().fetch(video_id, languages=languages)
            lines = [snippet.text for snippet in fetched]
        else:
            # Legacy API: list of dicts with a "text" key.
            tr = YouTubeTranscriptApi.get_transcript(video_id, languages=languages)
            lines = [c["text"] for c in tr]
        return "\n".join(lines)
    except Exception:
        return None


def try_api_captions(yt, video_id: str) -> Optional[str]:
    """Download the first caption track of ``video_id`` through the Data API.

    NOTE(review): the captions endpoints generally need OAuth credentials, so
    with a plain API key this is expected to end in ``HttpError`` and return
    ``None`` – confirm against the credentials actually used.

    Args:
        yt: YouTube Data API client.
        video_id: YouTube video ID.

    Returns:
        The caption track as SRT text, or ``None`` if the video has no tracks,
        the API rejects the request, or the download is empty.
    """
    try:
        caps = yt.captions().list(videoId=video_id, part="id").execute()
        items = caps.get("items") or []
        if not items:
            return None
        track_id = items[0]["id"]
        payload = yt.captions().download(id=track_id, tfmt="srt").execute()
    except HttpError:
        return None

    # The download result is the track content itself, not necessarily a JSON
    # envelope: accept text, raw bytes, or a dict carrying a "body" field.
    # Indexing ["body"] unconditionally raised TypeError on str/bytes.
    if isinstance(payload, dict):
        payload = payload.get("body")
    if isinstance(payload, (bytes, bytearray)):
        payload = bytes(payload).decode("utf-8", errors="replace")
    if isinstance(payload, str) and payload:
        return payload
    return None


def sample_frames(video_url: str, n: int = FRAME_SAMPLES) -> List[Image.Image]:
    """Download a video and return ``n`` evenly spaced frames as PIL images.

    The first progressive MP4 stream is downloaded into a private temporary
    directory that is removed again before returning, whether or not frame
    extraction succeeds.

    Args:
        video_url: Full watch URL of the video.
        n: Number of frames to sample; ``<= 0`` returns ``[]`` without
            downloading anything.

    Returns:
        List of ``n`` frames taken at ``duration * (i + 1) / (n + 1)``, or an
        empty list when no progressive stream exists or any step fails (the
        failure is logged, not raised).
    """
    if n <= 0:
        return []
    try:
        yt_obj = (YTFix or YouTube)(video_url)
        stream = yt_obj.streams.filter(progressive=True, file_extension="mp4").first()
        if not stream:
            console.print(f"[yellow]⚠️ No progressive stream for {video_url}")
            return []
        # A per-call directory avoids filename collisions between videos and
        # guarantees cleanup even if decoding raises.
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = stream.download(output_path=tmp_dir)
            clip = VideoFileClip(tmp_path)
            try:
                dur = clip.duration
                return [
                    Image.fromarray(clip.get_frame(dur * (i + 1) / (n + 1)))
                    for i in range(n)
                ]
            finally:
                # Close the reader before the directory is deleted.
                clip.close()
    except Exception as e:
        console.print(f"[yellow]⚠️ Frame sampling failed for {video_url}: {e}")
        return []


def ocr_frames(client: OpenAI, frames: List[Image.Image]) -> str:
    """Describe each frame with the vision model and join the answers.

    Every frame is PNG-encoded, embedded as a base64 data URL and sent in its
    own chat-completion request to ``VISION_MODEL``.

    Args:
        client: OpenAI client used for the requests.
        frames: PIL images to analyse; an empty list returns ``""``.

    Returns:
        Non-empty model answers joined by newlines, in frame order.
    """
    texts: List[str] = []
    for img in frames:
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        b64 = base64.b64encode(buf.getvalue()).decode()
        resp = client.chat.completions.create(
            model=VISION_MODEL,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{b64}"},
                        },
                        {
                            "type": "text",
                            "text": "Extract all visible text and a short scene description (≤40 words).",
                        },
                    ],
                }
            ],
        )
        # `content` may be None; treat that as an empty answer instead of
        # crashing on `.strip()`.
        answer = (resp.choices[0].message.content or "").strip()
        if answer:
            texts.append(answer)
    return "\n".join(texts)


# ---------------------------------------------------------------------------
# NLP ANALYSIS
# ---------------------------------------------------------------------------

def analyze_text(client: OpenAI, text: str) -> Dict[str, str]:
    """Extract a topic and virality hooks from ``text`` with ``TEXT_MODEL``.

    The model is asked to answer with ``<topic>`` and ``<hooks>`` XML tags,
    which are then pulled out with regular expressions and HTML-unescaped.

    Args:
        client: OpenAI client used for the request.
        text: Caption/OCR text to analyse.

    Returns:
        Dict with the keys ``"topic"`` and ``"hooks"``. Either value is ``""``
        when the corresponding tag is missing from the answer. Blank input
        returns both as ``""`` without calling the API.
    """
    import re

    # Nothing to analyse: skip the paid request (and a made-up answer).
    if not text or not text.strip():
        return {"topic": "", "hooks": ""}

    prompt = (
        "Read the captions & description below. Return two XML tags only:\n"
        "<topic> – main subject in ≤5 words\n"
        "<hooks> – concise list of virality hooks (≤40 chars each, ';'‑separated)\n\n"
        "TEXT:\n" + text
    )
    resp = client.chat.completions.create(
        model=TEXT_MODEL,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=120,
    )
    # `content` may be None; fall back to "" so the regex search cannot raise.
    out = resp.choices[0].message.content or ""

    topic = re.search(r"<topic>(.*?)</topic>", out, re.S)
    hooks = re.search(r"<hooks>(.*?)</hooks>", out, re.S)
    return {
        "topic": html.unescape(topic.group(1).strip()) if topic else "",
        "hooks": html.unescape(hooks.group(1).strip()) if hooks else "",
    }


# ---------------------------------------------------------------------------
# PIPELINE
# ---------------------------------------------------------------------------

def process_video(
    client: OpenAI,
    yt,
    vid: str,
    frame_samples: int = FRAME_SAMPLES,
) -> Dict[str, str]:
    """Collect text for one video and run the topic/hook analysis on it.

    Sources are tried in order until one yields text: public transcript, Data
    API caption track, then OCR over sampled frames. The result holds the text
    under ``"captions"`` plus whatever ``analyze_text`` returns for it.
    """
    transcript = try_captions(vid)
    if not transcript:
        transcript = try_api_captions(yt, vid)

    if not transcript:
        # Last resort: download the video and OCR a few frames.
        watch_url = f"https://www.youtube.com/watch?v={vid}"
        stills = sample_frames(watch_url, frame_samples)
        if stills:
            transcript = ocr_frames(client, stills)

    transcript = transcript or ""

    result = {"captions": transcript}
    result.update(analyze_text(client, transcript))
    return result


def virality_score(row: pd.Series) -> float:
    """Score one video row: views in thousands + likes + 10% of views/day."""
    reach = row.views / 1_000
    velocity = row.views_per_day * 0.1
    return reach + row.likes + velocity


def run_pipeline(OPENAI_API_KEY, YOUTUBE_API_KEY,
                 query=DEFAULT_QUERY, days_back=DEFAULT_DAYS_BACK,
                 max_results=DEFAULT_MAX_RESULTS, out_csv="trendwatch_results.csv"):
    """Search, filter and analyse Shorts, then write the result to ``out_csv``.

    Steps: search video IDs → fetch details → keep rows with at least
    ``MIN_VIEWS`` views and ``LIKE_RATIO_THRESHOLD`` like ratio → collect
    captions and topic/hooks per video → add ``virality_score`` → save CSV.

    Args:
        OPENAI_API_KEY: Key for the OpenAI client.
        YOUTUBE_API_KEY: Key for the YouTube Data API client.
        query: Search query.
        days_back: Only consider videos published in the last N days.
        max_results: Maximum number of search hits before filtering.
        out_csv: Path of the CSV file to write.

    Returns:
        DataFrame of the filtered videos with the extra columns ``captions``,
        ``topic``, ``catchy_factors`` and ``virality_score``. When the search
        finds nothing, an empty DataFrame is returned and no CSV is written.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    yt = yt_service(YOUTUBE_API_KEY)

    console.print(f"[bold cyan]🔍 Searching for shorts: '{query}' (last {days_back} days)…")
    vids = search_shorts(yt, query, days_back, max_results)
    if not vids:
        # Nothing to fetch or filter; stop before the detail lookup.
        console.print("[yellow]⚠️ No shorts found – nothing to analyze")
        return pd.DataFrame()
    console.print(f"Found {len(vids)} potential shorts – fetching details…")
    details = fetch_details(yt, vids)
    console.print("Filtering by virality thresholds…")
    df = details[(details.views >= MIN_VIEWS) & (details.like_ratio >= LIKE_RATIO_THRESHOLD)].reset_index(drop=True)
    console.print(f"[green]✔ {len(df)} shorts pass the filter")

    captions = []
    topics = []
    hooks = []

    with Progress() as progress:
        task = progress.add_task("Analyzing", total=len(df))
        for row in df.itertuples():
            try:
                pdata = process_video(client, yt, row.video_id)
            except Exception as exc:
                # One bad video (network error, API failure, …) must not
                # discard the results already collected for the others.
                progress.console.print(
                    f"[yellow]⚠️ Analysis failed for {row.video_id}: {exc}"
                )
                pdata = {"captions": "", "topic": "", "hooks": ""}
            captions.append(pdata["captions"])
            topics.append(pdata["topic"])
            hooks.append(pdata["hooks"])
            progress.advance(task)

    df["captions"] = captions
    df["topic"] = topics
    df["catchy_factors"] = hooks
    # Row-wise apply on an empty frame has version-dependent results, so give
    # the column an explicit empty float Series in that case.
    if df.empty:
        df["virality_score"] = pd.Series(dtype="float64")
    else:
        df["virality_score"] = df.apply(virality_score, axis=1)

    df.to_csv(out_csv, index=False)
    console.print(f"[bold green]✅ Saved results to {out_csv} ({len(df)} rows)")
    return df


# ---------------------------------------------------------------------------
# MCP SERVER (OPTIONAL)
# ---------------------------------------------------------------------------

def start_mcp(df: pd.DataFrame, host: str = "0.0.0.0", port: int = 8000):
    """Serve ``df`` through a FastMCP server with ``search`` and ``fetch`` tools.

    Args:
        df: Result frame from ``run_pipeline``; the tools read the columns
            ``video_id, title, description, captions, publish_dt, views,
            likes, virality_score, topic, catchy_factors``.
        host: Interface to bind. The default ``0.0.0.0`` listens on all
            interfaces – pass ``127.0.0.1`` to keep the server local.
        port: TCP port to listen on.

    This call runs the server via ``mcp.run`` using the SSE transport.
    """
    from fastmcp import FastMCP

    mcp = FastMCP(
        name="YouTubeShortsTrendwatch",
        instructions="Trending YouTube Shorts corpus for deep research",
    )

    @mcp.tool()
    async def search(query: str) -> Dict[str, List[Dict[str, Any]]]:
        # regex=False: the query comes from an MCP client and must be matched
        # literally; as a regex, input such as "c++" or "(" raises re.error.
        sub = df[
            df.title.str.contains(query, case=False, na=False, regex=False)
            | df.description.str.contains(query, case=False, na=False, regex=False)
        ]
        return {
            "results": [
                {
                    "id": r.video_id,
                    "title": r.title,
                    "text": textwrap.shorten(r.description, 140),
                    "url": f"https://www.youtube.com/watch?v={r.video_id}",
                }
                for r in sub.itertuples()
            ]
        }

    @mcp.tool()
    async def fetch(id: str) -> Dict[str, Any]:
        r = df[df.video_id == id]
        if r.empty:
            raise ValueError("id not found")
        r = r.iloc[0]
        return {
            "id": id,
            "title": r.title,
            "text": f"{r.description}\n\nCaptions:\n{r.captions}",
            "url": f"https://www.youtube.com/watch?v={id}",
            "metadata": {
                "publish_dt": r.publish_dt,
                # Cast numpy scalars to plain Python numbers.
                "views": int(r.views),
                "likes": int(r.likes),
                "virality_score": float(r.virality_score),
                "topic": r.topic,
                "catchy": r.catchy_factors,
            },
        }

    console.print(f"[cyan]🚀 Starting MCP server on {host}:{port} (SSE)…")
    mcp.run(transport="sse", host=host, port=port)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def cli(argv: Optional[List[str]] = None):
    """Command-line entry point: parse flags, run the pipeline, optionally serve MCP.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    API keys default to the ``OPENAI_API_KEY`` / ``YOUTUBE_API_KEY``
    environment variables. Prefer those over the ``--openai_key`` /
    ``--yt_key`` flags, since command-line arguments are visible in process
    listings and shell history. If either key is missing, argparse prints a
    usage error and exits instead of failing later inside an API call.
    """
    p = argparse.ArgumentParser(description="Trend‑watch YouTube Shorts")
    p.add_argument("--query", default=DEFAULT_QUERY, help="Search query (default: 'YouTube Shorts')")
    p.add_argument("--days", type=int, default=DEFAULT_DAYS_BACK, help="Published within last N days")
    p.add_argument("--max", type=int, default=DEFAULT_MAX_RESULTS, help="Max shorts to fetch before filter")
    p.add_argument("--out", default="trendwatch_results.csv", help="Output CSV path")
    p.add_argument("--mcp", action="store_true", help="Launch an MCP server after collecting data")
    p.add_argument("--openai_key", default=os.getenv("OPENAI_API_KEY", ""), help="OpenAI API key")
    p.add_argument("--yt_key", default=os.getenv("YOUTUBE_API_KEY", ""), help="Youtube API key")
    args = p.parse_args(argv)

    if not args.openai_key or not args.yt_key:
        p.error(
            "missing API key(s): set OPENAI_API_KEY and YOUTUBE_API_KEY "
            "or pass --openai_key / --yt_key"
        )

    df = run_pipeline(
        OPENAI_API_KEY=args.openai_key,
        YOUTUBE_API_KEY=args.yt_key,
        query=args.query,
        days_back=args.days,
        max_results=args.max,
        out_csv=args.out,
    )
    if args.mcp:
        start_mcp(df)

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    cli()


Writing trendwatch_shorts_pipeline.py


In [2]:
!pip install --upgrade google-api-python-client youtube_transcript_api pytube moviepy fastmcp pillow openai pandas tqdm python-dotenv rich whisper pytubefix

Collecting youtube_transcript_api
  Downloading youtube_transcript_api-1.1.1-py3-none-any.whl.metadata (23 kB)
Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl.metadata (5.0 kB)
Collecting moviepy
  Downloading moviepy-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastmcp
  Downloading fastmcp-2.10.5-py3-none-any.whl.metadata (17 kB)
Collecting pillow
  Downloading pillow-11.3.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (9.0 kB)
Collecting openai
  Downloading openai-1.97.0-py3-none-any.whl.metadata (29 kB)
Collecting pandas
  Downloading pandas-2.3.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Collecting rich
  Downloading rich-14.0.0-py3-none-any.whl.metadata (18 kB)
Collecting whisper
  Downloadi

In [1]:
from google.colab import userdata
OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
YOUTUBE_API_KEY = userdata.get('YOUTUBE_API_KEY')

import datetime
# date now
data_now = datetime.datetime.now().strftime("%Y-%m-%d")

tag = 'blonde'
# tag = 'shorts'
days_to_fetch = 3
out_csv = f"{tag}_{data_now}_{days_to_fetch}.csv"
!python trendwatch_shorts_pipeline.py --query $tag --days $days_to_fetch --out $out_csv --openai_key $OPENAI_API_KEY --yt_key $YOUTUBE_API_KEY

[1;36m🔍 Searching for shorts: [0m[1;36m'blonde'[0m[1;36m [0m[1;36m([0m[1;36mlast [0m[1;36m3[0m[1;36m days[0m[1;36m)[0m[1;36m…[0m
Found [1;36m50[0m potential shorts – fetching details…
Filtering by virality thresholds…
[32m✔ [0m[1;32m4[0m[32m shorts pass the filter[0m
[2KAnalyzing [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [35m100%[0m [36m0:00:00[0m
[?25h[1;32m✅ Saved results to blonde_2025-[0m[1;32m07[0m[1;32m-18_3.csv [0m[1;32m([0m[1;32m4[0m[1;32m rows[0m[1;32m)[0m
[1;36m🔍 Searching for shorts: [0m[1;36m'blonde'[0m[1;36m [0m[1;36m([0m[1;36mlast [0m[1;36m3[0m[1;36m days[0m[1;36m)[0m[1;36m…[0m
Found [1;36m50[0m potential shorts – fetching details…
Filtering by virality thresholds…
[32m✔ [0m[1;32m4[0m[32m shorts pass the filter[0m
[2KAnalyzing [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [35m  0%[0m [36m-:--:--[0m