VIDEO EXTRACT AI

In [6]:
# CELL 1: SETUP, DEPENDENCIES, AND GOOGLE DRIVE CONFIGURATION
import os
import sys
import yaml
from pathlib import Path
from pydantic import BaseModel, Field
from typing import List

# --- 1. Mount Google Drive ---
# This is the first and most important step for the new workflow.
# You will be prompted to authorize Colab to access your Drive.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    print("✅ Google Drive mounted successfully.")
except Exception as e:
    print(f"❌ Failed to mount Google Drive: {e}", file=sys.stderr)
    sys.exit(1)

# --- 2. Install Dependencies ---
print("\n--- Installing Python packages ---")
!pip install -q pydantic yt-dlp ffmpeg-python pandas tqdm pyyaml

print("\n--- Installing system FFmpeg (with NVENC) ---")
!apt-get update -qq && apt-get install -y -qq ffmpeg

print("\n--- Verifying GPU and NVENC ---")
!nvidia-smi --query-gpu=name,driver_version,memory.total --format=csv,noheader
!ffmpeg -hide_banner -hwaccels | grep -q 'cuda' && echo "✅ CUDA HWAccel found." || echo "⚠️ CUDA HWAccel not found."
!ffmpeg -hide_banner -encoders | grep -q 'nvenc' && echo "✅ NVENC encoders found." || echo "⚠️ NVENC encoders not found."

# --- 3. Define NEW Configuration Schema for Google Drive ---
# This schema is tailored for the Kinetics-to-Drive workflow.
class DriveConfig(BaseModel):
    # Example: "My Drive/Kinetics-700-Clips"
    # This path will be created in your Google Drive root.
    output_directory: str

class EncodingParams(BaseModel):
    cq: int = 23
    preset: str = 'p6'

class VideoProcessingConfig(BaseModel):
    vcodec: str = "h264_nvenc"
    target_fps: int = 30
    encoding_params: EncodingParams = Field(default_factory=EncodingParams)
    run_output_dir: Path = Field(default_factory=lambda: Path.cwd() / "temp_clips")

class ExecutionConfig(BaseModel):
    max_workers: int = 6 # Tuned for Colab's I/O and GPU balance
    sleep_interval_seconds: int = 2

class MainConfig(BaseModel):
    drive: DriveConfig
    video_processing: VideoProcessingConfig
    execution: ExecutionConfig

# --- 4. Load and Validate Configuration ---
# We will create a default config here in the code to simplify things.
# You can also save this as /content/config.yaml if you prefer.
CONFIG_YAML = """
drive:
  output_directory: "My Drive/Kinetics-700-Clips"

video_processing:
  vcodec: "h264_nvenc"
  target_fps: 30
  encoding_params:
    cq: 23
    preset: "p6"

execution:
  max_workers: 6
  sleep_interval_seconds: 2
"""

print("\n--- Loading Configuration ---")
try:
    yaml_data = yaml.safe_load(CONFIG_YAML)
    cfg = MainConfig.model_validate(yaml_data)

    # Create the temporary local directory for clips before they are moved to Drive
    cfg.video_processing.run_output_dir.mkdir(parents=True, exist_ok=True)

    # Create the final output directory in Google Drive
    drive_output_path = Path("/content/drive") / cfg.drive.output_directory
    drive_output_path.mkdir(parents=True, exist_ok=True)

    print("✅ Configuration loaded and validated successfully.")
    print(f"   - Video Codec: {cfg.video_processing.vcodec}")
    print(f"   - Temp Clip Dir: {cfg.video_processing.run_output_dir}")
    print(f"   - Final Drive Dir: {drive_output_path}")
    print(f"   - Max Workers: {cfg.execution.max_workers}")


except Exception as e:
    print(f"❌ Error loading config: {e}", file=sys.stderr)
    import traceback
    traceback.print_exc()
    sys.exit(1)

print("\n✅ Cell 1 complete. Environment is ready for Kinetics-700 processing.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Google Drive mounted successfully.

--- Installing Python packages ---


ERROR:root:QFxYxUc2bTA_0: Processing failed. Reason: yt-dlp failed: ERROR: [youtube] QFxYxUc2bTA: Requested format is not available. Use --list-formats for a list of available formats
ERROR:root:ajX_uYAchD0_34: Processing failed. Reason: yt-dlp failed: ERROR: [youtube] ajX_uYAchD0: Requested format is not available. Use --list-formats for a list of available formats
ERROR:root:yFBiAIlr2d8_30: Processing failed. Reason: yt-dlp failed: ERROR: ffmpeg exited with code 1



--- Installing system FFmpeg (with NVENC) ---
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)

