# WLASL300 Preprocessing

The Word-Level American Sign Language (WLASL) dataset consists of approximately 12,000 videos of around 2,000 common words.

For preprocessing this dataset, we use MediaPipe Holistic to extract pose and hand landmarks from each video.

## Set-Up

### Install Dependencies

In [None]:
# Install required packages with compatible versions
# Note: Colab comes with TensorFlow pre-installed, but we'll ensure compatibility

print("Installing dependencies...")
print("="*60)

# Update pip, setuptools, and wheel for better dependency resolution
!pip install -q --upgrade pip setuptools wheel

# Uninstall potentially conflicting packages first so the pinned versions below
# are installed from a clean state (-y skips the confirmation prompt)
!pip uninstall -y numpy mediapipe protobuf opencv-python opencv-python-headless scikit-learn

# Install core dependencies first, in separate steps to isolate potential build issues
!pip install -q numpy==1.26.4       # Explicitly set to a version compatible with TensorFlow 2.x
!pip install -q protobuf==4.25.3    # Updated for compatibility with tensorflow-metadata on Python 3.12

# Then install other dependencies
!pip install -q mediapipe==0.10.21  # Updated to a valid and recent version
!pip install -q opencv-python-headless==4.8.1.78 # Compatible with numpy 1.26.4
!pip install -q scikit-learn==1.3.2 # Ensure scikit-learn is also explicitly installed

# Matplotlib for plotting (usually pre-installed)
!pip install -q matplotlib

# NOTE: this banner is printed unconditionally; the exit status of the pip
# commands above is not checked here.
print("\n✓ All packages installed successfully!")
print("\nVerifying installations...")

# Verify installations: importing each package confirms that it loads, and the
# prints below report the version that is actually in use.
# NOTE(review): if NumPy was already imported earlier in this session, the
# reinstall may only take effect after a runtime restart — confirm.
import sys
import mediapipe as mp
import cv2
import sklearn
import tensorflow as tf
import numpy as np

print(f"  Python version: {sys.version.split()[0]}")
print(f"  TensorFlow: {tf.__version__}")
print(f"  MediaPipe: {mp.__version__}")
print(f"  OpenCV: {cv2.__version__}")
print(f"  Scikit-learn: {sklearn.__version__}")
print(f"  NumPy: {np.__version__}")

# No version comparison is performed; reaching this point only means that every
# import above succeeded.
print("\n" + "="*60)
print("✓ All dependencies verified and compatible!")
print("="*60)

In [None]:
import sys
import mediapipe as mp
import cv2
import sklearn
import tensorflow as tf
import numpy as np

# Report the interpreter and library versions that this session is using.
banner = "=" * 60
print(banner)
print("Verifying installations...")

# (label, version) pairs, printed in this order.
installed_versions = (
    ("Python version", sys.version.split()[0]),
    ("TensorFlow", tf.__version__),
    ("MediaPipe", mp.__version__),
    ("OpenCV", cv2.__version__),
    ("Scikit-learn", sklearn.__version__),
    ("NumPy", np.__version__),  # Should be 1.26.4
)
for package_label, package_version in installed_versions:
    print(f"  {package_label}: {package_version}")

print(banner)

In [None]:
# Download MediaPipe Hand Landmarker model
import urllib.request
import os

model_url = "https://storage.googleapis.com/mediapipe-models/hand_landmarker/hand_landmarker/float16/1/hand_landmarker.task"
model_path = "hand_landmarker.task"

if not os.path.exists(model_path):
    print("Downloading MediaPipe Hand Landmarker model...")
    try:
        urllib.request.urlretrieve(model_url, model_path)
        print(f"Model downloaded to {model_path}")
    except Exception as e:
        print(f"Error downloading model: {e}")
        print("Trying alternative download method...")
        !wget -q {model_url} -O {model_path}
        print(f"Model downloaded to {model_path}")
else:
    print(f"Model already exists at {model_path}")

### Mount Google Drive and Extract Dataset

In [None]:
# Mount Google Drive at /content/drive so the dataset archive and the backup
# location under MyDrive are reachable as ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

print("Google Drive mounted successfully!")

In [None]:
import zipfile
import os

# Path to your dataset in Google Drive
DRIVE_ZIP_PATH = "/content/drive/MyDrive/WLASL Dataset/archive.zip"
EXTRACT_PATH = "/content/wlasl_data"

# Listing limits: directories deeper than MAX_LISTING_DEPTH are summarised
# instead of expanded, and at most MAX_FILES_SHOWN files are printed per folder.
MAX_LISTING_DEPTH = 2
MAX_FILES_SHOWN = 5

