In [None]:
# Install the third-party packages for this project (Colab shell command)
!pip install pytube webvtt-py opencv-python pymongo qdrant-client transformers torch sentencepiece ffmpeg-python pysrt gradio
# Mount Google Drive at /content/drive so the project folder stored there is reachable
from google.colab import drive
drive.mount('/content/drive')
# Switch into the project folder on Drive; relative paths used afterwards resolve against it
import os
os.chdir('/content/drive/MyDrive/ai final project')  # working directory

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Import required libraries
import os
import time
import json
import subprocess
import cv2
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, HTML
import re
import hashlib
import pysrt

# Ensure each data folder exists (a no-op for folders that are already there)
for data_subdir in ('raw', 'processed', 'demo'):
    os.makedirs(f'data/{data_subdir}', exist_ok=True)

# Report success together with the working directory and what it contains
print("Project directories created successfully!")
working_dir = os.getcwd()
print(f"Current directory: {working_dir}")
print(f"Directory contents: {os.listdir()}")

# Quietly install yt-dlp, the command-line downloader invoked via subprocess by download_youtube_video below
!pip install -q yt-dlp

# YouTube IDs of the lectures to process, two per topic
_WATCH_URL_PREFIX = "https://www.youtube.com/watch?v="
_LECTURE_IDS = (
    # Deep Neural Networks
    "M-bIqxvF984",  # Introduction
    "JW22NeQXk64",  # Backpropagation

    # Probabilistic Reasoning
    "qegibGSstNE",  # Introduction
    "eiGC3e78JVw",  # HMMs

    # Reinforcement Learning
    "kVYyDO0B6xo",  # Introduction
    "L90h-1Sntnw",  # Markov Decision Processes
)

# Full watch URLs, in the same order as the IDs above
selected_videos = [_WATCH_URL_PREFIX + lecture_id for lecture_id in _LECTURE_IDS]

# Helper function to clean subtitles
def clean_subtitle_text(text):
    """Return ``text`` with markup tags stripped and whitespace normalised.

    Every ``<...>`` span is dropped first; afterwards each run of whitespace
    (newlines included) collapses to one space and both ends are trimmed.
    """
    without_tags = re.sub(r'<[^>]+>', '', text)
    return re.sub(r'\s+', ' ', without_tags).strip()

# Function to download a YouTube video using yt-dlp
def download_youtube_video(video_url, output_path="data/raw"):
    """
    Download a YouTube video and its English auto-generated subtitles using yt-dlp.

    Files are written to ``output_path`` as ``<video_id>_<title>.<ext>`` and
    the subtitles are converted to SRT.

    Args:
        video_url: URL of the YouTube video. Both ``watch?v=<id>`` URLs and
            path-style links such as ``youtu.be/<id>`` are understood.
        output_path: Directory to save the video in (created if missing).

    Returns:
        Tuple ``(video_path, subtitle_path)``. ``subtitle_path`` is None when
        no .srt file was produced. ``(None, None)`` is returned when the
        download fails or no video file can be found afterwards.
    """
    # Local import so the function does not depend on the notebook's import cell.
    from urllib.parse import parse_qs, urlparse

    # Only files with a video container extension count as "the video"; this
    # keeps leftovers such as *.part or *.vtt from being returned by mistake.
    video_extensions = (".mp4", ".webm", ".mkv", ".mov", ".m4v", ".flv", ".3gp", ".avi")

    try:
        # Extract the video ID: prefer the ?v= query parameter, otherwise fall
        # back to the last path segment (youtu.be/<id>, /embed/<id>, ...).
        parsed_url = urlparse(video_url)
        query_ids = parse_qs(parsed_url.query).get("v")
        if query_ids:
            video_id = query_ids[0]
        else:
            video_id = parsed_url.path.rstrip("/").rsplit("/", 1)[-1]
        if not video_id:
            raise ValueError(f"Could not determine video ID from {video_url}")

        os.makedirs(output_path, exist_ok=True)

        # Create output filename (yt-dlp fills in the title and extension)
        output_template = os.path.join(output_path, f"{video_id}_%(title)s")

        # Download video with subtitles
        cmd = [
            "yt-dlp",
            "--write-auto-sub",  # Download auto-generated subtitles if available
            "--sub-lang", "en",   # English subtitles
            "--convert-subs", "srt", # Convert subtitles to SRT format
            "-f", "best[height<=720]", # 720p or lower quality to save space
            "-o", f"{output_template}.%(ext)s",
            video_url
        ]

        print(f"Downloading: {video_url}")
        subprocess.run(cmd, check=True)

        # Match on "<id>_" (the template's separator) and sort so the choice
        # is deterministic; os.listdir order is arbitrary.
        file_prefix = f"{video_id}_"
        downloaded = sorted(f for f in os.listdir(output_path) if f.startswith(file_prefix))

        # Find the downloaded video file
        video_files = [f for f in downloaded if f.lower().endswith(video_extensions)]
        if not video_files:
            print(f"No video file found for {video_url}")
            return None, None

        video_path = os.path.join(output_path, video_files[0])

        # Find the subtitle file
        subtitle_files = [f for f in downloaded if f.endswith(".srt")]
        subtitle_path = None
        if subtitle_files:
            subtitle_path = os.path.join(output_path, subtitle_files[0])
            print(f"Subtitles downloaded to {subtitle_path}")
        else:
            print(f"No subtitles found for {video_url}")

        print(f"Video downloaded to {video_path}")
        return video_path, subtitle_path

    except Exception as e:
        # Best-effort: report the problem and let the caller skip this video.
        print(f"Error downloading video: {str(e)}")
        return None, None

