# YouTube Pipeline Implementation

In [None]:
from __future__ import annotations

import os
import re
import math
import time
import json
import hashlib
from dataclasses import dataclass
from typing import Optional, Dict, Any, List, Tuple

import pandas as pd
import requests


# -----------------------------
# Helpers: ID parsing + batching
# -----------------------------

# Matches the 11-character video id that follows one of the known URL markers:
# watch?v=, /shorts/, youtu.be/, /embed/, /live/ and the legacy /v/ path.
_YT_ID_RE = re.compile(
    r"(?:v=|\/shorts\/|youtu\.be\/|\/embed\/|\/live\/|\/v\/)([A-Za-z0-9_-]{11})"
)


def extract_video_id(url: str) -> Optional[str]:
    """Extract the 11-char YouTube video_id from common URL formats.

    Recognized forms are ``...watch?v=<id>``, ``youtu.be/<id>``,
    ``/shorts/<id>``, ``/embed/<id>``, ``/live/<id>`` and ``/v/<id>``.

    Args:
        url: Candidate URL. Non-string values (e.g. NaN from pandas) and
            blank strings are accepted and yield ``None``.

    Returns:
        The video id, or ``None`` when no known marker followed by an
        11-character id is present.
    """
    if not isinstance(url, str) or not url.strip():
        return None
    m = _YT_ID_RE.search(url)
    return m.group(1) if m else None

def chunked(lst: List[str], n: int) -> List[List[str]]:
    """Split ``lst`` into consecutive slices of at most ``n`` items, in order."""
    batches = []
    for start in range(0, len(lst), n):
        stop = start + n
        batches.append(lst[start:stop])
    return batches


# -----------------------------
# YouTube Data API client (requests-based)
# -----------------------------

