## Importing Necessary Libraries

In [11]:
import cv2
import mediapipe as mp
import json
import os
import numpy as np
from moviepy import VideoFileClip


## Initializing Mediapipe and defining function to extract keypoints

In [None]:
# Initialize MediaPipe Pose
# Short aliases for the MediaPipe pose solution and its drawing helpers.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Initialize Pose
# Module-level pose estimator: created once here and reused for every frame
# processed by extract_keypoints_from_video.
# NOTE(review): static_image_mode=False presumably selects MediaPipe's
# video/tracking mode rather than per-image detection -- confirm against
# the MediaPipe Pose documentation.
pose = mp_pose.Pose(static_image_mode=False)

# Access PoseLandmark from mp_pose
# Convenience alias for the landmark enum (its members expose the integer
# landmark index through `.value`).
PoseLandmark = mp_pose.PoseLandmark

def extract_keypoints_from_video(video_path):
    """
    Run the module-level MediaPipe pose estimator on every frame of a video.

    Args:
        video_path (str): Path to a video file that OpenCV can open.

    Returns:
        list: One entry per decoded frame, in order. Each entry is a list of
        (x, y, z) tuples, one per pose landmark, or None when no pose was
        detected in that frame. If the video cannot be opened the list is
        empty.
    """
    cap = cv2.VideoCapture(video_path)
    keypoints_per_frame = []

    # try/finally guarantees the capture handle is released even when colour
    # conversion or pose inference raises part-way through the video.
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a read failure): stop decoding.
                break

            # OpenCV decodes frames as BGR; convert to RGB before inference.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            result = pose.process(frame_rgb)

            if result.pose_landmarks:
                landmarks = result.pose_landmarks.landmark
                keypoints_per_frame.append(
                    [(lm.x, lm.y, lm.z) for lm in landmarks]
                )
            else:
                # Keep a placeholder so list indices stay aligned with
                # frame numbers.
                keypoints_per_frame.append(None)
    finally:
        cap.release()

    return keypoints_per_frame


## Function Definitions

### Motion Detection Functions
1. <b>is_overall_posture(kps_window)</b>

Purpose: Detects if the swimmer maintains a stable overall posture.

Logic: Calculates the orientation angle of the left hip-to-shoulder vector (in the x–y plane) for each frame. If the standard deviation of these angles across the window is low, posture is considered stable.

Threshold: Standard deviation < 0.05 radians.

2. <b>is_kick_motion(kps_window)</b>

Purpose: Detects the presence of a kicking motion.

Logic: Computes the average y-coordinate of both ankles for each frame. If the variance of these per-frame averages across the window is above 0.002, significant vertical movement (kick) is present.

Threshold: Variance > 0.002.

3. <b>is_breathing_motion(kps_window)</b>

Purpose: Detects breathing events (head turns).

Logic: Measures the absolute x-difference between left and right shoulders for each frame. If the range (max − min) of these differences across the window exceeds 0.05, a breathing motion is detected.

Threshold: Range > 0.05.

4. <b>is_hand_entry(kps_window)</b>

Purpose: Detects hand entry into the water. 

Logic: Checks if either wrist is below the nose in y-coordinate for each frame. If this occurs in more than 40% of the window, hand entry is detected.

Threshold: More than 40% of frames in the window.

In [None]:
def is_overall_posture(kps_window):
    """
    Detects if the current window of keypoints represents a stable overall posture.

    Approach:
    - Computes the orientation angle of the left hip-to-shoulder vector
      (x-y plane) for each frame in which a pose was detected.
    - Unwraps the angle sequence so a crossing of the +/-pi boundary is not
      mistaken for a large change.
    - If the standard deviation of these angles is low (< 0.05 rad), posture
      is considered stable.

    Args:
        kps_window (list): List of keypoints for a sliding window of frames.
            Entries that are None/empty (no pose detected) are skipped.

    Returns:
        bool: True if posture is stable, False otherwise. Windows with fewer
        than 5 usable frames return False, matching the other detectors.
    """
    shoulder = mp_pose.PoseLandmark.LEFT_SHOULDER.value
    hip = mp_pose.PoseLandmark.LEFT_HIP.value

    angles = []
    for frame in kps_window:
        if frame:
            # Vector from hip to shoulder
            vec = np.array(frame[shoulder][:2]) - np.array(frame[hip][:2])
            angles.append(np.arctan2(vec[1], vec[0]))

    # Too few samples: np.std([]) is NaN (with a RuntimeWarning) and a single
    # sample trivially has zero deviation, so neither is evidence of stability.
    if len(angles) < 5:
        return False

    # arctan2 jumps between +pi and -pi for a near-horizontal torso pointing
    # in the -x direction; unwrapping removes that artificial 2*pi jump.
    # Low spread in angle = stable posture
    return bool(np.std(np.unwrap(angles)) < 0.05)

