In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import mediapipe as mp
import cv2
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models
import json
import pickle

# Mediapipe setup: short aliases for the hand-tracking solution and its drawing helpers.
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils  # NOTE(review): not referenced in the visible cells

# Config: tunables shared by the cells below.
BATCH_SIZE = 32  # batch_size passed to model.fit in train_model
EPOCHS = 30  # default value of train_model's `epochs` parameter
# NOTE(review): NUM_CLASSES is not referenced in the visible cells; main() sizes the
# output layer from len(label_to_idx) instead. Confirm whether it is still needed.
NUM_CLASSES = 36 # 26 letters + 10 digits
MODEL_PATH = "bsl_model"  # directory that receives checkpoints and the final .h5 model
TFLITE_MODEL_PATH = "bsl_model.tflite"  # output file handed to convert_to_tflite by main()
SEQUENCE_LENGTH = 30 # Number of frames to consider for each sample

# Extract hand landmarks from video
def extract_landmarks_from_video(video_path, max_frames=SEQUENCE_LENGTH):
    """Extract a fixed-length sequence of per-frame hand features from a video.

    Reads at most ``max_frames`` frames from ``video_path``, runs MediaPipe
    Hands on each frame and collects one feature vector per detected hand.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        max_frames: Length of the returned sequence.

    Returns:
        A list of ``max_frames`` frames. Each frame is a list holding one
        64-value list per detected hand: 21 landmarks x (x, y, z) followed by a
        handedness flag (0.0 when MediaPipe labels the hand "Left", 1.0
        otherwise). Frames without a detection, and the padding frames added
        when the video is shorter than ``max_frames``, are ``[[0.0] * 64]``.
    """
    landmarks_sequence = []
    cap = cv2.VideoCapture(video_path)

    # try/finally so the capture handle is released even if MediaPipe or
    # OpenCV raises while a frame is being processed.
    try:
        with mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=2,  # BSL often uses two hands
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        ) as hands:
            while cap.isOpened() and len(landmarks_sequence) < max_frames:
                success, image = cap.read()
                if not success:
                    break

                # MediaPipe expects RGB, OpenCV decodes to BGR.
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                results = hands.process(image_rgb)

                frame_landmarks = []
                if results.multi_hand_landmarks:
                    # Pair every hand with its OWN handedness entry. Indexing
                    # multi_handedness[0] inside the loop would stamp the first
                    # hand's side onto every hand in a two-hand frame.
                    for hand_landmarks, hand_class in zip(
                        results.multi_hand_landmarks, results.multi_handedness
                    ):
                        landmarks = []
                        for point in hand_landmarks.landmark:
                            landmarks.extend([point.x, point.y, point.z])

                        # Handedness as a feature (0 for left, 1 for right)
                        is_left = hand_class.classification[0].label == "Left"
                        landmarks.append(0.0 if is_left else 1.0)

                        frame_landmarks.append(landmarks)

                # If no hands detected, add zeros:
                # 21 landmarks x 3 coordinates + 1 handedness feature = 64 values
                if not frame_landmarks:
                    frame_landmarks = [[0.0] * 64]

                landmarks_sequence.append(frame_landmarks)
    finally:
        cap.release()

    # Pad short videos with all-zero frames (a fresh list per frame).
    while len(landmarks_sequence) < max_frames:
        landmarks_sequence.append([[0.0] * 64])

    # Truncate if too long
    return landmarks_sequence[:max_frames]

