In [None]:
# Cell 1: Upload and extract MP_Data.zip directly
import os, zipfile
from google.colab import files

print('Select your MP_Data.zip file...')
uploaded = files.upload()

zip_name = list(uploaded.keys())[0]
assert zip_name.endswith('.zip'), f'Expected a .zip file, got: {zip_name}'

print(f'\nUploaded: {zip_name} ({len(uploaded[zip_name]) / (1024*1024):.1f} MB)')

with zipfile.ZipFile(zip_name, 'r') as z:
    z.extractall('/content/')

DATA_PATH = '/content/MP_Data'
MODELS_DIR = '/content/models'
os.makedirs(MODELS_DIR, exist_ok=True)

print(f'\nDATA_PATH: {DATA_PATH}')
print(f'Data exists: {os.path.isdir(DATA_PATH)}')
print(f'Signs found: {sorted(os.listdir(DATA_PATH))}')

In [None]:
# Cell 2: Imports
import numpy as np
import os
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Cell 3: Configuration
# DATA_PATH set in Cell 1
ACTIONS = np.array([
    'Hello', 'Thank_You', 'Help', 'Yes', 'No',
    'Please', 'Sorry', 'I_Love_You', 'Stop', 'More'
])
NUM_SEQUENCES = 30
SEQUENCE_LENGTH = 30
label_map = {label: num for num, label in enumerate(ACTIONS)}

print(f'Actions ({len(ACTIONS)}): {list(ACTIONS)}')
print(f'Sequences per action: {NUM_SEQUENCES}')
print(f'Frames per sequence: {SEQUENCE_LENGTH}')
print(f'Expected total sequences: {len(ACTIONS) * NUM_SEQUENCES}')

In [None]:
# Cell 4: Load and Prepare Data (auto-discovers all sequences)
sequences = []
labels = []
skipped = 0

for action in ACTIONS:
    action_dir = os.path.join(DATA_PATH, action)
    if not os.path.isdir(action_dir):
        print(f'WARNING: {action_dir} not found!')
        continue

    # Auto-discover all sequence directories
    seq_dirs = sorted([d for d in os.listdir(action_dir)
                       if os.path.isdir(os.path.join(action_dir, d)) and d.isdigit()],
                      key=int)

    for seq_name in seq_dirs:
        window = []
        valid = True
        for frame_idx in range(SEQUENCE_LENGTH):
            frame_path = os.path.join(action_dir, seq_name, f'{frame_idx}.npy')
            if not os.path.isfile(frame_path):
                valid = False
                break
            try:
                frame = np.load(frame_path)
                if frame.shape != (1662,):
                    print(f'Warning: unexpected shape {frame.shape} in {frame_path}')
                    valid = False
                    break
                window.append(frame)
            except Exception as e:
                print(f'Error loading {frame_path}: {e}')
                valid = False
                break

        if valid and len(window) == SEQUENCE_LENGTH:
            sequences.append(window)
            labels.append(label_map[action])
        else:
            skipped += 1

X = np.array(sequences)
y = to_categorical(np.array(labels), num_classes=len(ACTIONS))

print(f'X shape: {X.shape}')
print(f'y shape: {y.shape}')
print(f'Skipped sequences: {skipped}')

# Per-class counts
y_int = np.argmax(y, axis=1)
for i, action in enumerate(ACTIONS):
    print(f'  {action}: {np.sum(y_int == i)} sequences')

In [None]:
# Cell 5: Train/Test Split (90/10 with stratification)
y_integers = np.argmax(y, axis=1)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, random_state=42, stratify=y_integers
)

print(f'Training samples: {X_train.shape[0]}')
print(f'Test samples:     {X_test.shape[0]}')
print(f'Train shape: {X_train.shape}')
print(f'Test shape:  {X_test.shape}')