# Check if the zip file exists
if not os.path.exists(DRIVE_ZIP_PATH):
    raise FileNotFoundError(f"Dataset not found at: {DRIVE_ZIP_PATH}\n"
                          f"Please ensure 'archive.zip' is in 'WLASL Dataset' folder in your Google Drive.")

print(f"Found dataset at: {DRIVE_ZIP_PATH}")

# Extract the dataset
print(f"\nExtracting dataset to {EXTRACT_PATH}...")
os.makedirs(EXTRACT_PATH, exist_ok=True)

with zipfile.ZipFile(DRIVE_ZIP_PATH, 'r') as zip_ref:
    zip_ref.extractall(EXTRACT_PATH)

print("Dataset extracted successfully!")

# List the contents to understand the structure
print("\nDataset structure:")
for root, dirs, files in os.walk(EXTRACT_PATH):
    # Sort in place so the listing (and the walk order) is stable across runs.
    dirs.sort()
    files.sort()

    relative_root = os.path.relpath(root, EXTRACT_PATH)
    level = 0 if relative_root == os.curdir else relative_root.count(os.sep) + 1

    indent = ' ' * 2 * level
    print(f'{indent}{os.path.basename(root)}/')
    sub_indent = ' ' * 2 * (level + 1)
    for file in files[:MAX_FILES_SHOWN]:  # Show only the first few files
        print(f'{sub_indent}{file}')
    if len(files) > MAX_FILES_SHOWN:
        print(f'{sub_indent}... and {len(files) - MAX_FILES_SHOWN} more files')

    # Limit depth by pruning the walk in place rather than breaking out of it:
    # a `break` here would also hide every sibling directory not yet visited.
    if level >= MAX_LISTING_DEPTH and dirs:
        print(f'{sub_indent}... and {len(dirs)} subdirectories (not expanded)')
        dirs[:] = []

### Configuration

In [None]:
# Dataset paths - WLASL300 structure
# NOTE(review): the "Run Preprocessing" cell later reassigns LABELS_FILE and
# VIDEO_ROOT to paths under /content/wlasl_data/wlasl300_dataset/ — confirm
# which layout the extracted archive actually has and keep the two in sync.
LABELS_FILE = "/content/wlasl_data/labels.txt"  # Maps label IDs to gloss names
VIDEO_ROOT = "/content/wlasl_data/WLASL300"     # Root folder containing numbered subfolders

# Output paths (relative, so they resolve against the current working directory)
OUTPUT_NPZ = "wlasl_landmarks.npz"
MODEL_OUTPUT = "wlasl_sequence_model.keras"
LABELS_OUTPUT = "wlasl_labels.npy"

# Processing parameters
SEQUENCE_LENGTH = 32  # Fixed number of frames per sequence
FRAME_STRIDE = 2      # Sample every Nth frame
MIN_FRAMES = 8        # Discard sequences shorter than this

# Gloss selection
# Option 1: Specify label IDs (1-300) you want to train on
SELECTED_LABEL_IDS = None  # e.g., [1, 5, 10, 25, 50] or None for all

# Option 2: If None, use the first N glosses
MAX_GLOSSES = 50              # Number of glosses to use if SELECTED_LABEL_IDS is None
MAX_SAMPLES_PER_GLOSS = None  # Limit samples per gloss (None = no limit)
MIN_VIDEOS_PER_GLOSS = 3      # Minimum videos required per gloss

# Training parameters
# NOTE(review): none of these are read by the preprocessing cells shown in this
# notebook section; presumably they are consumed by a later training step.
TEST_SIZE = 0.2
EPOCHS = 80
BATCH_SIZE = 32
LEARNING_RATE = 1e-3
LSTM_UNITS = [128, 64]
DENSE_UNITS = 64
DROPOUT = 0.5
PATIENCE = 10                 # Early stopping patience

# Echo the two input locations so a wrong path is visible before any work starts.
print("Configuration loaded")
print(f"\nLooking for:")
print(f"  Labels: {LABELS_FILE}")
print(f"  Videos: {VIDEO_ROOT}")

In [None]:
# Sanity-check the configured dataset locations and, when one is missing,
# search the extraction folder for likely candidates.
import os


