In [87]:
#@title YouTube Video Analysis Tool { run: "auto" }
import pandas as pd
import matplotlib.pyplot as plt
import requests
import re
import json
import random
import time
import asyncio
import aiohttp
from typing import List
from urllib.parse import urlparse, parse_qs
import nest_asyncio
import os
import nest_asyncio
nest_asyncio.apply()


# YouTube API Key (replace with your own or set via environment variable)
#API_KEY = "YOUR_API_KEY_HERE"  #@param {type:"string"}
API_KEY = "AIzaSyBZN7PacI6vuj9OCouinWbLVq_GviUbzgg"
YOUTUBE_API_URL = "https://www.googleapis.com/youtube/v3/videos"

from google.colab import drive
drive.mount('/content/drive')
#folder_path = "/content/drive/My Drive/YouTube API"

# YouTube Category Mapping
CATEGORY_MAP = {
    "1": "Film & Animation", "2": "Autos & Vehicles", "10": "Music", "15": "Pets & Animals",
    "17": "Sports", "18": "Short Movies", "19": "Travel & Events", "20": "Gaming",
    "21": "Videoblogging", "22": "People & Blogs", "23": "Comedy", "24": "Entertainment",
    "25": "News & Politics", "26": "Howto & Style", "27": "Education", "28": "Science & Technology",
    "29": "Nonprofits & Activism", "30": "Movies", "31": "Anime/Animation", "32": "Action/Adventure",
    "33": "Classics", "34": "Comedy", "35": "Documentary", "36": "Drama", "37": "Family",
    "38": "Foreign", "39": "Horror", "40": "Sci-Fi/Fantasy", "41": "Thriller", "42": "Shorts",
    "43": "Shows", "44": "Trailers",
}

# Colab Form Parameters
google_drive_data_folder = "/content/drive/My Drive/YouTube API/Data" #@param {type:"string"}
google_drive_results_folder = "/content/drive/My Drive/YouTube API/Results" #@param {type:"string"}
scrape = False  #@param {type:"boolean"}
keyword = "duck song"  #@param {type:"string"}
max_links = 20 #@param {type:"integer"}
fetch = True  #@param {type:"boolean"}
links_mode = "mixed"  #@param ["personalized", "non-personalized", "mixed"]
links_file = "Clean STAT 1001 Youtube Experiment.csv"  #@param {type:"string"}
output_csv = "mixed_output.csv"  #@param ["non_personalized_output.csv", "personalized_output.csv", "mixed_output.csv"]
analyze = False  #@param {type:"boolean"}
csv_file = "non_personalized_output.csv"  #@param {type:"string"}
output_png = "non_personalized_output.png"  #@param ["non_personalized_output.png", "personalized_output.png", "mixed_output.png"]
example = False  #@param {type:"boolean"}

def extract_video_id(link: str) -> str:
    """Extract video ID from a YouTube link, skipping playlists."""
    parsed = urlparse(link)
    if "list" in parse_qs(parsed.query):
        print(f"⚠️ Playlist link detected and skipped: {link}")
        return ""
    if parsed.hostname == "youtu.be":
        return parsed.path.lstrip("/")
    query = parse_qs(parsed.query)
    return query.get("v", [""])[0]

def batch_video_ids(video_links: List[str], batch_size: int = 50):
    """Batch video links into smaller groups for API requests."""
    for i in range(0, len(video_links), batch_size):
        yield video_links[i:i + batch_size]

async def fetch_video_metadata(session: aiohttp.ClientSession, video_links: List[str], api_key: str):
    """Fetch metadata for a batch of YouTube videos asynchronously."""
    video_ids = [extract_video_id(link) for link in video_links]
    if not video_ids:
        return []

    params = {"part": "snippet", "id": ",".join(video_ids), "key": api_key}
    try:
        async with session.get(YOUTUBE_API_URL, params=params) as resp:
            if resp.status != 200:
                print(f"API request failed with status {resp.status}")
                return []
            data = await resp.json()
            results = []
            for item in data.get("items", []):
                video_id = item.get("id")
                snippet = item.get("snippet", {})
                title = snippet.get("title")
                category_id = snippet.get("categoryId")
                category = CATEGORY_MAP.get(category_id, "Unknown")
                url = f"https://www.youtube.com/watch?v={video_id}"
                results.append({
                    "video_id": video_id, "title": title, "category_id": category_id,
                    "category": category, "url": url
                })
            return results
    except Exception as e:
        print(f"Error fetching metadata: {e}")
        return []

