# Extract Video FPS from YouTube URLs

This notebook processes JSON files from the media-info directory, extracts YouTube URLs,
gets the FPS of each video, and saves the FPS back to the JSON files.

In [1]:
import os
import json
import yt_dlp
from pathlib import Path
from tqdm import tqdm
import time
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [2]:
# Configuration
MEDIA_INFO_DIR = r"frame-ui\public\media-info-aic25-b1\media-info"
OUTPUT_LOG_FILE = "fps_extraction_log.txt"
ERROR_LOG_FILE = "fps_extraction_errors.txt"

# Verify directory exists
if not os.path.exists(MEDIA_INFO_DIR):
    print(f"Directory not found: {MEDIA_INFO_DIR}")
    print("Please check the path and make sure it exists.")
else:
    print(f"Media info directory found: {MEDIA_INFO_DIR}")
    json_files = [f for f in os.listdir(MEDIA_INFO_DIR) if f.endswith('.json')]
    print(f"Found {len(json_files)} JSON files to process")

Media info directory found: frame-ui\public\media-info-aic25-b1\media-info
Found 873 JSON files to process


In [3]:
def get_video_fps(youtube_url, max_retries=3):
    """
    Extract FPS from YouTube video URL using yt-dlp
    
    Args:
        youtube_url (str): YouTube video URL
        max_retries (int): Maximum number of retry attempts
    
    Returns:
        float: FPS value or None if extraction fails
    """
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extractaudio': False,
        'format': 'best[height<=360]',  # Get reasonable quality to avoid rate limits
    }
    
    for attempt in range(max_retries):
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(youtube_url, download=False)
                fps = info.get('fps')
                
                if fps is None:
                    # Try to get fps from formats
                    formats = info.get('formats', [])
                    for format_info in formats:
                        if format_info.get('fps'):
                            fps = format_info['fps']
                            break
                
                if fps:
                    logger.info(f"Successfully extracted FPS: {fps} from {youtube_url}")
                    return fps
                else:
                    logger.warning(f"No FPS information found for {youtube_url}")
                    return None
                    
        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed for {youtube_url}: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                logger.error(f"All attempts failed for {youtube_url}")
                return None
    
    return None

In [4]:
def process_json_file(filepath):
    """
    Process a single JSON file to extract and add FPS information
    
    Args:
        filepath (str): Path to the JSON file
    
    Returns:
        tuple: (success: bool, message: str)
    """
    try:
        # Read JSON file
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        # Check if FPS already exists
        if 'fps' in data and data['fps'] is not None:
            return True, f"FPS already exists: {data['fps']}"
        
        # Get watch_url
        watch_url = data.get('watch_url')
        if not watch_url:
            return False, "No watch_url found in JSON"
        
        # Extract FPS
        fps = get_video_fps(watch_url)
        
        if fps is not None:
            # Add FPS to data
            data['fps'] = fps
            
            # Write back to file
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
            
            return True, f"Successfully added FPS: {fps}"
        else:
            return False, "Failed to extract FPS"
            
    except Exception as e:
        return False, f"Error processing file: {str(e)}"

In [5]:
# Main processing loop
def process_all_files():
    """
    Process all JSON files in the media-info directory
    """
    json_files = [f for f in os.listdir(MEDIA_INFO_DIR) if f.endswith('.json')]
    
    success_count = 0
    error_count = 0
    already_processed_count = 0
    
    # Create log files
    with open(OUTPUT_LOG_FILE, 'w', encoding='utf-8') as log_f, \
         open(ERROR_LOG_FILE, 'w', encoding='utf-8') as error_f:
        
        log_f.write(f"FPS Extraction Log - Started processing {len(json_files)} files\n")
        log_f.write("=" * 60 + "\n")
        
        error_f.write(f"FPS Extraction Errors - Processing {len(json_files)} files\n")
        error_f.write("=" * 60 + "\n")
        
        # Process each file with progress bar
        for filename in tqdm(json_files, desc="Processing JSON files"):
            filepath = os.path.join(MEDIA_INFO_DIR, filename)
            
            success, message = process_json_file(filepath)
            
            if success:
                if "already exists" in message:
                    already_processed_count += 1
                else:
                    success_count += 1
                log_f.write(f"✓ {filename}: {message}\n")
            else:
                error_count += 1
                error_f.write(f"✗ {filename}: {message}\n")
            
            # Flush logs periodically
            if (success_count + error_count + already_processed_count) % 10 == 0:
                log_f.flush()
                error_f.flush()
            
            # Add small delay to avoid rate limiting
            time.sleep(0.5)
    
    # Summary
    total_processed = success_count + error_count + already_processed_count
    print(f"\n" + "=" * 60)
    print(f"PROCESSING COMPLETE")
    print(f"=" * 60)
    print(f"Total files: {len(json_files)}")
    print(f"Successfully processed: {success_count}")
    print(f"Already had FPS: {already_processed_count}")
    print(f"Errors: {error_count}")
    print(f"\nLogs saved to:")
    print(f"- Success log: {OUTPUT_LOG_FILE}")
    print(f"- Error log: {ERROR_LOG_FILE}")
    
    return success_count, error_count, already_processed_count

