In [None]:
# 1. Imports and Configuration


import json
import os
import random
import subprocess
import time
import zipfile
from urllib.parse import parse_qs, urlparse

import pandas as pd
from tqdm.notebook import tqdm

# --- CONFIGURATION ---
# Everything this notebook produces is written below OUTPUT_DIR; clips go in VIDEO_DIR.
OUTPUT_DIR = "/kaggle/working/msasl_subset"
VIDEO_DIR = os.path.join(OUTPUT_DIR, "videos")
os.makedirs(VIDEO_DIR, exist_ok=True)  # creates OUTPUT_DIR too, as the parent directory

# The 55 Target Classes for SignBridge Demo, kept as two groups so it stays
# visible which glosses were in the earlier set and which were added later.
_ORIGINAL_GLOSSES = [  # Old Classes (25)
    "need", "who", "no", "like", "work",
    "before", "go", "time", "later", "fine",
    "eat", "want", "woman", "drink", "help",
    "yes", "hot", "mother", "now", "what",
    "family", "man", "pizza", "school", "how",
]
_ADDED_GLOSSES = [  # New Classes (30)
    "i", "tomorrow", "happy", "father", "friend",
    "your", "water", "why", "home", "stay",
    "cold", "they", "sick", "good", "know",
    "you", "day", "where", "tired", "please",
    "when", "hello", "sorry", "my", "ok",
    "hungry", "see", "we", "he", "have",
]
TARGET_GLOSSES = _ORIGINAL_GLOSSES + _ADDED_GLOSSES

print(f"Targeting {len(TARGET_GLOSSES)} classes.")

In [None]:
# 2. Setup MS-ASL Metadata Paths
# The split files come from the Kaggle dataset attached to this notebook,
# so nothing is downloaded from GitHub here.
MSASL_DIR = "/kaggle/input/ms-asl/MS-ASL"

TRAIN_JSON = os.path.join(MSASL_DIR, "MSASL_train.json")
VAL_JSON = os.path.join(MSASL_DIR, "MSASL_val.json")
TEST_JSON = os.path.join(MSASL_DIR, "MSASL_test.json")

# Report, one split file at a time, whether it is present on disk.
# This only prints; a missing file does not stop the cell.
for split_json in (TRAIN_JSON, VAL_JSON, TEST_JSON):
    if os.path.exists(split_json):
        print(f"✅ Found {os.path.basename(split_json)}")
        continue
    print(f"⚠️ Warning: Could not find {split_json}")
    print("Please ensure you have added the 'ms-asl' dataset to your notebook.")

In [None]:
# 3. Load and Filter Data
def _youtube_video_id(url):
    """Return the `v` query parameter of a watch URL, or '' when there is none.

    Works for URLs with or without a scheme, since only the query string is inspected.
    """
    video_ids = parse_qs(urlparse(url).query).get('v')
    return video_ids[0] if video_ids else ''


def load_and_filter(json_path, split_name, target_glosses=None):
    """Load one MS-ASL split file and keep the entries for the wanted glosses.

    Args:
        json_path: Path to a JSON file holding a list of sample dicts. Each kept
            entry must provide 'clean_text', 'start_time' and 'end_time', plus
            either a 'url' carrying a `v=` parameter or an 'org_text' fallback.
        split_name: Label stored under the 'split' key of every kept entry.
        target_glosses: Iterable of lowercase glosses to keep. Defaults to the
            notebook-level TARGET_GLOSSES.

    Returns:
        The kept entries (the loaded dicts themselves, extended in place with
        'split' and 'filename'), in file order.

    Raises:
        FileNotFoundError: If json_path does not exist.
        KeyError: If an entry lacks one of the required keys.
    """
    # A set gives constant-time membership tests inside the loop.
    wanted = set(TARGET_GLOSSES if target_glosses is None else target_glosses)

    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    filtered = []
    for entry in data:
        # Normalize gloss text (lowercase, strip)
        gloss = entry['clean_text'].lower().strip()
        if gloss not in wanted:
            continue

        # Add split info
        entry['split'] = split_name

        # Build a filename that is unique per clip: gloss + video id + time span.
        # 'org_text' is the gloss text, not the video, so on its own it cannot
        # tell apart two clips of the same sign with the same start/end times;
        # it is kept only as a fallback when the URL carries no video id.
        safe_gloss = gloss.replace(" ", "_")
        video_id = _youtube_video_id(entry.get('url', '')) or entry['org_text']
        filename = f"{safe_gloss}_{video_id}_{entry['start_time']}_{entry['end_time']}.mp4"
        # Clean filename of weird chars
        filename = "".join(c for c in filename if c.isalnum() or c in ('_', '.', '-'))
        entry['filename'] = filename
        filtered.append(entry)

    return filtered