def _print_matches(entry_name, look_in_dirs):
    """Print every path under /content/wlasl_data that holds *entry_name*.

    look_in_dirs selects whether directory names or file names are matched.
    """
    for parent, dirnames, filenames in os.walk("/content/wlasl_data"):
        candidates = dirnames if look_in_dirs else filenames
        if entry_name in candidates:
            print(f"  Found: {os.path.join(parent, entry_name)}")


rule = "=" * 60

print("Verifying dataset paths...\n")

# Labels file: report its line count, or point at candidates if it is missing.
labels_ok = os.path.exists(LABELS_FILE)
if not labels_ok:
    print(f"Labels file NOT found at: {LABELS_FILE}")
    print("\nSearching for labels.txt...")
    _print_matches('labels.txt', look_in_dirs=False)
    print("\nUpdate LABELS_FILE in the configuration cell above")
else:
    print(f"Found labels file: {LABELS_FILE}")
    with open(LABELS_FILE, 'r') as labels_handle:
        gloss_total = len(labels_handle.readlines())
    print(f"  Number of glosses: {gloss_total}")

# Video directory: count the numeric sub-folders and peek into one of them.
print()
videos_ok = os.path.exists(VIDEO_ROOT)
if not videos_ok:
    print(f"Video directory NOT found at: {VIDEO_ROOT}")
    print("\nSearching for WLASL300 folder...")
    _print_matches('WLASL300', look_in_dirs=True)
    print("\nUpdate VIDEO_ROOT in the configuration cell above")
else:
    print(f"Video directory found: {VIDEO_ROOT}")
    gloss_dirs = []
    for entry in os.listdir(VIDEO_ROOT):
        if os.path.isdir(os.path.join(VIDEO_ROOT, entry)) and entry.isdigit():
            gloss_dirs.append(entry)
    print(f"  Number of gloss folders: {len(gloss_dirs)}")
    if gloss_dirs:
        first_dir = gloss_dirs[0]
        first_dir_entries = os.listdir(os.path.join(VIDEO_ROOT, first_dir))
        clip_count = sum(1 for name in first_dir_entries if name.endswith('.mp4'))
        print(f"  Sample folder '{first_dir}' has {clip_count} videos")

# Overall verdict.
print("\n" + rule)
if labels_ok and videos_ok:
    print("All paths verified! You can proceed to the next steps.")
else:
    print("Please update the configuration paths based on the information above.")
print(rule)

### Import Libraries and Utility Functions

In [None]:
# Import all required libraries
print("Importing libraries...")

import json
import os
import sys
from collections import Counter
from typing import List, Sequence
import cv2
import mediapipe as mp
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Constants for Holistic
# Pose (33 landmarks * 4 values [x,y,z,vis]) + Left Hand (21*3) + Right Hand (21*3)
# 132 + 63 + 63 = 258
# The per-frame vector is laid out in that order (pose, left hand, right hand),
# matching the concatenation performed in extract_landmarks().
FEATURE_VECTOR_LEN = 258

# Confirmation banner for the import cell.
print("\n" + "="*60)
print("All libraries imported successfully!")
print(f"Feature Vector Length set to: {FEATURE_VECTOR_LEN}")
print("="*60)

In [None]:
# Normalization utility function (from preprocessing_utils.py)
def normalize_per_hand(X_arr: np.ndarray) -> np.ndarray:
    """
    Normalize landmarks per hand: translate by wrist, scale by wrist->middle_mcp distance.
    This ensures consistent spatial representation regardless of hand position/size in frame.

    The two hands are taken to be the trailing 2 * 21 * 3 = 126 values of each
    feature vector (left hand, then right hand). That matches both a hands-only
    126-value layout and the 258-value Holistic layout built by
    extract_landmarks() (pose first, hands last). Any leading features, such as
    pose, are returned unchanged.

    Args:
        X_arr: Array whose last axis is the per-frame feature vector. A 1-D
            input is treated as a single frame; higher-rank inputs are
            flattened to one row per frame.

    Returns:
        A new 2-D array of shape (num_frames, feature_len). The input is not
        modified. Hands that are all zeros (no detection) are left as zeros.

    Raises:
        ValueError: If the last axis is too short to contain two hands.
    """
    # MediaPipe hand model layout: 21 landmarks of (x, y, z) per hand, with the
    # wrist at index 0 and the middle-finger MCP joint at index 9.
    num_hands, num_landmarks, coords = 2, 21, 3
    wrist_idx, middle_mcp_idx = 0, 9
    hands_len = num_hands * num_landmarks * coords

    Xn = np.array(X_arr, copy=True)
    if Xn.ndim == 0 or Xn.shape[-1] < hands_len:
        raise ValueError(
            f"Expected at least {hands_len} features per frame, got shape {Xn.shape}"
        )
    if not np.issubdtype(Xn.dtype, np.floating):
        # In-place division below requires a floating-point array.
        Xn = Xn.astype(np.float32)

    feature_len = Xn.shape[-1]
    Xn = Xn.reshape(-1, feature_len)
    num_rows = Xn.shape[0]
    hands_start = feature_len - hands_len

    # Work on a contiguous (frame * hand, landmark, coord) view of the hand block.
    hands = np.ascontiguousarray(Xn[:, hands_start:]).reshape(
        num_rows * num_hands, num_landmarks, coords
    )

    for hand in hands:
        # Skip if hand is all zeros (no detection)
        if np.allclose(hand, 0.0):
            continue

        # Translate: center on wrist (x, y only; z is kept as-is)
        hand[:, :2] -= hand[wrist_idx, :2].copy()

        # Scale: normalize by wrist->middle_mcp distance on the xy plane
        scale = np.linalg.norm(hand[middle_mcp_idx, :2])
        if scale > 1e-6:
            hand[:, :2] /= scale

    # Write the normalized hands back next to the untouched leading features.
    Xn[:, hands_start:] = hands.reshape(num_rows, hands_len)
    return Xn