def is_kick_motion(kps_window):
    """
    Decide whether a window of frames shows a kicking movement.

    For every frame with a detected pose, the y-coordinates of the left and
    right ankle are averaged. A kick is reported when the variance of those
    per-frame averages exceeds 0.002.

    Args:
        kps_window (list): List of keypoints for a sliding window of frames.
            Frames without a detected pose (None/empty) are ignored.

    Returns:
        bool: True if kick motion is present, False otherwise (including
        when fewer than 5 frames in the window have a pose).
    """
    left_ankle = mp_pose.PoseLandmark.LEFT_ANKLE.value
    right_ankle = mp_pose.PoseLandmark.RIGHT_ANKLE.value

    mean_ankle_heights = [
        np.mean([frame[left_ankle][1], frame[right_ankle][1]])
        for frame in kps_window
        if frame
    ]

    # Not enough usable frames to judge movement.
    if len(mean_ankle_heights) < 5:
        return False

    # 0.002 is the tuned variance threshold for kick detection.
    return np.var(mean_ankle_heights) > 0.002

def is_breathing_motion(kps_window):
    """
    Decide whether a window of frames shows a breathing event.

    For every frame with a detected pose, the absolute x-distance between the
    left and right shoulder is measured. A breathing motion is reported when
    the spread (max - min) of those distances exceeds 0.05.

    Args:
        kps_window (list): List of keypoints for a sliding window of frames.
            Frames without a detected pose (None/empty) are ignored.

    Returns:
        bool: True if breathing motion is present, False otherwise (including
        when fewer than 5 frames in the window have a pose).
    """
    l_idx = mp_pose.PoseLandmark.LEFT_SHOULDER.value
    r_idx = mp_pose.PoseLandmark.RIGHT_SHOULDER.value

    shoulder_gaps = [
        abs(frame[l_idx][0] - frame[r_idx][0])
        for frame in kps_window
        if frame
    ]

    # Not enough usable frames to judge movement.
    if len(shoulder_gaps) < 5:
        return False

    spread = np.max(shoulder_gaps) - np.min(shoulder_gaps)
    return spread > 0.05

def is_hand_entry(kps_window):
    """
    Decide whether a window of frames shows a hand-entry event.

    A frame counts as a "dip" when either wrist has a larger y-coordinate
    than the nose. Hand entry is reported when the number of dip frames
    exceeds 40% of the window length; frames without a detected pose still
    count towards the window length.

    Args:
        kps_window (list): List of keypoints for a sliding window of frames.

    Returns:
        bool: True if hand entry motion is present, False otherwise.
    """
    lw = mp_pose.PoseLandmark.LEFT_WRIST.value
    rw = mp_pose.PoseLandmark.RIGHT_WRIST.value
    nose_idx = mp_pose.PoseLandmark.NOSE.value

    frames_with_dip = sum(
        1
        for frame in kps_window
        if frame and (
            frame[lw][1] > frame[nose_idx][1]
            or frame[rw][1] > frame[nose_idx][1]
        )
    )

    return frames_with_dip > len(kps_window) * 0.4


### Feature Extraction Functions

1. <b>vector_angle(v1, v2): </b>

Calculates the angle (in radians) between two vectors. Used to compute joint angles from keypoints.

2. <b>compute_velocity(kps_window, joint_id): </b>

Computes the average velocity of a joint over a window, based on frame-to-frame displacement.

3. <b>compute_symmetry(kps_window, joint_id_left, joint_id_right): </b>

Measures symmetry as the mean absolute x-difference between left and right joints (e.g., shoulders).