# Fetch every selected video, keeping only those that came with subtitles
video_subtitle_pairs = []
for video_url in selected_videos:
    video_path, subtitle_path = download_youtube_video(video_url)

    # Either path missing means this video cannot be used
    if not (video_path and subtitle_path):
        print(f"Skipping video: {video_url}")
        continue

    video_subtitle_pairs.append((video_path, subtitle_path))
    # Short pause after each successful download to avoid hitting rate limits
    time.sleep(2)

print(f"Downloaded {len(video_subtitle_pairs)} videos successfully")

Project directories created successfully!
Current directory: /content/drive/MyDrive/ai final project
Directory contents: ['Notebooks', 'Scripts', 'data']
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.2/172.2 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m25.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading: https://www.youtube.com/watch?v=M-bIqxvF984
Subtitles downloaded to data/raw/M-bIqxvF984_1 neural networks.en.srt
Video downloaded to data/raw/M-bIqxvF984_1 neural networks.mp4
Downloading: https://www.youtube.com/watch?v=JW22NeQXk64
Subtitles downloaded to data/raw/JW22NeQXk64_2 multiclass classifier fashion mnist.en.srt
Video downloaded to data/raw/JW22NeQXk64_2 multiclass classifier fashion mnist.mp4
Downloading: https://www.youtube.com/watch?v=qegibGSstNE
Subtitles downloaded to data/raw/qegibGSstNE_1 probabilistic reasoning.en.srt
Video downloaded to data/raw/qegibGSstNE_1 prob

In [None]:
# Function to extract frames at subtitle timestamps
def _srt_time_to_ms(srt_time):
    """Convert a pysrt time object (hours/minutes/seconds/milliseconds) to milliseconds."""
    return (srt_time.hours * 3600 + srt_time.minutes * 60 +
            srt_time.seconds) * 1000 + srt_time.milliseconds


def _vtt_timestamp_to_ms(timestamp):
    """Convert a colon-separated ``[HH:]MM:SS.mmm`` string to integer milliseconds."""
    seconds = 0.0
    for part in timestamp.split(':'):
        seconds = seconds * 60 + float(part)
    # Round to an int so VTT timestamps look like the SRT ones (no "1234.0")
    return int(round(seconds * 1000))