@dataclass
class YouTubeAPI:
    """Small requests-based client for id lookups against the YouTube Data API.

    Every request carries ``api_key`` as the ``key`` query parameter and is
    followed by a ``sleep_s`` pause when it succeeds.
    """

    api_key: str
    base_url: str = "https://www.googleapis.com/youtube/v3"
    # Optional caller-supplied session; created lazily on first use when None.
    session: Optional[requests.Session] = None
    sleep_s: float = 0.1  # tiny throttle to be polite / avoid rate spikes

    # Number of ids sent per request by the list methods (not a dataclass field).
    _IDS_PER_REQUEST = 50

    def _sess(self) -> requests.Session:
        """Return the shared session, creating it on first use."""
        if self.session is None:
            self.session = requests.Session()
        return self.session

    def _get(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """GET ``base_url/path`` and return the decoded JSON body.

        Raises:
            RuntimeError: if the response status is not 200. The message holds
                the status code and the first 500 characters of the body.
        """
        params = dict(params)  # copy so the caller's dict is never mutated
        params["key"] = self.api_key
        url = f"{self.base_url}/{path}"
        r = self._sess().get(url, params=params, timeout=30)
        if r.status_code != 200:
            raise RuntimeError(f"YT API error {r.status_code}: {r.text[:500]}")
        time.sleep(self.sleep_s)
        return r.json()

    def _list_by_ids(self, resource: str, ids: List[str], parts: str) -> List[Dict[str, Any]]:
        """Fetch ``resource`` items for ``ids``, one request per batch of ids."""
        items: List[Dict[str, Any]] = []
        for batch in chunked(ids, self._IDS_PER_REQUEST):
            data = self._get(
                resource,
                {"part": parts, "id": ",".join(batch), "maxResults": self._IDS_PER_REQUEST},
            )
            # `or []` also covers a response whose "items" value is null.
            items.extend(data.get("items") or [])
        return items

    def videos_list(self, video_ids: List[str], parts: str) -> List[Dict[str, Any]]:
        """Fetch video resources for any number of ids, 50 ids per request.

        Returns the concatenated ``items`` of all responses; ids the API
        returns nothing for are simply absent from the result.
        """
        return self._list_by_ids("videos", video_ids, parts)

    def channels_list(self, channel_ids: List[str], parts: str) -> List[Dict[str, Any]]:
        """Fetch channel resources for any number of ids, 50 ids per request.

        Returns the concatenated ``items`` of all responses; ids the API
        returns nothing for are simply absent from the result.
        """
        return self._list_by_ids("channels", channel_ids, parts)


# -----------------------------
# Light feature extraction
# -----------------------------

def iso8601_duration_to_seconds(dur: Optional[str]) -> Optional[int]:
    """
    Convert an ISO 8601 duration like PT1H2M3S or P1DT2H3M4S to seconds.

    Supported designators are weeks (W), days (D), hours (H), minutes (M)
    and seconds (S), all with integer values. Years and months are not
    supported because they have no fixed length in seconds.

    Returns None for empty, non-string or unparseable input. A duration
    with no components after the time marker ("PT") yields 0.
    """
    if not dur or not isinstance(dur, str):
        return None
    # P[nW][nD][T[nH][nM][nS]]; the lookahead rejects a bare "P".
    mobj = re.match(
        r"^P(?!$)(?:(\d+)W)?(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$",
        dur,
    )
    if not mobj:
        return None
    weeks, days, hours, minutes, seconds = (int(g) if g else 0 for g in mobj.groups())
    total_hours = (weeks * 7 + days) * 24 + hours
    return total_hours * 3600 + minutes * 60 + seconds


def pick_best_thumbnail(thumbnails: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """
    Pick the highest-ranked thumbnail present in snippet.thumbnails.

    Labels are tried in the order maxres, standard, high, medium, default;
    the first one whose entry is a dict containing "url" wins.
    Returns (url, label), or (None, None) when nothing usable is found.
    """
    nothing = (None, None)
    if not isinstance(thumbnails, dict):
        return nothing
    for label in ("maxres", "standard", "high", "medium", "default"):
        entry = thumbnails.get(label)
        if not isinstance(entry, dict):
            continue
        if "url" in entry:
            return entry["url"], label
    return nothing


# Optional image features (requires pillow + numpy)
def compute_image_features_from_url(img_url: str, cache_dir: str) -> Dict[str, Any]:
    """
    Download an image into cache_dir (once) and compute basic features.

    The cache file name is the sha1 of the URL plus ".jpg", so repeated
    calls with the same URL reuse the downloaded file.

    Returned keys (each None when it could not be determined):
    - thumb_path: path of the cached file
    - thumb_sha1: sha1 of the file content
    - thumb_width, thumb_height: pixel size
    - thumb_mean_brightness: mean luminance proxy (0-255)
    - thumb_colorfulness: Hasler & Süsstrunk-ish approximation

    This is best effort: a missing/blank/non-string URL, a failed download,
    missing pillow/numpy or an unreadable image never raise; the affected
    values simply stay None. The four image features are set together or
    not at all.
    """
    feats: Dict[str, Any] = {
        "thumb_path": None,
        "thumb_sha1": None,
        "thumb_width": None,
        "thumb_height": None,
        "thumb_mean_brightness": None,
        "thumb_colorfulness": None,
    }
    # Missing values coming from pandas are float NaN, which is truthy, so a
    # plain `not img_url` check would let them through to `.encode` below.
    if not isinstance(img_url, str) or not img_url.strip():
        return feats

    os.makedirs(cache_dir, exist_ok=True)
    # deterministic filename from URL
    fname = hashlib.sha1(img_url.encode("utf-8")).hexdigest() + ".jpg"
    fpath = os.path.join(cache_dir, fname)

    # download if missing
    if not os.path.exists(fpath):
        try:
            r = requests.get(img_url, timeout=30)
        except requests.RequestException:
            # Treat network failures like a non-200 answer: no features.
            return feats
        if r.status_code != 200:
            return feats
        # Write to a temporary name and rename afterwards so an interrupted
        # write never leaves a truncated file in the cache.
        tmp_path = fpath + ".part"
        with open(tmp_path, "wb") as f:
            f.write(r.content)
        os.replace(tmp_path, fpath)

    feats["thumb_path"] = fpath

    # The content hash needs neither pillow nor numpy.
    try:
        with open(fpath, "rb") as f:
            feats["thumb_sha1"] = hashlib.sha1(f.read()).hexdigest()
    except OSError:
        return feats

    try:
        from PIL import Image
        import numpy as np
    except ImportError:
        # Optional dependencies are not installed: skip the image features.
        return feats

    try:
        # The context manager closes the underlying file handle; convert()
        # returns an independent in-memory copy.
        with Image.open(fpath) as raw:
            img = raw.convert("RGB")
        w, h = img.size

        arr = np.asarray(img).astype(np.float32)  # (H,W,3)
        red, green, blue = arr[..., 0], arr[..., 1], arr[..., 2]

        # brightness (simple luminance proxy)
        mean_brightness = float(np.mean(0.2126 * red + 0.7152 * green + 0.0722 * blue))

        # colorfulness approximation (Hasler & Süsstrunk style)
        rg = red - green
        yb = 0.5 * (red + green) - blue
        std_rg = float(np.std(rg))
        std_yb = float(np.std(yb))
        mean_rg = float(np.mean(rg))
        mean_yb = float(np.mean(yb))
        colorfulness = float(
            math.sqrt(std_rg**2 + std_yb**2) + 0.3 * math.sqrt(mean_rg**2 + mean_yb**2)
        )
    except Exception:
        # Deliberately broad: any image parsing/decoding problem only means
        # that this thumbnail gets no image features.
        return feats

    feats["thumb_width"] = w
    feats["thumb_height"] = h
    feats["thumb_mean_brightness"] = mean_brightness
    feats["thumb_colorfulness"] = colorfulness
    return feats


# -----------------------------
# Main function: enrich CSV -> add columns
# -----------------------------

# Output schemas. Declaring them up front keeps the resulting columns stable
# even when the API returns no items at all.
_VIDEO_COLUMNS = (
    "yt_title",
    "yt_description",
    "yt_published_at",
    "yt_duration_sec",
    "yt_category_id",
    "yt_tags_json",
    "yt_default_language",
    "yt_default_audio_language",
    "yt_made_for_kids",
    "yt_live_broadcast_content",
    "yt_view_count",
    "yt_like_count",
    "yt_comment_count",
    "yt_channel_id",
    "yt_thumb_url",
    "yt_thumb_quality",
)
_CHANNEL_COLUMNS = (
    "yt_channel_title",
    "yt_channel_published_at",
    "yt_channel_country",
    "yt_subscriber_count",
    "yt_channel_view_count",
    "yt_channel_video_count",
)


def _video_row(item: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one video API item into the yt_* video columns."""
    snippet = item.get("snippet") or {}
    stats = item.get("statistics") or {}
    content = item.get("contentDetails") or {}
    status = item.get("status") or {}
    tags = snippet.get("tags")
    thumb_url, thumb_quality = pick_best_thumbnail(snippet.get("thumbnails", {}))
    return {
        "yt_title": snippet.get("title"),
        "yt_description": snippet.get("description"),
        "yt_published_at": snippet.get("publishedAt"),
        "yt_duration_sec": iso8601_duration_to_seconds(content.get("duration")),
        "yt_category_id": snippet.get("categoryId"),
        "yt_tags_json": json.dumps(tags) if tags is not None else None,
        "yt_default_language": snippet.get("defaultLanguage"),
        "yt_default_audio_language": snippet.get("defaultAudioLanguage"),
        "yt_made_for_kids": status.get("madeForKids"),
        "yt_live_broadcast_content": snippet.get("liveBroadcastContent"),
        "yt_view_count": safe_int(stats.get("viewCount")),
        "yt_like_count": safe_int(stats.get("likeCount")),
        "yt_comment_count": safe_int(stats.get("commentCount")),
        "yt_channel_id": snippet.get("channelId"),
        "yt_thumb_url": thumb_url,
        "yt_thumb_quality": thumb_quality,
    }


def _channel_row(item: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one channel API item into the yt_* channel columns."""
    snippet = item.get("snippet") or {}
    stats = item.get("statistics") or {}
    return {
        "yt_channel_title": snippet.get("title"),
        "yt_channel_published_at": snippet.get("publishedAt"),
        "yt_channel_country": snippet.get("country"),
        "yt_subscriber_count": safe_int(stats.get("subscriberCount")),
        "yt_channel_view_count": safe_int(stats.get("viewCount")),
        "yt_channel_video_count": safe_int(stats.get("videoCount")),
    }


def _rows_to_frame(rows: Dict[str, Dict[str, Any]], columns: Tuple[str, ...], index_name: str) -> pd.DataFrame:
    """Build a frame indexed by the keys of ``rows`` that always has ``columns``."""
    frame = pd.DataFrame(list(rows.values()), columns=list(columns))
    # Object dtype keeps the (possibly empty) index joinable with string keys.
    frame.index = pd.Index(list(rows.keys()), dtype=object, name=index_name)
    return frame


def enrich_youtube_csv(
    csv_path: str,
    api_key: Optional[str] = None,
    video_id_col: str = "video_id",
    url_col: str = "url",
    out_csv_path: Optional[str] = None,
    add_thumbnail_features: bool = False,
    thumbnail_cache_dir: str = "thumb_cache",
) -> pd.DataFrame:
    """
    Enrich a CSV containing YouTube video IDs and/or URLs.

    Adds columns:
      Video:
        - yt_title, yt_description, yt_published_at, yt_duration_sec
        - yt_category_id, yt_tags_json, yt_default_language, yt_default_audio_language
        - yt_made_for_kids, yt_live_broadcast_content
        - yt_view_count, yt_like_count, yt_comment_count
        - yt_channel_id
        - yt_thumb_url, yt_thumb_quality
      Channel:
        - yt_channel_title, yt_channel_published_at, yt_channel_country
        - yt_subscriber_count, yt_channel_view_count, yt_channel_video_count
    Optionally adds thumbnail image features if add_thumbnail_features=True.

    Args:
      csv_path: input CSV to read.
      api_key: API key; falls back to the YOUTUBE_API_KEY env var.
      video_id_col: column holding video ids (created if absent).
      url_col: column holding video URLs, used to fill missing ids.
      out_csv_path: if given, the enriched frame is written there.
      add_thumbnail_features: download thumbnails and add thumb_* columns.
      thumbnail_cache_dir: directory for downloaded thumbnails.

    Returns:
      The input rows, in their original order, with the columns above added.

    Raises:
      ValueError: if no API key is available or no valid video id is found.

    Requirements:
      - requests, pandas
      - optional: pillow, numpy for thumbnail features

    Notes:
      - YouTube API quota applies.
      - Missing/removed videos will simply not return items; those rows remain NaN.
      - All video and channel columns are always present, even when the API
        returns nothing for every id.
    """
    api_key = api_key or os.getenv("YOUTUBE_API_KEY")
    if not api_key:
        raise ValueError("Missing YouTube API key. Pass api_key=... or set YOUTUBE_API_KEY env var.")

    df = pd.read_csv(csv_path)

    # Ensure video_id exists: use explicit column or parse from URL
    if video_id_col not in df.columns:
        df[video_id_col] = None
    # An id column that is entirely empty is read as float NaN; make it object
    # so that parsed string ids can be written into it.
    df[video_id_col] = df[video_id_col].astype(object)

    if url_col in df.columns:
        missing_vid = df[video_id_col].isna() | (df[video_id_col].astype(str).str.strip() == "")
        df.loc[missing_vid, video_id_col] = df.loc[missing_vid, url_col].apply(extract_video_id)

    # Clean ids: stringified missing values are turned back into real NA.
    df[video_id_col] = df[video_id_col].astype(str).str.strip()
    df.loc[df[video_id_col].isin(["", "nan", "None", "<NA>"]), video_id_col] = pd.NA

    video_ids = df[video_id_col].dropna().unique().tolist()
    if not video_ids:
        raise ValueError("No valid video IDs found in the CSV (either in video_id_col or parsed from url_col).")

    yt = YouTubeAPI(api_key=api_key)

    # --- Fetch video data ---
    video_items = yt.videos_list(video_ids, parts="snippet,contentDetails,statistics,status")
    video_rows: Dict[str, Dict[str, Any]] = {}
    for item in video_items:
        vid = item.get("id")
        if vid:
            video_rows[vid] = _video_row(item)

    # Map video enrichment back to df
    video_enriched = _rows_to_frame(video_rows, _VIDEO_COLUMNS, video_id_col)
    df = df.merge(video_enriched, how="left", left_on=video_id_col, right_index=True)

    # --- Fetch channel data ---
    channel_ids = sorted({c for c in df["yt_channel_id"].dropna().unique() if isinstance(c, str)})
    channel_rows: Dict[str, Dict[str, Any]] = {}
    if channel_ids:
        for item in yt.channels_list(channel_ids, parts="snippet,statistics"):
            cid = item.get("id")
            if cid:
                channel_rows[cid] = _channel_row(item)

    # Merged unconditionally so the channel columns exist even without data.
    channel_enriched = _rows_to_frame(channel_rows, _CHANNEL_COLUMNS, "yt_channel_id")
    df = df.merge(channel_enriched, how="left", left_on="yt_channel_id", right_index=True)

    # --- Optional: thumbnail image features ---
    if add_thumbnail_features:
        feats_by_url: Dict[Optional[str], Dict[str, Any]] = {}
        feats_list = []
        for raw_url in df["yt_thumb_url"]:
            # Rows without a thumbnail hold NaN; hand those over as None.
            url = raw_url if isinstance(raw_url, str) and raw_url else None
            if url not in feats_by_url:
                # Each distinct URL is processed only once.
                feats_by_url[url] = compute_image_features_from_url(url, thumbnail_cache_dir)
            feats_list.append(feats_by_url[url])
        feats_df = pd.DataFrame(feats_list)
        df = pd.concat([df.reset_index(drop=True), feats_df.reset_index(drop=True)], axis=1)

    # Save
    if out_csv_path:
        df.to_csv(out_csv_path, index=False)

    return df


def safe_int(x: Any) -> Optional[int]:
    """Return ``int(x)``, or None when x is None or the conversion fails."""
    if x is None:
        return None
    try:
        converted = int(x)
    except Exception:
        # Any conversion problem is reported as "no value".
        return None
    return converted

In [None]:
# Run the enrichment on the project CSV, then print the resulting shape and
# column names.
# NOTE(review): the input is read from a /Users/... path while the output CSV
# and thumbnail cache point at /mnt/data; confirm that /mnt/data exists and is
# writable on the machine that runs this cell.
run_options = dict(
    csv_path='/Users/maxchalekson/Northwestern University/Winter-2026/MSDS-422-0/Final-Project/422-final-project/youtube_data.csv',
    api_key=os.getenv("YOUTUBE_API_KEY"),   # a pasted literal key also works (not recommended)
    video_id_col="video_id",
    url_col="url",
    out_csv_path="/mnt/data/youtube_data_enriched.csv",
    add_thumbnail_features=True,            # set to False to skip downloads/features
    thumbnail_cache_dir="/mnt/data/thumb_cache",
)
df_enriched = enrich_youtube_csv(**run_options)

print(df_enriched.shape)
print(list(df_enriched.columns))