# Use the paths defined in the previous cell
train_data = load_and_filter(TRAIN_JSON, 'train')
val_data = load_and_filter(VAL_JSON, 'val')
test_data = load_and_filter(TEST_JSON, 'test')

all_samples = train_data + val_data + test_data
# Report the real class count instead of a hard-coded number, so the message
# stays correct when TARGET_GLOSSES is edited.
print(f"Found {len(all_samples)} total samples for our {len(TARGET_GLOSSES)} classes.")

# Save the filtered manifest
manifest_path = os.path.join(OUTPUT_DIR, "msasl_subset.json")
with open(manifest_path, 'w') as f:
    json.dump(all_samples, f, indent=2)

# Show distribution
df = pd.DataFrame(all_samples)
if df.empty:
    # An empty frame has no 'clean_text' column; say so instead of raising KeyError.
    print("No samples matched the target glosses; check TARGET_GLOSSES and the MS-ASL metadata.")
else:
    class_counts = df['clean_text'].value_counts()
    print("Top 10 classes by count:")
    print(class_counts.head(10))
    print("Bottom 10 classes by count:")
    print(class_counts.tail(10))

    # Classes with zero samples never show up in value_counts, so list them explicitly.
    found_glosses = set(df['clean_text'].str.lower().str.strip())
    missing_glosses = sorted(set(TARGET_GLOSSES) - found_glosses)
    if missing_glosses:
        print(f"⚠️ Warning: no samples found for {len(missing_glosses)} target classes: {missing_glosses}")

In [None]:
# 4. Download Function
def _is_finished_clip(path):
    """True when `path` is a regular, non-empty file."""
    return os.path.isfile(path) and os.path.getsize(path) > 0


def _remove_partial_download(output_path):
    """Best-effort removal of an unfinished clip and its `.part` staging file."""
    for leftover in (output_path, f"{output_path}.part"):
        try:
            os.remove(leftover)
        except OSError:
            # Nothing to delete (or not deletable); cleanup must not mask the download result.
            pass


def download_clip(url, start_time, end_time, output_path, timeout=60):
    """
    Downloads a specific section of a YouTube video using yt-dlp.

    Args:
        url: Video URL handed to yt-dlp unchanged.
        start_time: Section start, interpolated into `--download-sections`.
        end_time: Section end, interpolated into `--download-sections`.
        output_path: Destination file for the trimmed clip.
        timeout: Seconds allowed for the yt-dlp process (default 60, per clip).

    Returns:
        True if a non-empty clip exists at output_path (already present or just
        downloaded), False on any failure. Failures never raise: this is a
        best-effort step, since source videos may be deleted, private or blocked.
    """
    if _is_finished_clip(output_path):
        return True  # Already exists

    # An empty or interrupted leftover must not be mistaken for a finished clip,
    # neither by this function nor by the downloader, so clear it first.
    _remove_partial_download(output_path)

    # yt-dlp command to download specific section
    # --download-sections "*start-end"
    # -f best[ext=mp4] to get mp4
    cmd = [
        "yt-dlp",
        "--quiet",
        "--no-warnings",
        "--force-keyframes-at-cuts",  # Crucial for accurate trimming
        "--download-sections", f"*{start_time}-{end_time}",
        "-f", "best[ext=mp4]/best",
        "-o", output_path,
        url,
    ]

    try:
        subprocess.run(cmd, check=True, timeout=timeout)
    except Exception:
        # Covers a non-zero exit, a timeout and a missing executable alike.
        succeeded = False
    else:
        # A zero exit status is not enough: require the clip to actually be there.
        succeeded = _is_finished_clip(output_path)

    if not succeeded:
        # Remove truncated output so a later run retries instead of skipping it.
        _remove_partial_download(output_path)
    return succeeded