4. <b>compute_angle_variance(kps_window, joint_a, joint_b, joint_c): </b>

Measures the variance of the angle at joint B (between BA and BC) over the window, indicating stability or smoothness.

5. <b>compute_features(kps_window): </b>

Extracts all relevant features for classification, including kick consistency, hand entry quality, breathing balance, arm control, and body rotation stability.

In [None]:
def vector_angle(v1, v2):
    """
    Return the angle between two vectors, in radians.

    Args:
        v1 (array-like): First vector.
        v2 (array-like): Second vector.

    Returns:
        float: Angle in radians between v1 and v2, in the range [0, pi].
    """
    def _unit(vec):
        # The small epsilon keeps the division finite for a zero-length vector.
        arr = np.array(vec)
        return arr / (np.linalg.norm(arr) + 1e-8)

    cosine = np.dot(_unit(v1), _unit(v2))
    # Rounding can push the cosine marginally outside [-1, 1]; clamp it so
    # arccos stays defined.
    return np.arccos(np.clip(cosine, -1.0, 1.0))

def compute_velocity(kps_window, joint_id, fps=30):
    """
    Compute the average speed of a joint over a window of frames.

    Speed is the mean Euclidean distance (x-y plane) between the joint's
    positions in successive usable frames, multiplied by the frame rate.

    Frames without a detected pose (None/empty) are dropped before
    differencing, so a displacement that spans a gap is counted as a single
    frame step.

    Args:
        kps_window (list): List of keypoints for the window.
        joint_id (int): Index of the joint to track.
        fps (float): Frame rate used to convert per-frame displacement to
            per-second speed. Defaults to 30.

    Returns:
        float: Estimated speed in keypoint-coordinate units per second
        (MediaPipe landmark x/y are normalized image coordinates, not
        pixels). 0.0 when fewer than two frames have a pose.
    """
    positions = np.array(
        [frame[joint_id][:2] for frame in kps_window if frame], dtype=float
    )
    if len(positions) < 2:
        return 0.0

    # Length of each step between consecutive usable frames.
    step_lengths = np.linalg.norm(np.diff(positions, axis=0), axis=1)
    return step_lengths.mean() * fps

def compute_symmetry(kps_window, joint_id_left, joint_id_right):
    """
    Score left/right symmetry of a joint pair over a window.

    The score is the mean, over frames with a detected pose, of the absolute
    x-coordinate difference between the left and right joint. Lower values
    indicate more symmetry.

    Args:
        kps_window (list): List of keypoints for the window.
        joint_id_left (int): Index for the left joint.
        joint_id_right (int): Index for the right joint.

    Returns:
        float: Mean absolute x-difference (symmetry score), or 0.0 when no
        frame in the window has a pose.
    """
    x_gaps = [
        abs(frame[joint_id_left][0] - frame[joint_id_right][0])
        for frame in kps_window
        if frame
    ]
    return np.mean(x_gaps) if x_gaps else 0.0

def compute_angle_variance(kps_window, joint_a, joint_b, joint_c):
    """
    Measure how much the angle at joint B varies across a window.

    For every frame with a detected pose, the angle between segments B->A
    and B->C is computed in the x-y plane; the variance of those angles is
    returned.

    Args:
        kps_window (list): List of keypoints for the window.
        joint_a (int): Index of joint A.
        joint_b (int): Index of joint B (vertex).
        joint_c (int): Index of joint C.

    Returns:
        float: Variance of the angle (in radians^2) across the window, or
        0.0 when no frame in the window has a pose.
    """
    joint_angles = []
    for frame in kps_window:
        if not frame:
            continue
        vertex = np.array(frame[joint_b][:2])
        arm_to_a = np.array(frame[joint_a][:2]) - vertex
        arm_to_c = np.array(frame[joint_c][:2]) - vertex
        joint_angles.append(vector_angle(arm_to_a, arm_to_c))

    return np.var(joint_angles) if joint_angles else 0.0