In [6]:
# Test with a single file first
def test_single_file():
    """
    Test the processing with a single file
    """
    json_files = [f for f in os.listdir(MEDIA_INFO_DIR) if f.endswith('.json')]
    if not json_files:
        print("No JSON files found!")
        return
    
    test_file = json_files[0]
    filepath = os.path.join(MEDIA_INFO_DIR, test_file)
    
    print(f"Testing with file: {test_file}")
    
    # Read and display current content
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    print(f"Current watch_url: {data.get('watch_url')}")
    print(f"Current fps: {data.get('fps', 'Not set')}")
    
    success, message = process_json_file(filepath)
    print(f"Result: {message}")
    
    # Read updated content
    with open(filepath, 'r', encoding='utf-8') as f:
        updated_data = json.load(f)
    
    print(f"Updated fps: {updated_data.get('fps', 'Still not set')}")

# Uncomment the next line to test with a single file first
# test_single_file()

In [9]:
test_single_file()

Testing with file: L21_V001.json
Current watch_url: https://youtube.com/watch?v=Rzpw5WR7nAY
Current fps: Not set


INFO:__main__:Successfully extracted FPS: 30 from https://youtube.com/watch?v=Rzpw5WR7nAY


Result: Successfully added FPS: 30
Updated fps: 30


In [6]:
process_all_files()

Processing JSON files:  18%|█▊        | 157/873 [01:25<06:22,  1.87it/s]INFO:__main__:Successfully extracted FPS: 30 from https://youtube.com/watch?v=4FCtYRiLuho
Processing JSON files:  18%|█▊        | 158/873 [02:05<2:29:21, 12.53s/it]INFO:__main__:Successfully extracted FPS: 30 from https://youtube.com/watch?v=qFZv_xplTlo
Processing JSON files:  18%|█▊        | 159/873 [02:37<3:36:29, 18.19s/it]INFO:__main__:Successfully extracted FPS: 25 from https://youtube.com/watch?v=hgH4k9QFoqY
Processing JSON files:  18%|█▊        | 160/873 [03:11<4:35:04, 23.15s/it]INFO:__main__:Successfully extracted FPS: 25 from https://youtube.com/watch?v=aFxEctSNt6A
Processing JSON files:  18%|█▊        | 161/873 [03:47<5:19:32, 26.93s/it]INFO:__main__:Successfully extracted FPS: 30 from https://youtube.com/watch?v=DGGWhygiKfM
Processing JSON files:  19%|█▊        | 162/873 [04:30<6:14:09, 31.57s/it]INFO:__main__:Successfully extracted FPS: 25 from https://youtube.com/watch?v=zT5rGIkxcqU
Processing JSON fi


PROCESSING COMPLETE
Total files: 873
Successfully processed: 692
Already had FPS: 157
Errors: 24

Logs saved to:
- Success log: fps_extraction_log.txt
- Error log: fps_extraction_errors.txt





(692, 24, 157)

In [8]:
# Utility function to check progress
def check_progress():
    """
    Check how many files already have FPS information
    """
    json_files = [f for f in os.listdir(MEDIA_INFO_DIR) if f.endswith('.json')]
    
    files_with_fps = 0
    files_without_fps = 0
    
    for filename in json_files:
        filepath = os.path.join(MEDIA_INFO_DIR, filename)
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            if 'fps' in data and data['fps'] is not None:
                files_with_fps += 1
            else:
                files_without_fps += 1
        except Exception as e:
            print(f"Error reading {filename}: {e}")
    
    print(f"Progress Check:")
    print(f"Total files: {len(json_files)}")
    print(f"Files with FPS: {files_with_fps}")
    print(f"Files without FPS: {files_without_fps}")
    print(f"Progress: {files_with_fps/len(json_files)*100:.1f}%")
    
    return files_with_fps, files_without_fps

# Check current progress
check_progress()

Progress Check:
Total files: 873
Files with FPS: 0
Files without FPS: 873
Progress: 0.0%


(0, 873)