In [None]:
# Process dataset videos to extract landmarks
def process_dataset(dataset_dir):
    """Walk ``dataset_dir`` and extract landmark sequences for every labelled video.

    A video is used only when its directory path contains all three of:
    a ``letters`` or ``numbers`` component (the component right after it is the
    class label), a ``right_handed``/``left_handed`` component, and a
    ``one_handed``/``two_handed`` component, e.g.
    ``.../training/letters/A/right_handed/one_handed/clip.mp4``.

    Directories and files are visited in sorted order so the sample order (and
    therefore any seeded split made from it) is reproducible across
    filesystems. Video extensions are matched case-insensitively.

    Args:
        dataset_dir: Root directory to scan recursively.

    Returns:
        A dict with four parallel lists: ``"landmarks"`` (the output of
        ``extract_landmarks_from_video`` per video), ``"labels"`` (class
        directory names), ``"handedness"`` (``"right"`` or ``"left"``) and
        ``"num_hands"`` (1 or 2).
    """
    video_extensions = (".mp4", ".avi", ".mov")
    dataset = {
        "landmarks": [],
        "labels": [],
        "handedness": [],  # 'left' or 'right'
        "num_hands": [],  # 1 or 2
    }

    for root, dirs, files in os.walk(dataset_dir):
        # Sorting in place makes os.walk descend in a stable order.
        dirs.sort()

        # The metadata depends only on the directory, so parse it once per
        # directory rather than once per file.
        path_parts = root.split(os.sep)
        sign_class = None
        handedness = None
        num_hands = None
        for i, part in enumerate(path_parts):
            if part in ("letters", "numbers"):
                if i + 1 < len(path_parts):
                    sign_class = path_parts[i + 1]
            if part in ("right_handed", "left_handed"):
                handedness = "right" if part == "right_handed" else "left"
            if part in ("one_handed", "two_handed"):
                num_hands = 1 if part == "one_handed" else 2

        # Incomplete metadata: skip this directory's files (os.walk still
        # descends into its subdirectories).
        if not (sign_class and handedness and num_hands):
            continue

        for file in sorted(files):
            if not file.lower().endswith(video_extensions):
                continue

            video_path = os.path.join(root, file)
            landmarks_sequence = extract_landmarks_from_video(video_path)

            dataset["landmarks"].append(landmarks_sequence)
            dataset["labels"].append(sign_class)
            dataset["handedness"].append(handedness)
            dataset["num_hands"].append(num_hands)

    return dataset


In [None]:
# Prepare data for training
def prepare_data(dataset):
    """Turn the raw landmark dataset into padded arrays and train/val/test splits.

    Side effect: writes the label-to-index mapping to ``label_mapping.json`` in
    the current working directory.

    Args:
        dataset: Dict with ``"landmarks"`` (per sample: a sequence of frames,
            each frame a list of one or two 64-value hand vectors) and
            ``"labels"`` (one class name per sample).

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test, label_to_idx)``.
        The ``X_*`` arrays have shape ``(n, SEQUENCE_LENGTH, 128)``; the
        ``y_*`` arrays are one-hot encoded. The split is 70% train, then the
        remainder halved into validation and test.

    Raises:
        ValueError: If the dataset contains no samples.
    """
    labels = dataset["labels"]
    sequences = dataset["landmarks"]
    if len(labels) == 0:
        raise ValueError("Cannot prepare data: the dataset contains no samples.")

    # Convert labels to numerical
    unique_labels = sorted(set(labels))
    label_to_idx = {label: i for i, label in enumerate(unique_labels)}

    # Save label mapping
    with open("label_mapping.json", "w") as f:
        json.dump(label_to_idx, f)

    y = np.array([label_to_idx[label] for label in labels])

    # Frames hold either one or two hand vectors, so the nested lists are
    # ragged. Iterate over them directly: np.array() on a ragged structure
    # raises ValueError on NumPy >= 1.24 (and made an object array before).
    # Each frame can have up to 2 hands with 64 features each -> 128 per frame.
    X_reshaped = np.zeros((len(sequences), SEQUENCE_LENGTH, 128))

    for i, seq in enumerate(sequences):
        for j, frame in enumerate(seq):
            if len(frame) == 1:  # One hand detected; second slot stays zero
                X_reshaped[i, j, :64] = frame[0]
            elif len(frame) == 2:  # Two hands detected
                X_reshaped[i, j, :64] = frame[0]
                X_reshaped[i, j, 64:] = frame[1]

    # One-hot encode labels
    y_one_hot = tf.keras.utils.to_categorical(y, num_classes=len(unique_labels))

    # Split data: 70% train, 30% held out ...
    X_train, X_temp, y_train, y_temp = train_test_split(
        X_reshaped, y_one_hot, test_size=0.3, random_state=42, stratify=y_one_hot
    )

    # ... and the held-out part is halved into validation and test.
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
    )

    return X_train, X_val, X_test, y_train, y_val, y_test, label_to_idx