async def get_video_data_to_csv(video_links: List[str], output_csv: str = "video_data.csv"):
    """Fetch video data and save it to a CSV file."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_video_metadata(session, batch, API_KEY) for batch in batch_video_ids(video_links)]
        all_results = await asyncio.gather(*tasks)
        flat_results = [item for batch in all_results for item in batch]
        df = pd.DataFrame(flat_results)
        df.to_csv(output_csv, index=False)
        print(f"✅ Exported {len(df)} records to {output_csv}")
        return df

def scrape_youtube_links(keyword: str, max_links: int) -> List[str]:
    """Scrape YouTube video links based on a keyword."""
    search_url = f"https://www.youtube.com/results?search_query={keyword}"
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(search_url, headers=headers)
    match = re.search(r"var ytInitialData = ({.*?});</script>", response.text)
    if not match:
        print(f"No ytInitialData found for keyword: {keyword}")
        return []
    try:
        data = json.loads(match.group(1))
        video_ids = set()
        contents = data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"]
        for section in contents:
            items = section.get("itemSectionRenderer", {}).get("contents", [])
            for item in items:
                video = item.get("videoRenderer")
                if video and "videoId" in video:
                    video_id = video["videoId"]
                    video_ids.add(f"https://www.youtube.com/watch?v={video_id}")
                    if len(video_ids) >= max_links:
                        return list(video_ids)
        return list(video_ids)
    except Exception as e:
        print(f"Error parsing data for keyword {keyword}: {e}")
        return []

def get_random_youtube_links(total_needed: int = 600) -> List[str]:
    """Generate a specified number of random YouTube video links."""
    all_links = set()
    attempts = 0
    while len(all_links) < total_needed and attempts < 100:
        keyword = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=3))
        print(f"[{len(all_links)}/{total_needed}] Searching: {keyword}")
        new_links = scrape_youtube_links(keyword, max_links=50)
        all_links.update(new_links)
        attempts += 1
        time.sleep(0.5)  # Polite delay to avoid overwhelming the server
    return list(all_links)[:total_needed]

def find_top_category(csv_file: str):
    """Analyze and visualize category distribution from a CSV file."""
    df = pd.read_csv(csv_file)
    category_counts = df["category"].value_counts().sort_values(ascending=False)
    print(f"\nCategory distribution for {csv_file}:\n{category_counts}")

    plt.figure(figsize=(10, 6))
    category_counts.plot(kind="bar", color="skyblue", edgecolor="black")
    plt.title(f"Video Counts by Category ({csv_file})", fontsize=16)
    plt.xlabel("Category", fontsize=12)
    plt.ylabel("Count", fontsize=12)
    plt.xticks(rotation=45, ha="right")
    for i, count in enumerate(category_counts):
        plt.text(i, count + 0.2, str(count), ha="center", va="bottom")
    plt.tight_layout()
    plt.grid(axis="y", linestyle="--", alpha=0.7)

    output = os.path.join(google_drive_results_folder, "category_dist_"+output_png)
    plt.savefig(output)
    print(f"📊 Plot saved to: {output}")

    plt.show()
    plt.close()


async def run_example_analysis():
    """Run a full example analysis with toy links and random links."""
    print("\nRunning example analysis...")

    # 1. Toy links
    toy_links = [
        "https://www.youtube.com/watch?v=MtN1YnoL46Q",
        "https://www.youtube.com/watch?v=N2HQaQJsyQ4",
        "https://www.youtube.com/watch?v=IJNR2EpS0jw",
         # debug personal->Duck Song2->Comedy
        "https://www.youtube.com/watch?v=7jjcAuEYW9M",
         # original duck song
         "https://www.youtube.com/watch?v=MtN1YnoL46Q",
         # additional testing on Duck Song 1-7
         "https://www.youtube.com/watch?v=7wWJJtIbiHk",
         # bad playlist -> warning + delete
         "https://youtube.com/playlist?list=RDEMVdIAVFQ0_mzQJ5fQjBa7HQ&playnext=1&si=8oPmUNKvvGjWh29m",
         # invalid link -> delete
         "https://www.youtube.com/watch?v=notexisthahaha"
         # youtube category checker to confirm correctness
         # https://tubepilot.ai/tools/youtube-category-checker/
    ]

    print("Processing toy links...")
    await get_video_data_to_csv(toy_links, os.path.join(google_drive_results_folder,"toy_output.csv"))

    # 2. Random YouTube links
    print("Generating random YouTube links...")
    random_links = get_random_youtube_links(100)
    await get_video_data_to_csv(random_links, os.path.join(google_drive_results_folder,"random_output.csv"))

async def main():
    """Main function to handle form inputs and run selected actions."""
    if scrape:
        if not keyword:
            print("Error: Scraping requires a keyword")
            return
        print(f"Scraping up to {max_links} links for keyword: {keyword}")
        links = scrape_youtube_links(keyword, max_links)
        print(f"Scraped {len(links)} links: {links[:5]}... (showing first 5)")
    elif fetch:
        full_links_file = os.path.join(google_drive_data_folder, links_file)
        full_output_csv = os.path.join(google_drive_results_folder, output_csv)
        if not links_file:
            print("Error: Fetching metadata requires a links file")
            return
        if not os.path.exists(full_links_file):
            print(f"Error: File {full_links_file} not found")
            return
        df = pd.read_csv(full_links_file)

        # Directly handle the links based on the mode
        if links_mode == "personalized":
            video_links = df.iloc[:, 1:11].values.flatten()
        elif links_mode == "non-personalized":
            video_links = df.iloc[:, 11:21].values.flatten()
        else:  # 'mixed'
            video_links = list(df.iloc[:, 1:11].values.flatten()) + list(df.iloc[:, 11:21].values.flatten())

        asyncio.run(get_video_data_to_csv(video_links, full_output_csv))
        #await get_video_data_to_csv(video_links, output_csv)
    elif analyze:
        full_csv_file = os.path.join(google_drive_data_folder, csv_file)
        if not csv_file:
            print("Error: Analyzing requires a CSV file")
            return
        if not os.path.exists(full_csv_file):
            print(f"Error: File {full_csv_file} not found")
            return
        find_top_category(full_csv_file)
    elif example:
        asyncio.run(run_example_analysis())
    else:
        print("No action selected. Please set one of the options.")

# Run the main function
await main()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
⚠️ Playlist link detected and skipped: https://www.youtube.com/watch?v=IhgxZ8tmBxU&list=RDIhgxZ8tmBxU&start_radio=1&pp=oAcB
⚠️ Playlist link detected and skipped: https://www.youtube.com/watch?v=KDorKy-13ak&list=RDKDorKy-13ak&start_radio=1&pp=oAcB
⚠️ Playlist link detected and skipped: https://www.youtube.com/watch?v=V1bFr2SWP1I&list=RDV1bFr2SWP1I&start_radio=1&pp=oAcB
⚠️ Playlist link detected and skipped: https://www.youtube.com/watch?v=TvKJ7uvQPfg&list=RDTvKJ7uvQPfg&start_radio=1&pp=oAcB
⚠️ Playlist link detected and skipped: https://www.youtube.com/watch?v=7jjcAuEYW9M&list=RD7jjcAuEYW9M&start_radio=1&pp=oAcB
⚠️ Playlist link detected and skipped: https://www.youtube.com/watch?v=Ru4a-js4My4&list=RDRu4a-js4My4&start_radio=1&pp=oAcB
⚠️ Playlist link detected and skipped: https://www.youtube.com/watch?v=ZlDeUVvtsZ4&list=RDZlDeUVvtsZ4&start_radio=1&pp=oAcB
⚠️ 