Upgrades pip and installs a specific version of MediaPipe.

In [None]:
# Use the %pip magic (not !pip) so packages are installed into the environment
# of the running kernel rather than whatever `pip` the subshell resolves to.
%pip install --upgrade pip
# MediaPipe is pinned so the Tasks API used below stays reproducible.
%pip install mediapipe==0.10.7

Downloads the heavy pose landmark model from MediaPipe.

In [36]:
# Fetch the "heavy" pose landmarker bundle quietly and store it under a fixed local name.
!wget -q -O pose_landmarker.task https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task

Downloads the hand landmark model from MediaPipe.

In [37]:
# Fetch the hand landmarker bundle quietly. An explicit -O target makes the cell
# idempotent: without it, re-running saves "hand_landmarker.task.1" and the file
# actually loaded below is never refreshed.
!wget -q -O hand_landmarker.task https://storage.googleapis.com/mediapipe-models/hand_landmarker/hand_landmarker/float16/1/hand_landmarker.task

Imports MediaPipe, OpenCV, NumPy, and other libraries for processing video and extracting body and hand landmarks.

In [38]:
import cv2
import numpy as np
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import pandas as pd
import os
import random

Mounts Google Drive at `/content/drive` so the notebook can read the videos and write the landmark CSVs.

In [39]:
from google.colab import drive

# Expose Google Drive under /content/drive; the dataset paths used later live there.
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Landmarks application

Load MediaPipe Hand and Pose Landmarker

In [40]:
# ---- Hand landmarker -------------------------------------------------------
# Model weights come from the .task bundle downloaded earlier.
base_options_hand = python.BaseOptions(model_asset_path="hand_landmarker.task")

# Allow up to two hands per image.
options_hand = vision.HandLandmarkerOptions(num_hands=2, base_options=base_options_hand)

# Detector instance used by the extraction function.
hand_landmarker = vision.HandLandmarker.create_from_options(options_hand)

# ---- Pose landmarker -------------------------------------------------------
# Model weights come from the .task bundle downloaded earlier.
base_options_pose = python.BaseOptions(model_asset_path="pose_landmarker.task")

# Segmentation masks are requested in addition to the landmarks.
options_pose = vision.PoseLandmarkerOptions(
    output_segmentation_masks=True,
    base_options=base_options_pose,
)

# Detector instance used by the extraction function.
pose_landmarker = vision.PoseLandmarker.create_from_options(options_pose)

Expected total columns (x, y for each landmark)

In [41]:
# Landmark counts for each body part stored in a CSV row.
LEFT_HAND_LANDMARKS = 21
RIGHT_HAND_LANDMARKS = 21
POSE_LANDMARKS = 33

# Every landmark contributes an (x, y) pair, hence the factor of two.
TOTAL_COLUMNS = 2 * sum((LEFT_HAND_LANDMARKS, RIGHT_HAND_LANDMARKS, POSE_LANDMARKS))

A function that extracts and saves normalized hand and pose landmarks (X, Y only) from a video to a CSV file, frame by frame, using MediaPipe.