def _read_subtitle_entries(subtitle_path):
    """Parse a .srt or .vtt file into dicts with start_time_ms, end_time_ms and text."""
    entries = []
    if subtitle_path.endswith('.srt'):
        import pysrt
        for sub in pysrt.open(subtitle_path):
            entries.append({
                'start_time_ms': _srt_time_to_ms(sub.start),
                'end_time_ms': _srt_time_to_ms(sub.end),
                'text': clean_subtitle_text(sub.text)
            })
    elif subtitle_path.endswith('.vtt'):
        # Imported here (like pysrt) because the notebook's import cell does not provide it
        import webvtt
        for sub in webvtt.read(subtitle_path):
            entries.append({
                'start_time_ms': _vtt_timestamp_to_ms(sub.start),
                'end_time_ms': _vtt_timestamp_to_ms(sub.end),
                'text': clean_subtitle_text(sub.text)
            })
    else:
        print(f"Unsupported subtitle format: {subtitle_path}")
    return entries


def extract_frames_at_subtitles(video_path, subtitle_path, output_folder="data/processed"):
    """
    Extract one frame from the video at the start time of each subtitle.

    Frames are written as JPEGs to ``<output_folder>/<video_id>/``. A frame
    whose pixel content is identical to the previously saved frame is skipped.

    Args:
        video_path: Path to the video file
        subtitle_path: Path to the subtitle file (.srt or .vtt)
        output_folder: Path to save extracted frames

    Returns:
        List of dictionaries containing frame information (video_id,
        frame_index, timestamp_ms, subtitle_text, frame_path, frame_hash).
        Empty when a file is missing or the video cannot be opened.
    """
    if not os.path.exists(video_path) or not os.path.exists(subtitle_path):
        print("Video or subtitle file not found")
        return []

    # Extract video ID from filename ("<video_id>_<title>.<ext>").
    # NOTE(review): an ID that itself contains '_' would be truncated here;
    # the processing loop derives the ID the same way, so both stay in sync.
    video_id = os.path.basename(video_path).split('_')[0]

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print("Error opening video file")
            return []

        # Get video properties; guard against a container that reports no FPS
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = frame_count / fps if fps > 0 else 0.0

        print(f"Video FPS: {fps}, Frame count: {frame_count}, Duration: {duration:.2f}s")

        subtitles = _read_subtitle_entries(subtitle_path)
        print(f"Found {len(subtitles)} subtitle entries")

        # Create output folder if it doesn't exist
        frames_folder = os.path.join(output_folder, video_id)
        os.makedirs(frames_folder, exist_ok=True)

        frame_data = []
        prev_frame_hash = None

        for i, subtitle in enumerate(subtitles):
            # Seek to the subtitle start time and grab the frame there
            timestamp_ms = subtitle['start_time_ms']
            cap.set(cv2.CAP_PROP_POS_MSEC, timestamp_ms)

            ret, frame = cap.read()
            if not ret:
                print(f"Could not read frame at {timestamp_ms}ms")
                continue

            # Hash the raw pixels to detect an exact repeat of the last saved frame
            frame_hash = hashlib.md5(frame.tobytes()).hexdigest()
            if frame_hash == prev_frame_hash:
                print(f"Skipping duplicate frame at {timestamp_ms}ms")
                continue

            # Save frame
            frame_path = os.path.join(frames_folder, f"frame_{i:04d}_{timestamp_ms}.jpg")
            cv2.imwrite(frame_path, frame)

            frame_data.append({
                'video_id': video_id,
                'frame_index': i,
                'timestamp_ms': timestamp_ms,
                'subtitle_text': subtitle['text'],
                'frame_path': frame_path,
                'frame_hash': frame_hash
            })
            prev_frame_hash = frame_hash

            # Print progress
            if i % 10 == 0:
                print(f"Processed {i}/{len(subtitles)} frames")
    finally:
        # Always free the capture handle, including on early return or error
        cap.release()

    print(f"Extracted {len(frame_data)} frames from {len(subtitles)} subtitles")

    return frame_data