In [None]:
# Build model for sequence data
def build_model(input_shape, num_classes):
    """Assemble and compile the bidirectional-LSTM sequence classifier.

    Args:
        input_shape: Shape of one sample, passed to the first layer as
            ``input_shape``.
        num_classes: Width of the final softmax layer.

    Returns:
        A compiled ``Sequential`` model (Adam optimizer, categorical
        cross-entropy loss, accuracy metric).
    """
    drop_rate = 0.3

    # Recurrent part: two stacked bidirectional LSTMs; only the first one
    # returns the full sequence so the second can consume it.
    recurrent_part = [
        layers.Bidirectional(
            layers.LSTM(64, return_sequences=True), input_shape=input_shape
        ),
        layers.Dropout(drop_rate),
        layers.Bidirectional(layers.LSTM(32)),
        layers.Dropout(drop_rate),
    ]

    # Classification head on top of the final LSTM state.
    classifier_part = [
        layers.Dense(64, activation="relu"),
        layers.Dropout(drop_rate),
        layers.Dense(num_classes, activation="softmax"),
    ]

    network = models.Sequential(recurrent_part + classifier_part)
    network.compile(
        loss="categorical_crossentropy",
        optimizer="adam",
        metrics=["accuracy"],
    )
    return network


In [None]:
# Train the model
def train_model(model, X_train, y_train, X_val, y_val, epochs=EPOCHS):
    """Fit ``model`` on the training split while validating on ``(X_val, y_val)``.

    Creates the ``MODEL_PATH`` directory if needed and trains with three
    callbacks: early stopping (patience 10, best weights restored), a
    checkpoint that writes a ``.h5`` file whenever ``val_accuracy`` improves,
    and a learning-rate reduction on plateau (factor 0.1, patience 5,
    floor 1e-6).

    Args:
        model: Compiled Keras model to train.
        X_train, y_train: Training inputs and targets.
        X_val, y_val: Validation inputs and targets.
        epochs: Maximum number of epochs.

    Returns:
        The history object returned by ``model.fit``.
    """
    # Checkpoints are written into MODEL_PATH, so it has to exist first.
    os.makedirs(MODEL_PATH, exist_ok=True)

    stop_early = tf.keras.callbacks.EarlyStopping(
        patience=10, restore_best_weights=True
    )

    checkpoint_template = os.path.join(
        MODEL_PATH, "model-{epoch:02d}-{val_accuracy:.4f}.h5"
    )
    keep_best = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_template,
        monitor="val_accuracy",
        save_best_only=True,
        mode="max",
    )

    shrink_lr = tf.keras.callbacks.ReduceLROnPlateau(
        factor=0.1, patience=5, min_lr=1e-6
    )

    return model.fit(
        X_train,
        y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=BATCH_SIZE,
        callbacks=[stop_early, keep_best, shrink_lr],
    )

In [None]:
# Convert to TensorFlow Lite
def convert_to_tflite(model, tflite_path):
    """Convert a Keras model to TensorFlow Lite and write it to ``tflite_path``.

    The converter runs with ``tf.lite.Optimize.DEFAULT`` enabled. After
    writing, the saved path and the file size in MB are printed.

    Args:
        model: Keras model to convert.
        tflite_path: Destination file for the converted model.

    Returns:
        ``tflite_path``, unchanged.
    """
    lite_converter = tf.lite.TFLiteConverter.from_keras_model(model)
    lite_converter.optimizations = [tf.lite.Optimize.DEFAULT]
    serialized_model = lite_converter.convert()

    with open(tflite_path, "wb") as out_file:
        out_file.write(serialized_model)
    print(f"TensorFlow Lite model saved to {tflite_path}")

    # Report the on-disk size in mebibytes.
    bytes_per_mb = 1024 * 1024
    tflite_size = os.path.getsize(tflite_path) / bytes_per_mb
    print(f"TensorFlow Lite Model size: {tflite_size:.2f} MB")

    return tflite_path