def compute_features(kps_window):
    """
    Extracts biomechanically relevant features from a window of keypoints.

    Features:
    - ankle_y_variance: variance of the per-frame mean ankle y-coordinate
      (kick consistency)
    - velocity_smoothness: absolute difference between left and right wrist
      average speed (hand entry quality)
    - shoulder_symmetry: mean absolute x-difference between the shoulders
      (breathing balance)
    - elbow_angle_variance: mean of left/right shoulder-elbow-wrist angle
      variance (arm control)
    - hip_angle_variance: mean of left/right shoulder-hip-knee angle
      variance (body rotation stability)

    Args:
        kps_window (list): List of keypoints for the window. Frames without
            a detected pose (None/empty) are ignored.

    Returns:
        dict: Dictionary of computed features, keyed by the names above.
    """
    # Named `lm` rather than `mp` so the module-level `mediapipe as mp`
    # import is not shadowed inside this function.
    lm = mp_pose.PoseLandmark

    # 1. Kick consistency: variance of ankle y-coordinates
    ankle_ids = [lm.LEFT_ANKLE.value, lm.RIGHT_ANKLE.value]
    y_coords = [
        np.mean([frame[i][1] for i in ankle_ids])
        for frame in kps_window
        if frame
    ]
    ankle_y_variance = np.var(y_coords) if len(y_coords) > 1 else 0.0

    # 2. Hand entry quality: left/right wrist speed mismatch
    left_wrist_velocity = compute_velocity(kps_window, lm.LEFT_WRIST.value)
    right_wrist_velocity = compute_velocity(kps_window, lm.RIGHT_WRIST.value)
    velocity_smoothness = np.abs(left_wrist_velocity - right_wrist_velocity)

    # 3. Breathing balance: left-right shoulder x-symmetry
    shoulder_symmetry = compute_symmetry(
        kps_window, lm.LEFT_SHOULDER.value, lm.RIGHT_SHOULDER.value)

    # 4. Arm control: elbow angle variance (shoulder-elbow-wrist)
    left_elbow_angle_var = compute_angle_variance(
        kps_window, lm.LEFT_SHOULDER.value, lm.LEFT_ELBOW.value, lm.LEFT_WRIST.value)
    right_elbow_angle_var = compute_angle_variance(
        kps_window, lm.RIGHT_SHOULDER.value, lm.RIGHT_ELBOW.value, lm.RIGHT_WRIST.value)
    # Plain average. The previous truthiness guard zeroed the whole feature
    # whenever one side's variance was exactly 0.0, hiding the other side's
    # instability. With no usable frames both sides are already 0.0.
    elbow_angle_variance = (left_elbow_angle_var + right_elbow_angle_var) / 2

    # 5. Body rotation stability: hip angle variance (shoulder-hip-knee)
    left_hip_angle_var = compute_angle_variance(
        kps_window, lm.LEFT_SHOULDER.value, lm.LEFT_HIP.value, lm.LEFT_KNEE.value)
    right_hip_angle_var = compute_angle_variance(
        kps_window, lm.RIGHT_SHOULDER.value, lm.RIGHT_HIP.value, lm.RIGHT_KNEE.value)
    hip_angle_variance = (left_hip_angle_var + right_hip_angle_var) / 2

    features = {
        "ankle_y_variance": ankle_y_variance,
        "velocity_smoothness": velocity_smoothness,
        "shoulder_symmetry": shoulder_symmetry,
        "elbow_angle_variance": elbow_angle_variance,
        "hip_angle_variance": hip_angle_variance,
    }
    return features


### Function to classify forms

**Purpose:**  
Classifies the quality of a detected swimming motion window (kick, breathing, hand entry, or overall posture) based on extracted biomechanical features.

**How it works:**  
- For each motion type, the function checks the relevant feature(s) against a threshold.
- Returns a label describing form quality.

**Thresholds and Logic:**

| Motion           | Feature(s) Used                | Threshold(s)            | Label (if above threshold)         | Label (if below threshold)      |
|------------------|-------------------------------|-------------------------|------------------------------------|---------------------------------|
| kick             | `ankle_y_variance`            | > 0.005                 | `inconsistent_kick`                | `consistent_kick`               |
| breathing        | `shoulder_symmetry`           | > 0.05                  | `breathing_bias_detected`          | `balanced_breathing`            |
| hand_entry       | `velocity_smoothness`         | > 5.0                   | `uneven_hand_entry`                |                                 |
|                  | `shoulder_symmetry`           | > 0.1                   | `asymmetric_hand_entry`            |                                 |
|                  | `elbow_angle_variance`        | > 0.4                   | `unstable_hand_entry`              |                                 |
|                  | (all below thresholds)        |                         |                                    | `smooth_hand_entry`             |
| overall_posture  | `elbow_angle_variance` OR     | > 0.1                   | `unstable_posture`                 | `stable_posture`                |
|                  | `hip_angle_variance`          |                         |                                    |                                 |