In [42]:
def extract_and_save_normalized_landmarks(video_path, output_csv_path):
    """Detect hand and pose landmarks on every frame of a video and save them as CSV.

    Each decoded frame becomes one row of ``TOTAL_COLUMNS`` floats made of
    interleaved ``x, y`` pairs in this order: left hand, right hand, pose.
    Values are the normalized coordinates reported by MediaPipe; any landmark
    that was not detected in a frame is left at ``0.0``. The CSV is written
    without header or index column.

    Uses the module-level ``hand_landmarker`` and ``pose_landmarker`` detectors
    and the ``*_LANDMARKS`` / ``TOTAL_COLUMNS`` constants.

    Args:
        video_path: Path of the video file to read.
        output_csv_path: Path of the CSV file to create.

    Returns:
        None. Prints an error and returns if the video cannot be opened, and
        prints a warning without writing a file if no frame could be decoded
        (an empty CSV would break later ``pd.read_csv`` calls and would mark
        the video as already processed).
    """
    cap = cv2.VideoCapture(video_path)

    # Column offsets of each body part inside a row
    left_hand_base = 0
    right_hand_base = LEFT_HAND_LANDMARKS * 2
    pose_base = (LEFT_HAND_LANDMARKS + RIGHT_HAND_LANDMARKS) * 2

    # One list of TOTAL_COLUMNS floats per decoded frame
    all_frames_data = []

    # try/finally guarantees the capture handle is released even if detection raises
    try:
        if not cap.isOpened():
            print(f" Error: Could not open video at {video_path}")
            return

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # End of stream (or decode failure)

            # OpenCV delivers BGR; MediaPipe expects RGB
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_frame = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

            # Detect hand and pose landmarks
            hand_result = hand_landmarker.detect(mp_frame)
            pose_result = pose_landmarker.detect(mp_frame)

            # Start from all zeros so undetected landmarks stay 0.0
            frame_data = [0.0] * TOTAL_COLUMNS

            # Hands: the handedness label decides which column block is filled.
            # If two detections carry the same label, the later one overwrites the earlier.
            if hand_result.hand_landmarks:
                for hand_idx, hand in enumerate(hand_result.hand_landmarks):
                    label = hand_result.handedness[hand_idx][0].category_name
                    base_idx = left_hand_base if label == "Left" else right_hand_base
                    for landmark_idx, landmark in enumerate(hand):
                        frame_data[base_idx + landmark_idx * 2] = landmark.x
                        frame_data[base_idx + landmark_idx * 2 + 1] = landmark.y

            # Pose: only the first detected pose is stored
            if pose_result.pose_landmarks:
                for landmark_idx, landmark in enumerate(pose_result.pose_landmarks[0]):
                    frame_data[pose_base + landmark_idx * 2] = landmark.x
                    frame_data[pose_base + landmark_idx * 2 + 1] = landmark.y

            all_frames_data.append(frame_data)
    finally:
        cap.release()

    if not all_frames_data:
        print(f" Warning: No frames could be read from {video_path}; no CSV written")
        return

    # Convert to DataFrame and save
    df = pd.DataFrame(all_frames_data)
    df.to_csv(output_csv_path, index=False, header=False)
    print(f"Saved normalized landmarks (X, Y only) to {output_csv_path}")

A function that processes all `.mp4` videos in a folder (and subfolders), extracts landmarks, and saves them as CSVs—skipping files already processed.

In [43]:
def process_all_videos(input_folder, output_folder):
    """Extract landmarks for every ``.mp4`` video under ``input_folder``.

    The folder tree is walked recursively. For each video, a CSV with the same
    base name is written to the matching subfolder of ``output_folder`` via
    ``extract_and_save_normalized_landmarks``. Videos whose CSV already exists
    are skipped, so the function can be re-run after an interruption.

    Args:
        input_folder: Root folder containing the videos (searched recursively).
        output_folder: Root folder for the CSVs; subfolders mirror ``input_folder``.

    Returns:
        None. Progress is reported with ``print``.
    """
    for root, _dirs, files in os.walk(input_folder):
        # Mirror the position of `root` inside the input tree in the output tree
        relative_path = os.path.relpath(root, input_folder)
        output_subfolder = os.path.join(output_folder, relative_path)

        for file in files:
            # Split off the real extension instead of str.replace(): replace() rewrites
            # every ".mp4" in the name (e.g. "a.mp4.cut.mp4") and is case-sensitive.
            stem, extension = os.path.splitext(file)
            if extension.lower() != ".mp4":
                continue

            input_video_path = os.path.join(root, file)

            # Create the output subfolder if it doesn't already exist
            os.makedirs(output_subfolder, exist_ok=True)
            output_csv_path = os.path.join(output_subfolder, stem + ".csv")

            # Skip videos that were already converted
            if os.path.exists(output_csv_path):
                print(f"Skipping {file}: CSV already exists at {output_csv_path}")
                continue

            print(f"Processing: {input_video_path} → {output_csv_path}")
            extract_and_save_normalized_landmarks(input_video_path, output_csv_path)