In [None]:
# Process a single frame for inference
def process_frame_for_inference(frame, hands_model):
    """Convert one BGR frame into the 128-value feature row used by the model.

    Args:
        frame: BGR image (as produced by OpenCV).
        hands_model: Object exposing ``process(image_rgb)`` whose result has
            ``multi_hand_landmarks`` and ``multi_handedness`` attributes
            (a MediaPipe Hands instance).

    Returns:
        ``(features, multi_hand_landmarks)`` where ``features`` is a
        ``(1, 128)`` array: the first detected hand fills columns 0-63, the
        second (if any) fills columns 64-127. Each 64-value slot is
        21 landmarks x (x, y, z) plus a handedness flag (0.0 for "Left",
        1.0 otherwise). All zeros when no hand is detected.
        ``multi_hand_landmarks`` is passed through unchanged from MediaPipe.
    """
    # MediaPipe expects RGB, OpenCV delivers BGR.
    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = hands_model.process(image_rgb)

    frame_landmarks = []
    if results.multi_hand_landmarks:
        # Pair every hand with its OWN handedness entry. Indexing
        # multi_handedness[0] inside the loop would stamp the first hand's
        # side onto every hand in a two-hand frame.
        for hand_landmarks, hand_class in zip(
            results.multi_hand_landmarks, results.multi_handedness
        ):
            landmarks = []
            for point in hand_landmarks.landmark:
                landmarks.extend([point.x, point.y, point.z])

            # Handedness as a feature (0 for left, 1 for right)
            is_left = hand_class.classification[0].label == "Left"
            landmarks.append(0.0 if is_left else 1.0)

            frame_landmarks.append(landmarks)

    # If no hands detected, add zeros
    if not frame_landmarks:
        frame_landmarks = [[0.0] * 64]

    # One row of 2 x 64 features; unused hand slots stay zero.
    features = np.zeros((1, 128))

    if len(frame_landmarks) == 1:  # One hand detected
        features[0, :64] = frame_landmarks[0]
    elif len(frame_landmarks) == 2:  # Two hands detected
        features[0, :64] = frame_landmarks[0]
        features[0, 64:] = frame_landmarks[1]

    return features, results.multi_hand_landmarks

In [None]:
# Save preprocessing information
def save_preprocessing_info(label_to_idx):
    """Pickle the label mapping and sequence length to ``preprocessing_info.pkl``.

    The file is written to the current working directory and contains a dict
    with the keys ``"label_to_idx"`` and ``"sequence_length"``.

    Args:
        label_to_idx: Mapping from class name to integer index.
    """
    payload = {}
    payload["label_to_idx"] = label_to_idx
    payload["sequence_length"] = SEQUENCE_LENGTH

    with open("preprocessing_info.pkl", "wb") as out_file:
        pickle.dump(payload, out_file)

In [None]:
# Main function
def main(dataset_dir):
    """Run the whole pipeline: extract, split, train, evaluate, export.

    Steps, in order: process the videos under ``dataset_dir``, build the
    train/val/test splits, build and train the model, report test accuracy,
    save the Keras model under ``MODEL_PATH``, convert it to TensorFlow Lite
    at ``TFLITE_MODEL_PATH`` and pickle the preprocessing information.

    Args:
        dataset_dir: Root directory of the video dataset.

    Returns:
        ``(tflite_path, label_to_idx)``: the path of the written TFLite file
        and the class-name-to-index mapping.
    """
    print("Processing dataset...")
    dataset = process_dataset(dataset_dir)
    print(f"Dataset processed. Total samples: {len(dataset['labels'])}")

    print("Preparing data for training...")
    splits = prepare_data(dataset)
    X_train, X_val, X_test, y_train, y_val, y_test, label_to_idx = splits
    print(f"Data prepared. Training set size: {X_train.shape}")

    print("Building model...")
    class_count = len(label_to_idx)
    model = build_model(
        input_shape=(SEQUENCE_LENGTH, 128), num_classes=class_count
    )
    model.summary()

    print("Training model...")
    # The returned training history is not used here.
    train_model(model, X_train, y_train, X_val, y_val)

    print("Evaluating model...")
    _, test_acc = model.evaluate(X_test, y_test)
    print(f"Test accuracy: {test_acc:.4f}")

    print("Saving model...")
    final_model_file = os.path.join(MODEL_PATH, "bsl_final_model.h5")
    model.save(final_model_file)

    print("Converting to TensorFlow Lite...")
    tflite_path = convert_to_tflite(model, TFLITE_MODEL_PATH)

    print("Saving preprocessing information...")
    save_preprocessing_info(label_to_idx)

    return tflite_path, label_to_idx


# Entry point: run the full pipeline on the configured dataset directory and
# report where the TFLite model was written and which label mapping was used.
if __name__ == "__main__":
    dataset_dir = "bsl_dataset"  # Change to your dataset directory
    # main() returns the TFLite output path and the class-name -> index mapping.
    tflite_path, label_to_idx = main(dataset_dir)
    print(
        f"Model training and conversion complete. TFLite model saved to: {tflite_path}"
    )
    print(f"Label mapping: {label_to_idx}")
