<a href="https://colab.research.google.com/github/skotarugit/EduTech/blob/main/Video_embeddings.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install -r /content/requirements.txt

Collecting faiss-cpu>=1.7.4 (from -r /content/requirements.txt (line 7))
  Downloading faiss_cpu-1.11.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.8 kB)
Collecting yt-dlp>=2023.6.22 (from -r /content/requirements.txt (line 8))
  Downloading yt_dlp-2025.4.30-py3-none-any.whl.metadata (173 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.3/173.3 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.0.0->-r /content/requirements.txt (line 2))
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=2.0.0->-r /content/requirements.txt (line 2))
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=2.0.0->-r /content/requirements.txt (line 2))
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x

In [3]:
# ============================================
# STEP 1: INSTALL DEPENDENCIES
# ============================================

# Install required packages
!pip install -q transformers torch torchvision opencv-python-headless faiss-cpu yt-dlp findspark

# ============================================
# STEP 2: INITIALIZE SPARK
# ============================================

import os
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, lit, row_number
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import torch
import cv2
import tempfile
import subprocess
import json
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import faiss
import gc
import traceback
from pyspark.sql.window import Window

# Create Spark session
spark = SparkSession.builder \
    .appName("VideoEmbeddings") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

print(f"Spark version: {spark.version}")

# ============================================
# STEP 3: HELPER FUNCTIONS
# ============================================

def download_video( video_id: str):
    """Download a YouTube video and return the path to the downloaded file"""

    temp_dir = tempfile.mkdtemp()
    output_path = f"{temp_dir}/{video_id}.%(ext)s"
    url = f"https://www.youtube.com/watch?v={video_id}"

    cmd = [
            "yt-dlp",
            "-f", "best[height<=720]",  # Limit resolution to save space
            "--cookies", "/content/youtube_cookies.txt",
            "-o", output_path,
            "--quiet",
            "--no-warnings",
            url
        ]

    try:
        subprocess.run(cmd, check=True, capture_output=True, text=True)
            # Find the actual downloaded file
        for file in os.listdir(temp_dir):
            if file.startswith(video_id):
                return os.path.join(temp_dir, file)
        return None
    except subprocess.CalledProcessError as e:
        print(f"Error downloading video: {e}")
        return None

def extract_frames(video_path, frames_per_second=1):
    """Extract frames from video"""
    frames = []

    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return []

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30  # Default fallback

        frame_interval = int(fps / frames_per_second)
        if frame_interval <= 0:
            frame_interval = 30

        frame_count = 0
        max_frames = 60  # Limit frames per video

        while cap.isOpened() and len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                # Resize for consistent processing
                frame = cv2.resize(frame, (224, 224))
                # Convert BGR to RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Add to frames list
                frames.append(frame_rgb)

            frame_count += 1

        cap.release()

    except Exception as e:
        print(f"Frame extraction error: {e}")

    return frames

def generate_embeddings(frames, model, processor, device, batch_size=8):
    """Generate CLIP embeddings for frames"""
    if not frames:
        return []

    embeddings = []

    for i in range(0, len(frames), batch_size):
        batch_frames = frames[i:i + batch_size]
        pil_images = [Image.fromarray(frame) for frame in batch_frames]

        inputs = processor(images=pil_images, return_tensors="pt", padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}

        with torch.no_grad():
            features = model.get_image_features(**inputs)
            features = features / features.norm(dim=-1, keepdim=True)

            for embedding in features.cpu().numpy():
                embeddings.append(embedding.tolist())

    return embeddings

# ============================================
# STEP 4: SPARK DATAFRAME PROCESSING
# ============================================

# Create schema for processed videos
processed_schema = StructType([
    StructField("video_id", StringType(), True),
    StructField("success", BooleanType(), True),
    StructField("error", StringType(), True),
    StructField("embeddings", ArrayType(ArrayType(FloatType())), True),
    StructField("frame_count", IntegerType(), True)
])