print("Normalization function defined")

## Preprocessing

### Define Preprocessing Functions

These functions handle:
- Loading WLASL metadata
- Selecting glosses (words) to train on
- Building the MediaPipe Holistic detector
- Extracting pose and hand landmarks from video frames
- Processing videos into fixed-length sequences

In [None]:
# Short alias for MediaPipe's Holistic solution module; build_detector() below
# instantiates mp_holistic.Holistic from it.
mp_holistic = mp.solutions.holistic

def load_labels(labels_path: str) -> dict:
    """Parse a labels file into a ``{label_id: gloss_name}`` mapping.

    Each useful line holds an integer ID, whitespace, then the gloss name
    (which may itself contain spaces). Blank lines and lines with only one
    field are ignored. A non-integer first field raises ``ValueError``.
    """
    id_to_gloss = {}
    with open(labels_path, 'r') as handle:
        for raw_line in handle:
            # Split once so multi-word gloss names stay intact.
            fields = raw_line.strip().split(maxsplit=1)
            if len(fields) != 2:
                continue
            raw_id, gloss = fields
            id_to_gloss[int(raw_id)] = gloss
    return id_to_gloss

def scan_dataset(video_root: str, label_map: dict, selected_ids: List[int] = None,
                 max_glosses: int = 50, min_videos: int = 3) -> dict:
    """Scan dataset for valid videos.

    Args:
        video_root: Directory containing one sub-folder per label ID.
        label_map: Mapping of label ID to gloss name.
        selected_ids: Label IDs to scan. When empty or None, the first
            ``max_glosses`` IDs of ``label_map`` (in ascending order) are used.
        max_glosses: Number of IDs to take when ``selected_ids`` is not given.
        min_videos: Glosses with fewer ``.mp4`` files than this are dropped.

    Returns:
        ``{gloss_name: [(label_id, video_path), ...]}`` with the video paths
        sorted, so that any later per-gloss sample cap selects the same files
        on every run (``os.listdir`` order is arbitrary).
    """
    glosses_data = {}
    ids_to_process = selected_ids if selected_ids else sorted(label_map)[:max_glosses]

    for label_id in ids_to_process:
        if label_id not in label_map:
            continue

        gloss_name = label_map[label_id]
        folder_path = os.path.join(video_root, str(label_id))

        # isdir (not exists): a stray file named like a label ID must be
        # skipped rather than crash os.listdir.
        if not os.path.isdir(folder_path):
            continue

        video_files = sorted(
            os.path.join(folder_path, name)
            for name in os.listdir(folder_path)
            if name.endswith('.mp4')
        )

        if len(video_files) >= min_videos:
            glosses_data[gloss_name] = [(label_id, path) for path in video_files]

    return glosses_data

def build_detector(task_path: str):
    """Create the MediaPipe Holistic solution used for landmark extraction.

    ``task_path`` is accepted only so existing call sites keep working; it is
    not read.
    """
    holistic_options = {
        "static_image_mode": False,
        "model_complexity": 1,
        "smooth_landmarks": True,
        "enable_segmentation": False,
        "refine_face_landmarks": False,
    }
    return mp_holistic.Holistic(**holistic_options)

