In [1]:
# for data collection and extraction
from googleapiclient.discovery import build
import pandas as pd
import time
import os
from dotenv import load_dotenv

# Load environment variables (python-dotenv; typically from a local .env file)
# so the API key never has to be hard-coded in the notebook.
load_dotenv()
# os.getenv returns None when YOUTUBE_API_KEY is not defined.
api_key = os.getenv("YOUTUBE_API_KEY")

def get_youtube_client():
    """Build a YouTube Data API v3 client authenticated with the module-level API key.

    Returns:
        The service object produced by ``build("youtube", "v3", ...)``.

    Raises:
        RuntimeError: If ``api_key`` is empty or ``None`` (i.e. the
            ``YOUTUBE_API_KEY`` environment variable was not set).
    """
    # Fail fast with an actionable message instead of handing a missing key
    # to the client library and getting an obscure error later.
    if not api_key:
        raise RuntimeError(
            "YOUTUBE_API_KEY is not set; define it in the environment or in a .env file."
        )
    return build("youtube", "v3", developerKey=api_key)

# Module-level client; get_vid_stats_batch and search_videos read this global.
youtube = get_youtube_client()

In [2]:
# Fetch statistics for multiple YouTube videos via batched API requests
def get_vid_stats_batch(video_ids):
    """Fetch view, like and comment counts for a batch of YouTube videos.

    Args:
        video_ids: Iterable of YouTube video ID strings.

    Returns:
        list[dict]: One dict per video returned by the API, with keys
        ``"video_id"``, ``"views"``, ``"likes"`` and ``"comments"``.
        Counts are ints; a count absent from the response is reported as 0.
        An empty ``video_ids`` yields ``[]`` without calling the API.
    """
    video_ids = list(video_ids)
    # Query in chunks: 50 IDs per videos.list call is the per-request cap as
    # commonly documented (NOTE(review): confirm against the API reference).
    chunk_size = 50
    data = []
    for start in range(0, len(video_ids), chunk_size):
        chunk = video_ids[start:start + chunk_size]
        response = youtube.videos().list(
            part="statistics",
            id=",".join(chunk),
        ).execute()
        for item in response.get("items", []):
            stats = item.get("statistics", {})
            # Append exactly one record per video (the previous nested append
            # also pushed a stray set containing None after every record).
            data.append({
                "video_id": item["id"],
                "views": int(stats.get("viewCount", 0)),
                "likes": int(stats.get("likeCount", 0)),
                "comments": int(stats.get("commentCount", 0)),
            })
    return data

In [None]:
# Search YouTube videos by keyword and collect their statistics
def search_videos(keyword, start_year, max_results=100):
    """Search YouTube for videos matching a keyword and collect their statistics.

    Pages through ``search().list`` results (at most 50 per page) until
    ``max_results`` video IDs have been seen or the API stops returning a
    next-page token, fetching statistics for each page via
    ``get_vid_stats_batch``.

    Args:
        keyword: Search query string.
        start_year: Only videos published on or after January 1st (UTC) of
            this year are requested.
        max_results: Upper bound on the number of search hits to process.

    Returns:
        pandas.DataFrame: Columns ``video_id``, ``views``, ``likes`` and
        ``comments``. The columns are present even when nothing was found, so
        callers can reference them safely.
    """
    columns = ["video_id", "views", "likes", "comments"]
    video_data = []
    next_page_token = None
    collected = 0
    published_after = f"{start_year}-01-01T00:00:00Z"

    while collected < max_results:
        response = youtube.search().list(
            part="id",
            q=keyword,
            type="video",
            maxResults=min(50, max_results - collected),
            pageToken=next_page_token,
            publishedAfter=published_after,
        ).execute()

        video_ids = [item["id"]["videoId"] for item in response.get("items", [])]
        # An empty page makes no progress: stop instead of requesting stats
        # for zero IDs or looping on a token that keeps yielding nothing.
        if not video_ids:
            break

        video_data.extend(get_vid_stats_batch(video_ids))
        collected += len(video_ids)

        next_page_token = response.get("nextPageToken")
        if not next_page_token:
            break

    # Explicit columns keep the schema stable for an empty result set.
    return pd.DataFrame(video_data, columns=columns)

In [4]:
# data stats collection + engagement rate
def collect_keyword(keyword, start_year, max_results=100):
    """Collect video statistics for a keyword and add derived columns.

    Args:
        keyword: Search query string; also stored in the ``keyword`` column.
        start_year: Passed through to ``search_videos`` as the earliest
            publication year.
        max_results: Passed through to ``search_videos``.

    Returns:
        pandas.DataFrame: The frame from ``search_videos`` plus a ``keyword``
        column and an ``engagement_rate`` column computed as
        ``(likes + comments) / views``. Videos with zero views are divided by
        1 so the rate is 0 rather than a division error. If the search
        returned nothing, an empty frame with the full set of columns is
        returned.
    """
    print(f"Collecting numeric stats for: '{keyword}'...")
    df = search_videos(keyword, start_year=start_year, max_results=max_results)

    if df.empty:
        # Nothing found: return the expected schema rather than failing on
        # missing columns below.
        return pd.DataFrame(
            columns=["video_id", "views", "likes", "comments", "keyword", "engagement_rate"]
        )

    df["keyword"] = keyword
    # Feature engineering for quick inspection. The stats column is named
    # "comments" (plural); replace(0, 1) guards against zero-view videos.
    df["engagement_rate"] = (df["likes"] + df["comments"]) / df["views"].replace(0, 1)
    return df

In [None]:
# Example execution: prompt for a keyword and a start year, collect the
# statistics, and preview the result.
if __name__ == "__main__":
    # User input: surrounding whitespace is stripped from the keyword;
    # int() raises ValueError if the year is not a valid integer.
    keyword = input("Enter keyword: ").strip()
    start_year = int(input("Enter start year (YYYY): "))
    # Fetch live data (up to 100 search hits); this issues real API requests
    # through collect_keyword -> search_videos.
    df = collect_keyword(keyword, start_year=start_year, max_results=100)
    # Show the first rows of the collected frame.
    print(df.head())