In [None]:
import os
import numpy as np
import json
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from collections import defaultdict
import random

In [2]:
# Parameters: layout of the flattened per-sample feature vector.
# Note that, despite the names, these constants count coordinate VALUES
# (three per keypoint: x, y, z), not keypoints.
BODY_KEYPOINTS = 33 * 3  # 33 body keypoints x 3 coordinates = 99 values
HAND_KEYPOINTS = 42 * 3  # 42 hand keypoints (21 per hand) x 3 coordinates = 126 values
MAX_LEN = BODY_KEYPOINTS + HAND_KEYPOINTS  # total feature-vector length (body values followed by hand values)

In [3]:
def load_split_data(json_file):
    """Load and parse one dataset split stored as a JSON file.

    Args:
        json_file (str): Path to the JSON file to read.

    Returns:
        The decoded JSON document, exactly as produced by ``json.load``.

    Raises:
        FileNotFoundError: If ``json_file`` does not exist.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    if not os.path.exists(json_file):
        raise FileNotFoundError(f"File not found: {json_file}")

    # JSON text is UTF-8 by specification. Pin the encoding so decoding does
    # not silently depend on the platform's locale default (e.g. cp1252 on
    # Windows), which would corrupt or reject non-ASCII labels.
    with open(json_file, "r", encoding="utf-8") as file:
        return json.load(file)

In [4]:
def _flatten_points(points, expected_len):
    """Flatten a list of {x, y, z} dicts into exactly ``expected_len`` values."""
    flat = []
    if isinstance(points, list):
        for point in points:
            if isinstance(point, dict) and all(k in point for k in ("x", "y", "z")):
                flat.extend((point["x"], point["y"], point["z"]))
            else:
                flat.extend((0, 0, 0))  # Malformed/missing point -> zeros
    # Truncate, then zero-pad, so this section always has the same width.
    flat = flat[:expected_len]
    flat.extend([0] * (expected_len - len(flat)))
    return flat


def normalize_keypoints(body, hands):
    """
    Normalize keypoints from structured JSON format into a fixed-layout vector.

    The body section always occupies the first BODY_KEYPOINTS values and the
    hand section the following HAND_KEYPOINTS values. Each section is padded
    with zeros (or truncated) independently, so a short or over-long body
    list can no longer shift hand coordinates into the wrong feature slots.

    Args:
        body (list): List of body keypoints [{x, y, z}]. Anything that is not
            a list is treated as "no body keypoints".
        hands (list): List of hand keypoints [{x, y, z}]. Anything that is not
            a list is treated as "no hand keypoints".

    Returns:
        np.array: float32 vector of length MAX_LEN (body values, then hands).
    """
    keypoints = _flatten_points(body, BODY_KEYPOINTS)
    keypoints += _flatten_points(hands, HAND_KEYPOINTS)

    # With MAX_LEN == BODY_KEYPOINTS + HAND_KEYPOINTS this is a no-op; it is
    # kept so the output length keeps following MAX_LEN as it always has.
    if len(keypoints) < MAX_LEN:
        keypoints += [0] * (MAX_LEN - len(keypoints))
    elif len(keypoints) > MAX_LEN:
        keypoints = keypoints[:MAX_LEN]

    return np.array(keypoints, dtype=np.float32)

In [5]:
def _is_zero_point(point, tol=1e-6):
    """Return True if a hand keypoint carries no signal (all coords ~ 0)."""
    # normalize_keypoints encodes malformed (non-dict) entries as zeros, so
    # they are treated as zero points here as well instead of raising.
    if not isinstance(point, dict):
        return True
    return all(abs(float(point.get(axis, 0))) < tol for axis in ("x", "y", "z"))


def remove_invalid_data(json_data):
    """
    Remove samples whose hand keypoints are present but carry no information.

    A sample is discarded only if its 'hands' list is non-empty and every
    entry is either an all-zero (x, y, z) point (|value| < 1e-6, missing
    coordinates count as 0) or a malformed non-dict entry. Samples with an
    empty or missing 'hands' list, or without 'keypoints', are kept.

    Args:
        json_data (list): Samples, each a dict optionally holding
            ``{"keypoints": {"hands": [...]}}``.

    Returns:
        list: The retained samples, in their original order.
    """
    valid_data = []
    for item in json_data:
        hands = item.get("keypoints", {}).get("hands", [])

        # Non-empty hand list where nothing is non-zero -> drop the sample.
        if isinstance(hands, list) and hands and all(_is_zero_point(p) for p in hands):
            continue

        valid_data.append(item)

    return valid_data


In [6]:
def balance_data(data, seed=None):
    """
    Balance data across classes by downsampling the majority classes.

    Every class is reduced to the size of the smallest class. Samples whose
    "label" is missing or None are dropped. Classes already at the minimum
    size keep their samples in original order.

    Args:
        data (list): List of data samples (each sample is a dict with a "label" key).
        seed (int, optional): If given, downsampling uses a private
            ``random.Random(seed)`` so the result is reproducible. If None
            (default), the module-level ``random`` state is used as before.

    Returns:
        list: Balanced list of data samples (empty if no labeled samples exist).
    """
    label_to_items = defaultdict(list)
    for item in data:
        label = item.get("label")
        if label is not None:
            label_to_items[label].append(item)

    # Nothing labeled: return early instead of letting min() raise
    # "min() arg is an empty sequence".
    if not label_to_items:
        return []

    # Find the minimum count across classes
    min_count = min(len(items) for items in label_to_items.values())

    # A dedicated generator keeps seeded runs independent of global RNG state.
    rng = random if seed is None else random.Random(seed)

    balanced = []
    for items in label_to_items.values():
        # Randomly sample without replacement (downsample) if necessary
        if len(items) > min_count:
            balanced.extend(rng.sample(items, min_count))
        else:
            balanced.extend(items)

    return balanced

In [7]:
def prepare_data(json_data, label_encoder):
    """Turn raw samples into a feature matrix and an encoded label vector.

    Samples lacking either a "keypoints" or a "label" key are skipped. The
    remaining samples are converted with ``normalize_keypoints`` and their
    labels are mapped through ``label_encoder.transform``.

    Args:
        json_data: Iterable of sample dicts.
        label_encoder: Object exposing ``transform(labels)``.

    Returns:
        tuple: ``(X, y)`` where ``X`` is a float32 array with one row per
        usable sample and ``y`` is an array of the transformed labels.

    Raises:
        ValueError: If no sample has both required keys.
    """
    usable = [sample for sample in json_data if "keypoints" in sample and "label" in sample]
    if not usable:
        raise ValueError("No valid data found after filtering.")

    features = [
        normalize_keypoints(sample["keypoints"].get("body", []), sample["keypoints"].get("hands", []))
        for sample in usable
    ]
    raw_labels = [sample["label"] for sample in usable]

    encoded = label_encoder.transform(raw_labels)
    return np.array(features, dtype=np.float32), np.array(encoded)

In [8]:
def split_data(train_json, val_json, test_json):
    """Load the three dataset splits and convert them to model-ready arrays.

    Pipeline: load each JSON file, drop samples rejected by
    ``remove_invalid_data`` (non-empty hand keypoints that are all zero),
    downsample the training split with ``balance_data``, fit a
    ``LabelEncoder`` on the training labels, build feature/label arrays with
    ``prepare_data`` and one-hot encode the labels.

    Args:
        train_json: Path to the training split JSON file.
        val_json: Path to the validation split JSON file.
        test_json: Path to the test split JSON file.

    Returns:
        tuple: ``((X_train, y_train), (X_val, y_val), (X_test, y_test),
        label_encoder)`` with one-hot encoded ``y_*`` arrays.
    """
    # Load everything first, then filter, mirroring the load -> clean order.
    raw_splits = [load_split_data(path) for path in (train_json, val_json, test_json)]
    train_items, val_items, test_items = [remove_invalid_data(split) for split in raw_splits]

    # Only the training split is class-balanced (by downsampling).
    train_items = balance_data(train_items)

    # The encoder is fitted on training labels only.
    label_encoder = LabelEncoder()
    label_encoder.fit([sample["label"] for sample in train_items if "label" in sample])

    X_train, y_train = prepare_data(train_items, label_encoder)
    X_val, y_val = prepare_data(val_items, label_encoder)
    X_test, y_test = prepare_data(test_items, label_encoder)

    # One-hot encode against the number of classes seen in training.
    class_count = len(label_encoder.classes_)
    train_pair = (X_train, to_categorical(y_train, num_classes=class_count))
    val_pair = (X_val, to_categorical(y_val, num_classes=class_count))
    test_pair = (X_test, to_categorical(y_test, num_classes=class_count))

    return train_pair, val_pair, test_pair, label_encoder

In [None]:
if __name__ == "__main__":
    # Script entry point: build the processed dataset, save it, then print
    # a few diagnostics about the raw (unfiltered) JSON splits.
    train_json = "../dataset/data/train_data.json"
    val_json   = "../dataset/data/val_data.json"
    test_json  = "../dataset/data/test_data.json"

    # balance_data downsamples with the global `random` state; seed it so the
    # saved dataset is identical across Restart & Run All.
    SEED = 42
    random.seed(SEED)

    (X_train, y_train), (X_val, y_val), (X_test, y_test), label_encoder = split_data(train_json, val_json, test_json)

    # Save processed data
    np.savez("../dataset.npz", X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val, X_test=X_test, y_test=y_test)
    np.save("../label_encoder.npy", label_encoder.classes_)

    print("✅ Data preprocessing completed and saved.")

    # For diagnostics, print dataset lengths and keypoints info from the raw
    # training data (reloaded because split_data returns only arrays).
    train_data = load_split_data(train_json)
    val_data   = load_split_data(val_json)
    test_data  = load_split_data(test_json)

    print(f"Train Data: {len(train_data)} samples")
    print(f"Validation Data: {len(val_data)} samples")
    print(f"Test Data: {len(test_data)} samples")

    for i, item in enumerate(train_data[:5]):  # Print first 5 samples
        # Raw samples may lack "keypoints" (the pipeline tolerates that), so
        # read defensively instead of indexing and raising KeyError here.
        sample_keypoints = item.get("keypoints") or {}
        body = sample_keypoints.get("body") or []
        hands = sample_keypoints.get("hands") or []
        print(f"Sample {i+1}:")
        print(f"  - Body keypoints: {len(body)}")
        print(f"  - Hand keypoints: {len(hands)}")
        print(f"  - Total keypoints: {len(body) + len(hands)}")
        print("-" * 30)

    # A sample counts as "missing keypoints" when it has neither body nor
    # hand keypoints (including samples with no "keypoints" entry at all).
    missing_keypoints_count = sum(
        1
        for item in train_data
        if not (item.get("keypoints") or {}).get("body")
        and not (item.get("keypoints") or {}).get("hands")
    )
    print(f"Number of samples with missing keypoints: {missing_keypoints_count}")