def extract_landmarks(detector, frame_rgb: np.ndarray) -> np.ndarray:
    """
    Run the detector on one RGB frame and flatten its output to a feature vector.

    The vector is pose (33 landmarks of x, y, z, visibility), then left hand
    and right hand (21 landmarks of x, y, z each). A group the detector did
    not return is filled with zeros. The result is float32.
    """
    results = detector.process(frame_rgb)

    hand_fields = ("x", "y", "z")
    # (detected group, expected landmark count, attributes read per landmark)
    layout = (
        (results.pose_landmarks, 33, ("x", "y", "z", "visibility")),
        (results.left_hand_landmarks, 21, hand_fields),
        (results.right_hand_landmarks, 21, hand_fields),
    )

    parts = []
    for group, landmark_count, fields in layout:
        if group:
            rows = [[getattr(point, name) for name in fields] for point in group.landmark]
            parts.append(np.array(rows).flatten())
        else:
            parts.append(np.zeros(landmark_count * len(fields)))

    return np.concatenate(parts).astype(np.float32)

def process_video(detector, video_path: str, sequence_length: int, frame_stride: int) -> np.ndarray:
    """Process video using Holistic detector.

    Reads the video front to back, runs landmark extraction on every
    ``frame_stride``-th frame, and stops once ``sequence_length`` frames have
    been sampled or the video ends.

    Args:
        detector: Object passed through to ``extract_landmarks``.
        video_path: Path of the video file to open with OpenCV.
        sequence_length: Number of rows in the returned sequence.
        frame_stride: Sample every Nth frame; must be >= 1.

    Returns:
        A ``(sequence_length, FEATURE_VECTOR_LEN)`` array, zero-padded at the
        end when the video yields fewer samples. If no frame could be read, an
        empty ``(0, FEATURE_VECTOR_LEN)`` float32 array.

    Raises:
        ValueError: If ``frame_stride`` is smaller than 1.
        RuntimeError: If the video cannot be opened.
    """
    if frame_stride < 1:
        raise ValueError(f"frame_stride must be >= 1, got {frame_stride}")

    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        capture.release()
        raise RuntimeError(f"Could not open video '{video_path}'")

    sampled = []
    frame_index = 0

    # Read until decoding fails instead of trusting CAP_PROP_FRAME_COUNT, which
    # is only an estimate for some files. The finally clause releases the
    # capture even when landmark extraction raises.
    try:
        while len(sampled) < sequence_length:
            success, frame_bgr = capture.read()
            if not success:
                break

            if frame_index % frame_stride == 0:
                frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
                # Extract holistic landmarks
                sampled.append(extract_landmarks(detector, frame_rgb))

            frame_index += 1
    finally:
        capture.release()

    if not sampled:
        return np.empty((0, FEATURE_VECTOR_LEN), dtype=np.float32)

    sequence = np.stack(sampled)

    # Pad or Truncate
    missing = sequence_length - sequence.shape[0]
    if missing <= 0:
        return sequence[:sequence_length]

    padding = np.zeros((missing, FEATURE_VECTOR_LEN), dtype=np.float32)
    return np.vstack((sequence, padding))

print("Holistic Preprocessing functions defined")

### Run Preprocessing

In this section:
1. Load WLASL metadata
2. Select glosses to train on
3. Process each video to extract landmarks
4. Save results to `wlasl_landmarks.npz`

In [None]:
# Preprocessing driver: scan the dataset, run the detector over every selected
# video, and save the resulting fixed-length landmark sequences to OUTPUT_NPZ.
#
# NOTE(review): the two assignments below replace the LABELS_FILE / VIDEO_ROOT
# values from the configuration cell (added to resolve a FileNotFoundError).
# They are plain global assignments, so they stay in effect for later cells —
# confirm which layout is correct and fix the configuration cell instead.
LABELS_FILE = "/content/wlasl_data/wlasl300_dataset/labels.txt"
VIDEO_ROOT = "/content/wlasl_data/wlasl300_dataset/WLASL300"

# Load labels
print("Loading label mappings...")
label_map = load_labels(LABELS_FILE)
print(f"✓ Loaded {len(label_map)} label mappings")

# Scan dataset
print("\nScanning dataset...")
glosses_data = scan_dataset(
    VIDEO_ROOT,
    label_map,
    SELECTED_LABEL_IDS,
    MAX_GLOSSES,
    MIN_VIDEOS_PER_GLOSS
)
print(f"✓ Found {len(glosses_data)} glosses meeting criteria")
print(f"Glosses: {sorted(glosses_data.keys())}")