**References:**  

Biomechanics of Swimming: Symmetry and Coordination (2016)

- https://www.slideshare.net/slideshow/biomechanics-of-swimming/63080392#21


In [None]:
def classify_form(motion, features):
    """
    Map a detected motion type and its feature values to a form-quality label.

    Args:
        motion (str): One of "kick", "breathing", "hand_entry" or
            "overall_posture"; any other value yields "unknown".
        features (dict): Feature values keyed by feature name. The
            "hand_entry" branch treats missing features as 0; the other
            branches require their features to be present.

    Returns:
        str: The quality label for the window.
    """
    if motion == "kick":
        return (
            "inconsistent_kick"
            if features["ankle_y_variance"] > 0.005
            else "consistent_kick"
        )

    if motion == "breathing":
        return (
            "breathing_bias_detected"
            if features["shoulder_symmetry"] > 0.05
            else "balanced_breathing"
        )

    if motion == "hand_entry":
        # Checked in order; the first feature above its limit decides the label.
        ordered_checks = (
            ("velocity_smoothness", 5.0, "uneven_hand_entry"),
            ("shoulder_symmetry", 0.1, "asymmetric_hand_entry"),
            ("elbow_angle_variance", 0.4, "unstable_hand_entry"),
        )
        for feature_name, limit, verdict in ordered_checks:
            if features.get(feature_name, 0) > limit:
                return verdict
        return "smooth_hand_entry"

    if motion == "overall_posture":
        # Elbow and hip angle variance serve as a proxy for posture stability.
        unstable = (
            features["elbow_angle_variance"] > 0.1
            or features["hip_angle_variance"] > 0.1
        )
        return "unstable_posture" if unstable else "stable_posture"

    return "unknown"


### Function to separate videos

1. Extract Keypoints:
Uses MediaPipe Pose to extract 3D keypoints for every frame in the input video.

2. Sliding Window Segmentation:
Slides a window (default: 30 frames, stride 10) over the keypoints to analyze short motion segments.

3. Motion Detection:
For each window, applies a series of heuristic detectors:

- is_kick_motion

- is_breathing_motion

- is_hand_entry

- is_overall_posture

The first positive detector determines the window's motion type.

4. Feature Computation:
Extracts features such as ankle y-variance, wrist velocity smoothness, shoulder symmetry, elbow and hip angle variance.

5. Form Classification:
Uses classify_form to assign a quality label (e.g., consistent_kick, unstable_posture) based on feature thresholds.

6. JSON Output:
Saves each window as a JSON file containing:

- Video ID

- Motion type

- Frame range

- Frame rate

- Features

- Label

- Raw keypoints

7. Files are organized under `output_base_dir/<motion>/`, one JSON file per window.