# Define processing function for a single video
def process_video(video_id):
    """Process a single video and return results as a tuple"""
    print(f"Processing {video_id}")

    try:
        # Initialize CLIP model
        device = torch.device("cpu")
        model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        model = model.to(device)
        model.eval()

        # Download video
        video_path = download_video(video_id)
        if not video_path:
            return (video_id, False, "Download failed", None, 0)

        # Extract frames
        frames = extract_frames(video_path)
        if not frames:
            if os.path.exists(video_path):
                os.remove(video_path)
            return (video_id, False, "No frames extracted", None, 0)

        # Generate embeddings
        embeddings = generate_embeddings(frames, model, processor, device)
        if not embeddings:
            if os.path.exists(video_path):
                os.remove(video_path)
            return (video_id, False, "No embeddings generated", None, 0)

        # Cleanup
        if os.path.exists(video_path):
            os.remove(video_path)
            try:
                os.rmdir(os.path.dirname(video_path))
            except:
                pass

        # Return success result
        return (video_id, True, None, embeddings, len(frames))

    except Exception as e:
        print(f"Error processing {video_id}: {e}")
        traceback.print_exc()
        return (video_id, False, str(e), None, 0)

# Function to process videos one by one
def process_videos_sequentially(video_ids):
    """Process videos one at a time"""
    results = []

    for video_id in video_ids:
        result = process_video(video_id)
        results.append(result)

    return results

# ============================================
# STEP 5: MAIN PROCESSING
# ============================================

# Create sample data
# video_ids = [
#     "dQw4w9WgXcQ",  # Rick Astley - Never Gonna Give You Up
#     "jNQXAC9IVRw",  # First YouTube video
#     "Hv2DV63EOtg",
#     "gITRsu87UG8"
# ]

# # Process videos sequentially (avoiding Spark's Row object issues)
# results = process_videos_sequentially(video_ids)

# # Create a pandas DataFrame from results
# pdf = pd.DataFrame(results, columns=["video_id", "success", "error", "embeddings", "frame_count"])

# # Convert to Spark DataFrame
# result_df = spark.createDataFrame(pdf)

# # Show results
# result_df.show(truncate=False)

# Count successes and failures
# success_counts = result_df.groupBy("success").count()
# success_counts.show()




# ============================================
# STEP 6: SEARCH FUNCTIONALITY
# ============================================

def search_by_text(query, k=5):
    """Search for similar frames using text query"""
    if not os.path.exists("/content/video_embeddings.index"):
        print("No embeddings index found!")
        return []

    # Load index and metadata
    index = faiss.read_index("/content/video_embeddings.index")
    with open("/content/video_metadata.json", "r") as f:
        metadata = json.load(f)

    # Generate query embedding
    device = torch.device("cpu")
    model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    model.eval()

    inputs = processor(text=query, return_tensors="pt", padding=True)

    with torch.no_grad():
        text_features = model.get_text_features(**inputs)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)
        query_embedding = text_features.cpu().numpy()[0]

    # Search
    distances, indices = index.search(query_embedding.reshape(1, -1), k)

    results = []
    for dist, idx in zip(distances[0], indices[0]):
        if idx < len(metadata):
            result = metadata[idx].copy()
            result['similarity'] = float(dist)
            results.append(result)

    return results


# ============================================
# STEP 7: PROCESS YOUR CSV DATA
# ============================================