# Count total videos (every scanned video, including ones that are later
# skipped by the per-gloss cap or the MIN_FRAMES filter)
total_videos = sum(len(videos) for videos in glosses_data.values())
print(f"Total videos to process: {total_videos}")

# Build detector
# build_detector() documents its argument as unused, so the path passed here
# has no effect on the Holistic detector that is created.
print("\nInitializing MediaPipe hand detector...")
detector = build_detector("hand_landmarker.task")
print("✓ Detector ready")

# Process videos
# Parallel lists: sequences[i], labels[i] and video_ids[i] describe one sample.
print("\nProcessing videos...")
sequences = []
labels = []
video_ids = []
per_gloss_counts = Counter()

total_processed = 0
total_skipped = 0

# NOTE(review): one detector instance is reused for every video and was built
# with static_image_mode=False — confirm whether state carried over from the
# previous video can affect the first frames of the next one.
for gloss_name, video_list in glosses_data.items():
    for label_id, video_path in video_list:
        # Check if we've hit the per-gloss limit
        # (videos passed over here are not counted in total_skipped)
        if MAX_SAMPLES_PER_GLOSS and per_gloss_counts[gloss_name] >= MAX_SAMPLES_PER_GLOSS:
            continue

        # Best-effort: any failure on a single video is reported and skipped so
        # that one bad file does not abort the whole run.
        try:
            sequence = process_video(
                detector=detector,
                video_path=video_path,
                sequence_length=SEQUENCE_LENGTH,
                frame_stride=FRAME_STRIDE,
            )
        except Exception as exc:
            print(f"⚠ Failed to process {video_path}: {exc}")
            total_skipped += 1
            continue

        # Check if sequence has enough valid frames
        # A row's norm is zero only when every feature in it is zero (padding,
        # or a frame where nothing was detected), so this counts frames with
        # at least one non-zero feature.
        actual_length = np.count_nonzero(np.linalg.norm(sequence, axis=1))
        if actual_length < MIN_FRAMES:
            total_skipped += 1
            continue

        sequences.append(sequence)
        labels.append(gloss_name)
        video_ids.append(os.path.basename(video_path))
        per_gloss_counts[gloss_name] += 1
        total_processed += 1

        # Progress update
        if total_processed % 50 == 0:
            print(f"  Processed {total_processed}/{total_videos} videos...")

print(f"\n✓ Processing complete!")
print(f"  Total processed: {total_processed}")
print(f"  Total skipped: {total_skipped}")

# Save to NPZ
if not sequences:
    raise RuntimeError("No sequences were extracted. Check your paths and dataset.")

# X has shape (num_samples, sequence_length, features); y and vids are aligned with it.
X = np.stack(sequences).astype(np.float32)
y = np.array(labels)
vids = np.array(video_ids)

# NOTE(review): this list comes from the scan, not from the samples that were
# kept, so it can contain glosses whose videos were all skipped — confirm that
# downstream code tolerates glosses with zero samples.
glosses_to_keep = sorted(glosses_data.keys())
np.savez(OUTPUT_NPZ, sequences=X, labels=y, video_ids=vids, glosses=np.array(glosses_to_keep))

print(f"\n✓ Saved {X.shape[0]} samples to {OUTPUT_NPZ}")
print(f"  Sequence shape: {X.shape}")
print("\n  Sample distribution:")
for gloss in sorted(per_gloss_counts):
    print(f"    {gloss}: {per_gloss_counts[gloss]}")

In [None]:
# Clean up to free memory
import gc

print("\nCleaning up memory...")
del detector  # Remove detector object
gc.collect()  # Force garbage collection

print("Memory cleanup complete")
print("\nYou can now proceed to training!")

### Backup Preprocessed Data

In [None]:
# Keep a copy of the landmark archive on Google Drive so it survives a
# disconnected session. Failure to copy is reported but never fatal.
import shutil

BACKUP_PATH = "/content/drive/MyDrive/WLASL Dataset/wlasl_landmarks.npz"

try:
    shutil.copy(OUTPUT_NPZ, BACKUP_PATH)
except Exception as backup_error:
    print(f"Could not backup to Drive: {backup_error}")
    print("  Continuing with local copy only...")
else:
    # Tell the user how to bring the archive back after a restart.
    restore_command = f"  !cp '{BACKUP_PATH}' '{OUTPUT_NPZ}'"
    print(f"Backup saved to: {BACKUP_PATH}")
    print("\nIf your session disconnects, you can restore by running:")
    print(restore_command)