In [None]:
# Cell 5b: Data Augmentation (boosts accuracy significantly with limited webcam data)
#
# With only 30 sequences per sign, the model doesn't see enough variation.
# Augmentation creates synthetic training data with:
#   - Gaussian noise (simulates tracking jitter)
#   - Temporal shifts (simulates timing variation)
#   - Speed variation (simulates faster/slower signing)
#   - Frame dropout (builds robustness)
#   - L/R hand mirroring (doubles effective data)
#
# Set AUGMENT_MULTIPLIER=0 to disable and train on raw data only.

AUGMENT_MULTIPLIER = 5   # Number of augmented copies per sample
USE_MIRROR = True        # Swap L/R hands for additional variation

# --- Augmentation functions (self-contained for Colab) ---

# Keypoint index ranges
LH_START, LH_END = 1536, 1599
RH_START, RH_END = 1599, 1662

def add_gaussian_noise(seq, std=0.005, rng=None):
    rng = rng or np.random.default_rng()
    aug = seq.copy()
    noise = rng.normal(0, std, size=aug.shape).astype(aug.dtype)
    mask = aug != 0
    aug[mask] += noise[mask]
    return aug

def temporal_shift(seq, max_shift=3, rng=None):
    rng = rng or np.random.default_rng()
    shift = rng.integers(-max_shift, max_shift + 1)
    if shift == 0: return seq.copy()
    aug = np.zeros_like(seq)
    sl = len(seq)
    if shift > 0:
        aug[:shift] = seq[0]; aug[shift:] = seq[:sl - shift]
    else:
        a = abs(shift); aug[:sl - a] = seq[a:]; aug[sl - a:] = seq[-1]
    return aug

def mirror_hands(seq):
    aug = seq.copy()
    lh, rh = aug[:, LH_START:LH_END].copy(), aug[:, RH_START:RH_END].copy()
    aug[:, LH_START:LH_END] = rh; aug[:, RH_START:RH_END] = lh
    return aug

def speed_variation(seq, factor_range=(0.85, 1.15), rng=None):
    rng = rng or np.random.default_rng()
    sl = len(seq); factor = rng.uniform(*factor_range)
    new_len = max(int(sl * factor), 2)
    orig_idx = np.linspace(0, sl - 1, new_len)
    tgt_idx = np.linspace(0, new_len - 1, sl)
    mapped = np.interp(tgt_idx, np.arange(new_len), orig_idx)
    aug = np.zeros_like(seq)
    for i in range(seq.shape[1]):
        aug[:, i] = np.interp(mapped, np.arange(sl), seq[:, i])
    return aug

def frame_dropout(seq, drop_rate=0.1, rng=None):
    rng = rng or np.random.default_rng()
    aug = seq.copy()
    for i in range(1, len(aug) - 1):
        if rng.random() < drop_rate: aug[i] = aug[i - 1]
    return aug

def augment_sequence(seq, rng=None):
    rng = rng or np.random.default_rng()
    aug = seq.copy()
    if rng.random() < 0.8: aug = add_gaussian_noise(aug, std=rng.uniform(0.002, 0.008), rng=rng)
    if rng.random() < 0.5: aug = temporal_shift(aug, max_shift=3, rng=rng)
    if rng.random() < 0.4: aug = speed_variation(aug, rng=rng)
    if rng.random() < 0.3: aug = frame_dropout(aug, drop_rate=0.1, rng=rng)
    return aug

# --- Apply augmentation ---
if AUGMENT_MULTIPLIER > 0:
    rng = np.random.default_rng(42)
    original_count = len(X_train)
    all_X, all_y = [X_train], [y_train]

    for i in range(AUGMENT_MULTIPLIER):
        batch = np.array([augment_sequence(s, rng=rng) for s in X_train])
        all_X.append(batch); all_y.append(y_train)

    if USE_MIRROR:
        combined_X = np.concatenate(all_X, axis=0)
        combined_y = np.concatenate(all_y, axis=0)
        mirrored = np.array([mirror_hands(s) for s in combined_X])
        all_X = [combined_X, mirrored]; all_y = [combined_y, combined_y]

    X_train = np.concatenate(all_X, axis=0)
    y_train = np.concatenate(all_y, axis=0)

    # Shuffle
    idx = rng.permutation(len(X_train))
    X_train, y_train = X_train[idx], y_train[idx]

    print(f'Augmented: {original_count} -> {len(X_train)} training samples ({len(X_train)/original_count:.1f}x)')