Processes all videos in the input folder (here, the `test` split) by extracting landmarks and saving them as CSVs in the specified output folder.

In [None]:
# Source videos and the destination root for the per-video landmark CSVs
input_videos_folder = "/content/drive/MyDrive/ASL_Project_Mika/Dataset_submission/test"
output_csv_folder = "/content/drive/MyDrive/ASL_Project_Mika/Dataset_submission/annotated_test_data"

process_all_videos(
    input_folder=input_videos_folder,
    output_folder=output_csv_folder,
)

# Augmentation

This function applies a series of random augmentations—including noise, scaling, translation, rotation, and optional horizontal flipping—to a given sequence of 2D hand and pose landmarks with shape `(frames, 150)` to increase variability and robustness during training.

In [45]:
def augment_sequence(sequence, flip_prob=0.5, noise_level=0.01, preserve_missing=True):
    """Return a randomly augmented copy of one landmark sequence.

    The input is a 2-D array of shape ``(frames, features)`` whose columns are
    interleaved ``x, y`` pairs (150 columns for the hand + pose layout used in
    this notebook). The following steps are applied in order:

    1. Gaussian noise with standard deviation ``noise_level`` on every value.
    2. One global scale factor drawn from ``[0.95, 1.05]``.
    3. One global offset drawn from ``[-0.05, 0.05]`` added to every value.
    4. A 2-D rotation about the origin with an independent angle in
       ``[-5, 5]`` degrees per frame.
    5. With probability ``flip_prob``, a horizontal flip ``x -> 1 - x``.

    Args:
        sequence: Array-like of shape ``(frames, 2 * n_landmarks)``. Not modified.
        flip_prob: Probability of applying the horizontal flip.
        noise_level: Standard deviation of the Gaussian noise.
        preserve_missing: If True (default), landmarks whose input ``(x, y)`` is
            exactly ``(0, 0)`` (the value used for undetected landmarks) are
            kept at ``(0, 0)`` in the output instead of being turned into
            spurious coordinates by the noise/offset/flip. Pass False to
            transform every value.

    Returns:
        A new ``float`` array with the same shape as ``sequence``.

    Raises:
        ValueError: If ``sequence`` is not 2-D or has an odd number of columns.
    """
    # Float copy: keeps the caller's data untouched and avoids the in-place
    # casting error that integer-typed input would trigger below.
    seq = np.array(sequence, dtype=float)
    if seq.ndim != 2 or seq.shape[1] % 2 != 0:
        raise ValueError(
            f"sequence must be 2-D with an even number of columns, got shape {seq.shape}"
        )
    n_frames = seq.shape[0]

    # Remember which (x, y) pairs are "not detected" before anything is changed
    missing = None
    if preserve_missing:
        missing = (seq[:, 0::2] == 0.0) & (seq[:, 1::2] == 0.0)

    # 1. Small Gaussian noise
    seq += np.random.normal(0, noise_level, seq.shape)

    # 2. Random scaling (same factor for all values)
    seq *= np.random.uniform(0.95, 1.05)

    # 3. Random translation (same offset for all values)
    seq += np.random.uniform(-0.05, 0.05)

    # 4. Random rotation, one angle per frame (drawn frame by frame)
    angles = np.radians([np.random.uniform(-5, 5) for _ in range(n_frames)])
    cos_a = np.cos(angles)[:, None]
    sin_a = np.sin(angles)[:, None]
    xs = seq[:, 0::2].copy()
    ys = seq[:, 1::2].copy()
    seq[:, 0::2] = xs * cos_a - ys * sin_a
    seq[:, 1::2] = xs * sin_a + ys * cos_a

    # 5. Optional horizontal flip of the X coordinates
    # NOTE(review): only x is mirrored; the left/right hand column blocks are
    # not swapped - confirm this is the intended semantics.
    if random.random() < flip_prob:
        seq[:, 0::2] = 1.0 - seq[:, 0::2]

    # Restore the "not detected" marker
    if missing is not None:
        seq[:, 0::2][missing] = 0.0
        seq[:, 1::2][missing] = 0.0

    return seq

