### Youtube Official API

In [2]:
# Set up Client

In [4]:
!pip install google-api-python-client
from googleapiclient.discovery import build
import json
import csv
import os
from datetime import datetime

Collecting google-api-python-client
  Downloading google_api_python_client-2.187.0-py3-none-any.whl.metadata (7.0 kB)
Collecting httplib2<1.0.0,>=0.19.0 (from google-api-python-client)
  Downloading httplib2-0.31.0-py3-none-any.whl.metadata (2.2 kB)
Collecting google-auth!=2.24.0,!=2.25.0,<3.0.0,>=1.32.0 (from google-api-python-client)
  Downloading google_auth-2.43.0-py2.py3-none-any.whl.metadata (6.6 kB)
Collecting google-auth-httplib2<1.0.0,>=0.2.0 (from google-api-python-client)
  Downloading google_auth_httplib2-0.2.1-py3-none-any.whl.metadata (3.0 kB)
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0,>=1.31.5 (from google-api-python-client)
  Downloading google_api_core-2.28.1-py3-none-any.whl.metadata (3.3 kB)
Collecting uritemplate<5,>=3.0.1 (from google-api-python-client)
  Downloading uritemplate-4.2.0-py3-none-any.whl.metadata (2.6 kB)
Collecting proto-plus<2.0.0,>=1.22.3 (from google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0,>=1.31.5->google-api-python-c

In [5]:
API_KEY = "AIzaSyCloppVBo-oO1N7n--Cuje8SR6dS6ZjHpc"
youtube = build("youtube", "v3", developerKey=API_KEY)

CHANNEL_ID = "UC0C-w0YjGpqDXGB8IHb662A"  # Justin Bieber channel

OUTPUT_DIR = "outputs_youtube/"
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [6]:
# Get channel Stats

In [7]:
def get_channel_stats(channel_id):
    req = youtube.channels().list(
        part="snippet,statistics",
        id=channel_id
    )
    res = req.execute()
    return res["items"][0]


In [8]:
# Get Videos From Channel

In [9]:
def get_channel_videos(channel_id):
    video_ids = []
    req = youtube.search().list(
        part="id",
        channelId=channel_id,
        maxResults=50,
        order="date"
    )
    res = req.execute()
    
    for item in res["items"]:
        if item["id"]["kind"] == "youtube#video":
            video_ids.append(item["id"]["videoId"])
    return video_ids


In [10]:
# Get Video Statistics

In [16]:
def get_video_details(video_ids):
    req = youtube.videos().list(
        part="snippet,statistics,contentDetails",
        id=",".join(video_ids)
    )
    res = req.execute()
    return res["items"]


### Data validation

In [None]:
# Check missing keys

In [17]:
def validate_record(record, required_fields):
    missing = []
    for field in required_fields:
        if field not in record or record[field] is None:
            missing.append(field)
    return missing


In [18]:
# Validate datatypes

In [19]:
def validate_types(record, schema):
    for field, dtype in schema.items():
        if field in record and not isinstance(record[field], dtype):
            print(f"Type mismatch: {field} expected {dtype}, got {type(record[field])}")


In [20]:
### Save csv/json

In [21]:
def run_youtube_data_pull():

    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")

    # 1. Channel stats
    channel = get_channel_stats(CHANNEL_ID)
    with open(os.path.join(OUTPUT_DIR, f"youtube_channel_{timestamp}.json"), "w") as f:
        json.dump(channel, f, indent=2)

    # 2. Video IDs
    video_ids = get_channel_videos(CHANNEL_ID)

    # 3. Video details
    videos = get_video_details(video_ids)
    with open(os.path.join(OUTPUT_DIR, f"youtube_videos_{timestamp}.json"), "w") as f:
        json.dump(videos, f, indent=2)

    # 4. CSV export
    csv_path = os.path.join(OUTPUT_DIR, f"youtube_videos_{timestamp}.csv")
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["video_id", "title", "views", "likes", "comments", "publishedAt", "duration"])

        for v in videos:
            stats = v["statistics"]
            writer.writerow([
                v["id"],
                v["snippet"]["title"],
                stats.get("viewCount", 0),
                stats.get("likeCount", 0),
                stats.get("commentCount", 0),
                v["snippet"]["publishedAt"],
                v["contentDetails"]["duration"]
            ])

    print("üéâ YouTube Data Pull Completed!")


if __name__ == "__main__":
    run_youtube_data_pull()

  timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")


üéâ YouTube Data Pull Completed!