--- Verifying GPU and NVENC ---
Tesla T4, 550.54.15, 15360 MiB
✅ CUDA HWAccel found.
✅ NVENC encoders found.

--- Loading Configuration ---
✅ Configuration loaded and validated successfully.
   - Video Codec: h264_nvenc
   - Temp Clip Dir: /content/temp_clips
   - Final Drive Dir: /content/drive/My Drive/Kinetics-700-Clips
   - Max Workers: 6

✅ Cell 1 complete. Environment is ready for Kinetics-700 processing.


In [7]:
# CELL 2: KINETICS-700 DATASET LOADING AND SAMPLING
import pandas as pd
import sqlite3
import logging
from pathlib import Path
import sys

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# --- 1. Download the Official Kinetics-700 Annotations ---
# We will use the validation set as it's smaller and good for a 1000-clip sample.
KINETICS_URL = "https://storage.googleapis.com/deepmind-media/Datasets/kinetics700_2020.tar.gz"
TAR_FILE = Path("kinetics700_2020.tar.gz")
ANNOTATION_CSV = Path("kinetics700_2020/validate.csv")

print(f"--- Loading Kinetics-700 Dataset ---")
if not ANNOTATION_CSV.exists():
    logging.info(f"Downloading and extracting Kinetics-700 annotations from {KINETICS_URL}...")
    !wget -q -O {TAR_FILE} {KINETICS_URL}
    !tar -xf {TAR_FILE}
    logging.info("Annotations extracted successfully.")
else:
    logging.info("Kinetics-700 annotations already exist.")

# --- 2. Load, Sample, and Prepare the Data ---
try:
    df = pd.read_csv(ANNOTATION_CSV)
    logging.info(f"Loaded {len(df)} total clips from the validation set.")

    # Take a random sample of 1000 clips
    # Using a fixed random_state ensures the sample is the same every time you run the notebook.
    # NOTE: You can increase n=1000 to a larger number to get more final clips.
    master_df = df.sample(n=1000, random_state=42).copy()
    logging.info(f"Sampled 1000 random clips for processing.")

    # The Kinetics dataset uses 'label' for the class name, which is perfect.
    # We will create a unique ID for our database based on the youtube_id and start time.
    master_df['video_id'] = master_df['youtube_id'] + '_' + master_df['time_start'].astype(str)

except Exception as e:
    logging.error(f"Failed to load or sample the Kinetics CSV: {e}", exc_info=True)
    sys.exit(1)

# --- 3. Validate and Initialize Database ---
master_df.dropna(subset=['youtube_id', 'label', 'time_start', 'time_end'], inplace=True)
logging.info(f"After dropping nulls: {len(master_df)} clips remain.")
master_df.drop_duplicates('video_id', inplace=True)
logging.info(f"After removing duplicates: {len(master_df)} unique clips remain.")

# --- 4. Seed the Database with New Videos ---
DB_PATH = Path("/content/video_state.db")
with sqlite3.connect(str(DB_PATH)) as con:
    con.execute("CREATE TABLE IF NOT EXISTS videos (video_id TEXT PRIMARY KEY, label TEXT, status TEXT NOT NULL, drive_path TEXT, reason TEXT, updated_at TIMESTAMP)")
    cursor = con.cursor()
    cursor.execute("SELECT video_id FROM videos")
    existing_vids = {row[0] for row in cursor.fetchall()}
    to_add_df = master_df[~master_df['video_id'].isin(existing_vids)]
    if not to_add_df.empty:
        recs = to_add_df.assign(status='PENDING')[['video_id','label','status']]
        recs.to_sql('videos', con, if_exists='append', index=False)
        logging.info(f"DB seeded with {len(recs)} new clips.")
    else:
        logging.info("No new clips to add to the database.")

# --- 5. Prepare Final Queue for Processing ---
print("\n--- Preparing processing queue ---")
with sqlite3.connect(str(DB_PATH)) as con:
    pending_df = pd.read_sql("SELECT video_id FROM videos WHERE status='PENDING'", con)
if pending_df.empty:
    logging.info("✅ No clips with status 'PENDING' found. Nothing to process.")
    hq_df = pd.DataFrame()
else:
    # Merge to get all necessary columns (youtube_id, label, start, end) for the clips we need to process
    hq_df = master_df.merge(pending_df, on='video_id')
    logging.info(f"Found {len(hq_df)} clips in the queue to process.")

print(f"\n✅ Cell 2 complete. Videos queued for processing: {len(hq_df)}")