This function augments landmark CSV files with random transformations and saves them to an output folder, skipping files that already exist.

In [46]:
def augment_all_csvs(input_folder, output_folder, num_augmentations=1, expected_columns=150):
    """Create augmented copies of every landmark CSV under ``input_folder``.

    The folder tree is walked recursively. For each CSV, ``num_augmentations``
    files named ``<name>_aug1.csv``, ``<name>_aug2.csv``, ... are written to the
    matching subfolder of ``output_folder``; each one is the result of an
    independent ``augment_sequence`` call. Outputs that already exist are kept,
    so the function can be re-run after an interruption.

    Args:
        input_folder: Root folder containing the source CSVs (no header, one
            row per frame).
        output_folder: Root folder for the augmented CSVs; subfolders mirror
            ``input_folder``.
        num_augmentations: Number of augmented versions per source file.
        expected_columns: Required number of columns per row; files with a
            different width are skipped. Defaults to 150.

    Returns:
        None. Progress is reported with ``print``.
    """
    for root, _dirs, files in os.walk(input_folder):
        # Mirror the position of `root` inside the input tree in the output tree
        relative_class_path = os.path.relpath(root, input_folder)
        output_subfolder = os.path.join(output_folder, relative_class_path)

        for file in files:
            # Split off the real extension instead of str.replace(), which would
            # rewrite every ".csv" occurring inside the file name.
            stem, extension = os.path.splitext(file)
            if extension.lower() != '.csv':
                continue

            # Work out which augmented files are still missing before touching the input
            pending_paths = []
            for i in range(num_augmentations):
                output_file_name = f'{stem}_aug{i+1}.csv'
                output_csv_path = os.path.join(output_subfolder, output_file_name)
                if os.path.exists(output_csv_path):
                    print(f"Skipping {output_file_name}: already exists at {output_csv_path}")
                else:
                    pending_paths.append(output_csv_path)

            if not pending_paths:
                continue  # Nothing to do; avoid re-reading the source file

            input_csv_path = os.path.join(root, file)

            # Load the CSV into a NumPy array (each row = one frame)
            try:
                sequence = pd.read_csv(input_csv_path, header=None).values
            except pd.errors.EmptyDataError:
                # A zero-byte CSV must not abort the whole batch
                print(f"Skipping {file}: file is empty")
                continue

            # Check for the expected number of features per frame
            if sequence.shape[1] != expected_columns:
                print(f"Skipping {file}: incorrect shape {sequence.shape}")
                continue

            os.makedirs(output_subfolder, exist_ok=True)

            # Each pending output gets its own independent random augmentation
            for output_csv_path in pending_paths:
                aug_sequence = augment_sequence(sequence)
                pd.DataFrame(aug_sequence).to_csv(output_csv_path, index=False, header=False)
                print(f"Saved: {output_csv_path}")

Define paths

In [47]:
# Destination root for the augmented CSVs
output_augmented_folder = "/content/drive/MyDrive/ASL_Project_Mika/Dataset_submission/augmented_train_data"
# Source root holding the landmark CSVs to augment
input_annotated_folder = "/content/drive/MyDrive/ASL_Project_Mika/Dataset_submission/annotated_train_data"

Calls `augment_all_csvs` to generate one augmented version of each CSV in the annotated folder and save it to the augmented folder.

In [None]:
# One augmented copy per annotated CSV, written to the augmented folder
augment_all_csvs(input_annotated_folder, output_augmented_folder, num_augmentations=1)