# Function to store frame data in MongoDB (if available)
def store_frames_in_mongodb(frame_data, mongo_uri="mongodb://localhost:27017/"):
    """
    Store frame data in MongoDB (collection ``video_frames`` of ``video_rag_db``).

    This is best-effort: any failure (pymongo missing, server unreachable,
    insert error) is reported and 0 is returned instead of raising.

    Args:
        frame_data: List of dictionaries containing frame information.
            The dictionaries are not modified.
        mongo_uri: MongoDB connection URI

    Returns:
        Number of frames stored (0 on failure or when there is nothing to store)
    """
    # insert_many rejects an empty list, so there is nothing to do
    if not frame_data:
        print("No frames to store in MongoDB")
        return 0

    client = None
    try:
        # Imported here so the optional dependency is only needed when used;
        # an ImportError is handled like any other storage failure.
        from pymongo import MongoClient

        # Connect to MongoDB
        client = MongoClient(mongo_uri)
        db = client['video_rag_db']
        collection = db['video_frames']

        # insert_many adds an ``_id`` ObjectId to every dict it receives.
        # Insert shallow copies so the caller's records stay JSON-serialisable.
        result = collection.insert_many([dict(frame) for frame in frame_data])
        stored_count = len(result.inserted_ids)
        print(f"Stored {stored_count} frames in MongoDB")

        return stored_count
    except Exception as e:
        print(f"Error storing frames in MongoDB: {str(e)}")
        print("Continuing without storing in MongoDB...")
        return 0
    finally:
        if client is not None:
            client.close()

# Function to display sample frames
def display_sample_frames(frame_data, num_samples=3):
    """Display evenly spaced sample frames, each captioned with its subtitle.

    Args:
        frame_data: List of frame dictionaries; the keys read here are
            frame_path, frame_index, timestamp_ms and subtitle_text.
        num_samples: Maximum number of frames to show, stacked vertically.

    Returns:
        None. Prints a message instead of plotting when there is nothing to show.
    """
    if not frame_data:
        print("No frames to display")
        return

    num_samples = min(num_samples, len(frame_data))
    if num_samples < 1:
        # A figure with zero rows cannot be created
        print("No frames to display")
        return

    # Evenly spaced positions from the first to the last frame
    indices = np.linspace(0, len(frame_data) - 1, num_samples, dtype=int)

    # squeeze=False keeps ``axes`` two-dimensional even for a single row
    fig, axes = plt.subplots(num_samples, 1, figsize=(15, 5 * num_samples), squeeze=False)

    for ax, idx in zip(axes[:, 0], indices):
        frame_info = frame_data[idx]
        frame = cv2.imread(frame_info['frame_path'])
        if frame is None:
            # cv2.imread returns None for a missing or unreadable file
            ax.set_axis_off()
            ax.set_title(f"Frame {frame_info['frame_index']} - image not found: {frame_info['frame_path']}")
            continue

        # OpenCV loads BGR; matplotlib expects RGB
        ax.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        ax.set_title(f"Frame {frame_info['frame_index']} - Time: {frame_info['timestamp_ms']/1000:.2f}s")
        # Attach the caption to this subplot's own axes so it stays under its
        # frame regardless of how many samples are shown
        ax.set_xlabel(f"Subtitle: {frame_info['subtitle_text']}", fontsize=12,
                      bbox={"facecolor": "white", "alpha": 0.5, "pad": 5})

    fig.tight_layout()
    plt.show()

# Run frame extraction for every downloaded video; results are keyed by video ID
all_frame_data = {}

for video_path, subtitle_path in video_subtitle_pairs:
    video_filename = os.path.basename(video_path)
    print(f"\nProcessing {video_filename}...")

    frame_data = extract_frames_at_subtitles(video_path, subtitle_path)

    # Nothing extracted for this video: move on to the next one
    if not frame_data:
        continue

    # The video ID is the part of the filename before the first underscore
    video_id = video_filename.split('_')[0]
    all_frame_data[video_id] = frame_data

    # MongoDB storage is opt-in
    try_mongodb = False  # Set to True if you have MongoDB set up
    if try_mongodb:
        store_frames_in_mongodb(frame_data)

    # Persist this video's frame metadata as JSON
    json_path = f"data/processed/{video_id}_frames.json"
    with open(json_path, 'w') as json_file:
        json.dump(frame_data, json_file, indent=2)

    print(f"Frame data saved to {json_path}")

    # Show two sample frames as a quick visual check
    display_sample_frames(frame_data, num_samples=2)

# Final summary across all processed videos
total_frames = sum(map(len, all_frame_data.values()))
print("\nData collection complete!")
print(f"Processed {len(all_frame_data)} videos")
print(f"Total frames extracted: {total_frames}")

Output hidden; open in https://colab.research.google.com to view.