--- Loading Kinetics-700 Dataset ---

--- Preparing processing queue ---

✅ Cell 2 complete. Videos queued for processing: 771


In [8]:
# CELL 3: THE KINETICS CLIP PROCESSING ENGINE (TRIM, ENCODE, AND SAVE TO DRIVE)
import subprocess
import traceback
import sqlite3
from pathlib import Path
import logging
import shutil

DB_PATH = Path("/content/video_state.db")

def update_status(vid, status, drive_path=None, reason=None):
    """Updates the clip status in the central SQLite database."""
    with sqlite3.connect(str(DB_PATH), timeout=10) as con:
        con.execute("UPDATE videos SET status=?, drive_path=?, reason=?, updated_at=CURRENT_TIMESTAMP WHERE video_id=?",
                  (status, drive_path or '', str(reason) or '', vid))

def process_video_gpu(row, config):
    """
    Downloads a full YouTube video, uses FFmpeg to trim the specified clip,
    encodes it with the GPU, and moves the final clip to Google Drive.
    """
    # --- 1. Setup paths and variables ---
    # A unique ID for this specific clip
    clip_id = row.video_id
    # The original YouTube ID
    youtube_id = row.youtube_id
    # Sanitize the label to create a clean directory name
    label = "".join(c if c.isalnum() else '_' for c in row.label)

    # Define the final path in Google Drive
    drive_output_dir = Path("/content/drive") / config.drive.output_directory / label
    drive_output_dir.mkdir(parents=True, exist_ok=True) # Ensure the label directory exists
    final_drive_path = drive_output_dir / f"clip_{clip_id}.mp4"

    # Define a temporary path on the local Colab disk for processing
    temp_output_path = config.video_processing.run_output_dir / f"{clip_id}.mp4"

    youtube_url = f"https://www.youtube.com/watch?v={youtube_id}"

    p_yt, p_ff = None, None
    try:
        # --- 2. Define yt-dlp and FFmpeg commands ---
        # UPDATED: Format string now looks for 480p or higher.
        yt_dlp_cmd = [
            'yt-dlp', '--quiet', '--no-warnings',
            # Get the best 480p-or-higher stream available.
            '-f', 'bestvideo[height>=480]+bestaudio/best[height>=480]',
            '-o', '-', youtube_url
        ]

        vp_cfg = config.video_processing
        start_time = row.time_start
        end_time = row.time_end

        # CRITICAL: The FFmpeg command now includes trimming flags.
        # -ss [start] -to [end] are placed AFTER the input for frame accuracy.
        ffmpeg_cmd = [
            'ffmpeg', '-y', '-hide_banner',
            '-i', 'pipe:0', # Input from yt-dlp pipe
            '-ss', str(start_time),
            '-to', str(end_time),
            '-max_muxing_queue_size', '1024',
            '-c:v', vp_cfg.vcodec,
            '-preset', vp_cfg.encoding_params.preset,
            '-cq', str(vp_cfg.encoding_params.cq),
            '-r', str(vp_cfg.target_fps),
            '-pix_fmt', 'yuv420p',
            '-c:a', 'aac',
            str(temp_output_path) # Output to the temporary local path
        ]

        logging.info(f"{clip_id}: Starting trim/encode: {start_time}s to {end_time}s")

        # --- 3. Execute the process pipe ---
        p_yt = subprocess.Popen(yt_dlp_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        p_ff = subprocess.Popen(ffmpeg_cmd, stdin=p_yt.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p_yt.stdout: p_yt.stdout.close()

        ff_stdout, ff_stderr = p_ff.communicate(timeout=600) # 10-minute timeout
        yt_stdout, yt_stderr = p_yt.communicate()

        if p_yt.returncode != 0: raise RuntimeError(f"yt-dlp failed: {yt_stderr.decode().strip()}")
        if p_ff.returncode != 0: raise RuntimeError(f"FFmpeg failed: {ff_stderr.decode().strip()}")
        if not temp_output_path.exists() or temp_output_path.stat().st_size < 1024:
            raise RuntimeError(f"FFmpeg ran, but the output clip is missing or too small.")

        # --- 4. Move final clip to Google Drive ---
        logging.info(f"{clip_id}: Moving clip to Google Drive: {final_drive_path}")
        shutil.move(str(temp_output_path), str(final_drive_path))

        update_status(clip_id, 'SUCCESS', drive_path=str(final_drive_path))
        return True, None

    except Exception as e:
        error_reason = str(e).splitlines()[0]
        logging.error(f"{clip_id}: Processing failed. Reason: {error_reason}")
        update_status(clip_id, 'FAILURE', reason=error_reason)
        return False, error_reason
    finally:
        # Cleanup processes and temporary files
        if p_yt and p_yt.poll() is None: p_yt.kill()
        if p_ff and p_ff.poll() is None: p_ff.kill()
        if temp_output_path.exists():
            temp_output_path.unlink()

print("✅ Cell 3 complete. Clip processing engine is ready.")

✅ Cell 3 complete. Clip processing engine is ready.


In [None]:
# CELL 4: PARALLEL EXECUTION, MONITORING, AND VERIFICATION (DRIVE-READY)
import time
import random
import sys
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm
import pandas as pd

if 'hq_df' not in globals() or hq_df.empty:
    logging.warning("✅ No clips found in the queue. Nothing to do. Pipeline finished.")
else:
    success_count = 0
    failure_count = 0
    start_time = time.time()
    max_workers = cfg.execution.max_workers

    def run_process_wrapper(row):
        time.sleep(random.uniform(1, cfg.execution.sleep_interval_seconds))
        return process_video_gpu(row, cfg)

    print(f"--- Starting parallel processing of {len(hq_df)} clips using {max_workers} workers ---")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_clip = {executor.submit(run_process_wrapper, row): row.video_id for _, row in hq_df.iterrows()}
        with tqdm(total=len(hq_df), desc="Processing Clips", unit="clip") as pbar:
            for future in as_completed(future_to_clip):
                clip_id = future_to_clip[future]
                try:
                    ok, reason = future.result()
                    if ok: success_count += 1
                    else: failure_count += 1
                except Exception as e:
                    failure_count += 1
                    logging.critical(f"{clip_id}: An unexpected exception occurred: {e}", exc_info=True)
                    update_status(clip_id, 'FAILURE', reason=f"Executor-level exception: {e}")
                pbar.update(1)
                pbar.set_postfix(succeeded=success_count, failed=failure_count, refresh=True)

    end_time = time.time()
    total_processed = success_count + failure_count
    duration_minutes = (end_time - start_time) / 60 if (end_time - start_time) > 0 else 0
    rate = total_processed / duration_minutes if duration_minutes > 0 else 0
    print("\n--- PROCESSING COMPLETE ---")
    print(f"✅ Succeeded: {success_count}")
    print(f"❌ Failed:    {failure_count}")
    print(f"⏱️ Total processed: {total_processed} in {duration_minutes:.2f} minutes ({rate:.2f} clips/min)")

    # --- Verification step updated for Google Drive ---
    print("\n--- Verifying sample of successful clips in Google Drive ---")
    try:
        with sqlite3.connect(str(DB_PATH)) as con:
            verified_df = pd.read_sql("SELECT drive_path FROM videos WHERE status='SUCCESS' AND drive_path != '' LIMIT 5", con)
        if verified_df.empty:
            print("⚠️ No successful clips found in the database to verify.")
        else:
            print(f"Checking for {len(verified_df)} file(s) in Google Drive...")
            verified_count = 0
            for drive_path_str in verified_df['drive_path']:
                if Path(drive_path_str).exists():
                    print(f"  ✅ Found: {drive_path_str}")
                    verified_count += 1
                else:
                    print(f"  ❌ FAILED TO FIND: {drive_path_str}")
            print(f"\nSuccessfully verified {verified_count}/{len(verified_df)} sample files.")
    except Exception as e:
        print(f"❌ Could not perform Google Drive verification: {e}")

    print("\n✅ Pipeline Finished.")

--- Starting parallel processing of 771 clips using 6 workers ---


Processing Clips:   0%|          | 0/771 [00:00<?, ?clip/s]

ERROR:root:9BG75zyN-C4_29: Processing failed. Reason: yt-dlp failed: ERROR: ffmpeg exited with code 1
ERROR:root:QlYqu2EmouU_15: Processing failed. Reason: yt-dlp failed: ERROR: [youtube] QlYqu2EmouU: Requested format is not available. Use --list-formats for a list of available formats
ERROR:root:WW6wDRNon9A_21: Processing failed. Reason: yt-dlp failed: ERROR: [youtube] WW6wDRNon9A: Requested format is not available. Use --list-formats for a list of available formats
ERROR:root:ZwlQcUa8kdI_96: Processing failed. Reason: yt-dlp failed: ERROR: ffmpeg exited with code 1
ERROR:root:QlYqu2EmouU_15: Processing failed. Reason: yt-dlp failed: ERROR: [youtube] QlYqu2EmouU: Requested format is not available. Use --list-formats for a list of available formats