In [None]:
# 5. Run Download Loop (Parallel)
import concurrent.futures

# Fail fast if the downloader cannot be launched. Without this check a missing
# yt-dlp only shows up as every single clip being counted as a failure.
try:
    subprocess.run(["yt-dlp", "--version"], check=True, capture_output=True, timeout=30)
except (OSError, subprocess.SubprocessError) as exc:
    raise RuntimeError(
        "yt-dlp could not be run in this environment; "
        "install it (e.g. `%pip install yt-dlp`) before running this cell."
    ) from exc

# We shuffle to avoid hitting the same channel sequentially (helps avoid some blocks)
# Note: this reorders all_samples in place.
random.shuffle(all_samples)

success_count = 0
fail_count = 0

# Kaggle typically provides 4 vCPUs.
# We set max_workers to 4 to maximize throughput.
MAX_WORKERS = 4

print(f"Starting download of {len(all_samples)} clips using {MAX_WORKERS} workers...")
print("This may take a while. Grab a coffee ☕")

def process_sample(sample):
    """Download the clip described by one manifest entry.

    Args:
        sample: Dict with 'url', 'start_time', 'end_time' and 'filename' keys.

    Returns:
        True if the clip is available under VIDEO_DIR afterwards, else False.
    """
    path = os.path.join(VIDEO_DIR, sample['filename'])

    # download_clip already returns True for a clip that is present on disk,
    # so the "skip if downloaded" rule is left to it rather than repeated here.
    return download_clip(sample['url'], sample['start_time'], sample['end_time'], path)

# Samples whose clip could not be fetched, kept so failures can be inspected or retried.
failed_samples = []
# Exception type name -> number of tasks that raised it (normal failures return False instead).
unexpected_errors = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # Submit all tasks, remembering which sample each future belongs to
    futures = {executor.submit(process_sample, sample): sample for sample in all_samples}

    # Use tqdm to track progress as tasks complete; the `with` closes the bar afterwards
    with tqdm(concurrent.futures.as_completed(futures), total=len(all_samples)) as pbar:
        for future in pbar:
            try:
                downloaded = future.result()
            except Exception as exc:
                # Keep going, but remember what went wrong instead of discarding it.
                downloaded = False
                error_name = type(exc).__name__
                unexpected_errors[error_name] = unexpected_errors.get(error_name, 0) + 1

            if downloaded:
                success_count += 1
            else:
                fail_count += 1
                failed_samples.append(futures[future])

            pbar.set_description(f"Success: {success_count} | Fail: {fail_count}")

print("Download complete.")
print(f"Successfully downloaded: {success_count}")
print(f"Failed (deleted/private/blocked): {fail_count}")
if unexpected_errors:
    print(f"Unexpected errors while downloading (type: count): {unexpected_errors}")

# The main manifest lists every requested sample; this file records the ones
# that have no clip on disk, so the two can be reconciled later.
failed_manifest_path = os.path.join(OUTPUT_DIR, "msasl_subset_failed.json")
with open(failed_manifest_path, 'w') as f:
    json.dump(failed_samples, f, indent=2)
print(f"Saved {len(failed_samples)} failed samples to {failed_manifest_path}")

In [None]:
# 6. Zip Output for Export
# This makes it easy to download or create a dataset
import shutil

# make_archive appends the ".zip" suffix itself, so the base name carries none.
archive_base = "/kaggle/working/msasl_55_subset"

print("Zipping dataset...")
shutil.make_archive(archive_base, format='zip', root_dir=OUTPUT_DIR)
print("Done! You can now find 'msasl_55_subset.zip' in the output files.")