else:
    print('Augmentation disabled (AUGMENT_MULTIPLIER=0)')

In [None]:
# Cell 6: Build LSTM Model
# CRITICAL: Do not change this architecture
# - LSTM activation MUST be 'tanh' (not relu)
# - BatchNormalization after each LSTM layer
# - Dropout(0.2) for regularization

model = Sequential([
    LSTM(64, return_sequences=True, activation='tanh', input_shape=(30, 1662)),
    BatchNormalization(),
    Dropout(0.2),

    LSTM(128, return_sequences=True, activation='tanh'),
    BatchNormalization(),
    Dropout(0.2),

    LSTM(64, return_sequences=False, activation='tanh'),
    BatchNormalization(),
    Dropout(0.2),

    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(len(ACTIONS), activation='softmax')
])

model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy']
)
model.summary()

In [None]:
# Cell 7: Train
early_stop = EarlyStopping(
    monitor='val_categorical_accuracy',
    patience=30,
    restore_best_weights=True,
    verbose=1
)
checkpoint = ModelCheckpoint(
    'best_model.h5',
    monitor='val_categorical_accuracy',
    save_best_only=True,
    verbose=1
)
tb_callback = TensorBoard(log_dir='./logs')

history = model.fit(
    X_train, y_train,
    epochs=200,
    batch_size=16,
    validation_split=0.15,
    callbacks=[early_stop, checkpoint, tb_callback],
    verbose=1
)

print(f'\nBest validation accuracy: {max(history.history["val_categorical_accuracy"]):.4f}')

In [None]:
# Cell 8: Evaluate on TEST set
from tensorflow.keras.models import load_model

# Load the best model from checkpoint
best_model = load_model('best_model.h5')

# Evaluate on X_test (NOT X_train)
test_loss, test_acc = best_model.evaluate(X_test, y_test, verbose=0)
print(f'Test Loss:     {test_loss:.4f}')
print(f'Test Accuracy: {test_acc:.4f}')

# Classification report
y_pred = best_model.predict(X_test, verbose=0)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)

print('\nClassification Report:')
print(classification_report(
    y_true_classes,
    y_pred_classes,
    target_names=ACTIONS.tolist(),
    zero_division=0
))

# Confusion matrix heatmap
cm = confusion_matrix(y_true_classes, y_pred_classes)
plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=ACTIONS, yticklabels=ACTIONS)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.savefig('confusion_matrix.png', dpi=150)
plt.show()
print('Confusion matrix saved to confusion_matrix.png')

In [None]:
# Cell 9: Training History Plots
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Accuracy plot
axes[0].plot(history.history['categorical_accuracy'], label='Train Accuracy')
axes[0].plot(history.history['val_categorical_accuracy'], label='Val Accuracy')
axes[0].set_title('Model Accuracy')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Accuracy')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Loss plot
axes[1].plot(history.history['loss'], label='Train Loss')
axes[1].plot(history.history['val_loss'], label='Val Loss')
axes[1].set_title('Model Loss')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Loss')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('training_history.png', dpi=150)
plt.show()
print('Training history saved to training_history.png')

In [None]:
# Cell 10: Save + Download model
MODEL_PATH = os.path.join(MODELS_DIR, 'action_model.h5')

best_model.save(MODEL_PATH)
np.save('actions.npy', ACTIONS)

# Download directly to your PC
from google.colab import files
files.download(MODEL_PATH)
files.download('actions.npy')
files.download('confusion_matrix.png')
print('Downloads started â€” place action_model.h5 in ml/models/ locally')