In [8]:
def segment_and_classify(video_path, output_base_dir, window_size=30, stride=10):
    """
    Split a video into sliding windows, label each window, and save it as JSON.

    For every window the motion detectors are tried in a fixed order (kick,
    breathing, hand entry, overall posture); the first one that fires decides
    the motion type. Windows for which no detector fires are skipped. Each
    kept window is written to
    ``<output_base_dir>/<motion>/<video file name>_<start frame>.json``.

    Args:
        video_path (str): Path to the input video.
        output_base_dir (str): Directory under which per-motion folders are
            created.
        window_size (int): Number of frames per window. Defaults to 30.
        stride (int): Number of frames between window starts. Defaults to 10.

    Returns:
        None
    """
    keypoints = extract_keypoints_from_video(video_path)
    fps = 30  # assumed frame rate; not read from the video metadata
    video_id = os.path.basename(video_path)

    # Tried in order; the first detector that returns True wins.
    detectors = (
        ("kick", is_kick_motion),
        ("breathing", is_breathing_motion),
        ("hand_entry", is_hand_entry),
        ("overall_posture", is_overall_posture),
    )

    # "+ 1" so the last full window is included: without it a video with
    # exactly `window_size` frames (or any length where the final window
    # starts on a stride boundary) lost its last window.
    for start in range(0, len(keypoints) - window_size + 1, stride):
        window_kps = keypoints[start:start + window_size]

        # Detect motion type
        motion = None
        for candidate, detector in detectors:
            if detector(window_kps):
                motion = candidate
                break
        if motion is None:
            continue

        # Compute features
        features = compute_features(window_kps)

        # Classify form
        label = classify_form(motion, features)

        # Prepare JSON data
        data = {
            "video_id": video_id,
            "motion": motion,
            "start_frame": start,
            "end_frame": start + window_size,  # exclusive end index
            "fps": fps,
            "features": features,
            "label": label,
            "keypoints": window_kps,  # Save raw keypoints for that window
        }

        # Save JSON file
        out_dir = os.path.join(output_base_dir, motion)
        os.makedirs(out_dir, exist_ok=True)
        output_json_path = os.path.join(out_dir, f"{video_id}_{start}.json")

        with open(output_json_path, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=2)


### Function to retrieve video paths

In [None]:
VIDEO_FOLDER = "standardized_videos"
OUTPUT_FOLDER = "segmented_videos"


def gather_video_paths():
    """
    Collect the paths of all .mp4 files one level below VIDEO_FOLDER.

    Each sub-directory of VIDEO_FOLDER is scanned (non-recursively); plain
    files sitting directly in VIDEO_FOLDER are ignored.

    Returns:
        list: Paths of the form VIDEO_FOLDER/<sub-directory>/<file>.mp4, in
        the order os.listdir yields them.
    """
    collected = []
    for entry in os.listdir(VIDEO_FOLDER):
        style_dir = os.path.join(VIDEO_FOLDER, entry)
        if os.path.isdir(style_dir):
            collected.extend(
                os.path.join(style_dir, name)
                for name in os.listdir(style_dir)
                if name.endswith(".mp4")
            )
    return collected

## Calling files and processing

In [24]:
# Process every discovered video; results for each one are written under
# OUTPUT_FOLDER/<stroke style>/.
file_paths = gather_video_paths()

for file in file_paths:
    print(f"[→] Processing {file}...")
    try:
        # The stroke style is the name of the video's parent folder, e.g.
        # standardized_videos/freestyle/xyz.mp4 → "freestyle". Taking the
        # parent directory name (rather than the second path component)
        # keeps this correct if VIDEO_FOLDER is absolute or nested.
        stroke_style = os.path.basename(os.path.dirname(file))
        style_output_dir = os.path.join(OUTPUT_FOLDER, stroke_style)
        segment_and_classify(file, style_output_dir)
    except Exception as e:
        # Top-level boundary: report the failure and continue with the
        # remaining videos instead of aborting the whole batch.
        print(f"[x] Error in {file}: {e}")

[→] Processing standardized_videos\backstroke\Backstroke - slowmotion.mp4...
[→] Processing standardized_videos\backstroke\Backstroke start in super slow motion. Backcrawl olympic start.mp4...
[→] Processing standardized_videos\backstroke\Backstroke swimming technique ｜ Rotation ｜ Swim faster.mp4...
[→] Processing standardized_videos\backstroke\Backstroke Swimming Technique ｜ Stroke.mp4...
[→] Processing standardized_videos\backstroke\Backstroke swimming ： Easy to learn, hard to master..mp4...
[→] Processing standardized_videos\backstroke\How To Swim Backstroke ｜ Technique For Back Crawl Swimming.mp4...
[→] Processing standardized_videos\backstroke\Learn How to Swim Backstroke in 30 Seconds!.mp4...
[→] Processing standardized_videos\breaststroke\Adam Peaty Top Tips： Breaststroke catch.mp4...
[→] Processing standardized_videos\breaststroke\Breaststroke Swimming Technique ｜ Stroke.mp4...
[→] Processing standardized_videos\breaststroke\Breaststroke swimming： SECRET TIPS.mp4...
[→] Process