# Function to process your CSV data
def process_df_from_csv(csv_path, max_videos=10):
    """Process videos from a CSV file"""
    # Load CSV file
    pdf = pd.read_csv(csv_path)

    # Extract video IDs (assuming 'video_path' column exists)
    if 'video_path' in pdf.columns:
        pdf['video_id'] = pdf['video_path'].apply(lambda x: x.split('.')[0])

    # Take just a subset for testing
    pdf = pdf[max_videos:max_videos+10]

    # Get list of video IDs
    video_ids = pdf['video_id'].tolist()

    # Process videos
    print(f"Processing {len(video_ids)} videos from CSV...")
    results = process_videos_sequentially(video_ids)

    # Create DataFrame with results
    result_df = pd.DataFrame(results, columns=["video_id", "success", "error", "embeddings", "frame_count"])

    # Process embeddings for storage
    embeddings_data = []
    metadata = []

    # Use pandas DataFrame to extract embeddings
    for index, row in result_df.iterrows():
        if row['success'] and isinstance(row['embeddings'], list):
            video_id = row['video_id']
            for i, embedding in enumerate(row['embeddings']):
                embeddings_data.append(embedding)
                metadata.append({
                    'video_id': video_id,
                    'frame_idx': i,
                    'embedding_idx': len(embeddings_data) - 1
                })

    # Save to FAISS index if we have embeddings
    if embeddings_data:
        # Create FAISS index
        embeddings_array = np.array(embeddings_data).astype('float32')
        index = faiss.IndexFlatIP(embeddings_array.shape[1])
        index.add(embeddings_array)

        # Save index and metadata
        faiss.write_index(index, "/content/video_embeddings.index")
        with open("/content/video_metadata.json", "w") as f:
            json.dump(metadata, f)

        print(f"Saved {len(embeddings_data)} embeddings to FAISS index")

    # Convert to Spark DataFrame for display
    spark_df = spark.createDataFrame(result_df)
    return spark_df, result_df


spark_df, pandas_df = process_df_from_csv('/content/all_videos.csv', max_videos=10)

# Show results in Spark
print("Spark DataFrame Results:")
spark_df.show(truncate=False)

# Count successes and failures
success_counts = spark_df.groupBy("success").count()
print("Success/Failure Counts:")
success_counts.show()

# Save results to CSV and parquet
pandas_df.to_csv("/content/video_processing_results.csv", index=False)
# spark_df.write.parquet("/content/video_processing_results.parquet")
print("Results saved to CSV")

Spark version: 3.4.0
Processing 10 videos from CSV...
Processing 9kZPRA3kn5Q
Processing bxBcqzDDKpA
Processing FycLd_5YLyY
Processing 3ll-KzJSYjE
Processing BxBhit8LWIQ
Processing FY_COK4qlYk
Processing fyCrA8XCkMI
Processing 2_MmPmFxGCo
Processing FjhjD4je1so
Processing Bi3a_WVXRXE
Saved 420 embeddings to FAISS index
Spark DataFrame Results:
+-----------+-------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [4]:
# Test search if index exists
if os.path.exists("/content/video_embeddings.index"):
    print("\nTesting search...")
    test_queries = ["dancing", "music video", "person talking"]

    for query in test_queries:
        print(f"\nSearch results for '{query}':")
        search_results = search_by_text(query)

        for i, result in enumerate(search_results):
            print(f"{i+1}. Video: {result['video_id']}, "
                  f"Frame: {result['frame_idx']}, "
                  f"Score: {result['similarity']:.4f}")


Testing search...

Search results for 'dancing':
1. Video: 2_MmPmFxGCo, Frame: 2, Score: 0.2515
2. Video: 9kZPRA3kn5Q, Frame: 11, Score: 0.2512
3. Video: 9kZPRA3kn5Q, Frame: 1, Score: 0.2497
4. Video: 2_MmPmFxGCo, Frame: 39, Score: 0.2481
5. Video: 9kZPRA3kn5Q, Frame: 19, Score: 0.2479

Search results for 'music video':
1. Video: 2_MmPmFxGCo, Frame: 41, Score: 0.2735
2. Video: 2_MmPmFxGCo, Frame: 2, Score: 0.2733
3. Video: FycLd_5YLyY, Frame: 20, Score: 0.2730
4. Video: 2_MmPmFxGCo, Frame: 1, Score: 0.2724
5. Video: FycLd_5YLyY, Frame: 19, Score: 0.2699

Search results for 'person talking':
1. Video: FY_COK4qlYk, Frame: 59, Score: 0.2557
2. Video: FycLd_5YLyY, Frame: 14, Score: 0.2538
3. Video: FY_COK4qlYk, Frame: 46, Score: 0.2530
4. Video: FY_COK4qlYk, Frame: 48, Score: 0.2527
5. Video: 3ll-KzJSYjE, Frame: 50, Score: 0.2492
