# Sign2Text: Sign Language to Text Translation Training

This notebook walks through training a deep learning model that translates sign language videos to text by classifying each video clip into one of a fixed set of sign labels. We'll be using a hybrid CNN-LSTM architecture that can optionally incorporate hand pose features.

## Overview

1. **Setup**: Import dependencies and configure environment
2. **Data Preparation**: Load and explore the sign language dataset
3. **Dataset & DataLoader**: Create PyTorch datasets and dataloaders
4. **Model Architecture**: Build and explore the CNN-LSTM model
5. **Training Configuration**: Configure hyperparameters and optimization
6. **Training Loop**: Train the model with monitoring and evaluation
7. **Evaluation and Visualization**: Plot training curves and analyze model performance
8. **Inference and Export**: Test predictions on samples and save the model for inference

Let's get started!

## 1. Setup

First, let's import the necessary libraries and modules, and set up our environment.

In [None]:
import os
import sys
import time
import json
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
from tqdm.notebook import tqdm
from IPython.display import display, HTML

# Add the project root (two levels above the working directory) to the path for imports
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(os.getcwd())))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

# Import project modules
from ml.models.cnn_lstm import create_model, CNNLSTM, CNNLSTMWithHandPose

# Check for GPU availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Set random seeds for reproducibility
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)


### Setup Configuration Parameters

Here, we define the configuration parameters for our training process. These include paths to data, model parameters, and hyperparameters for training.

In [None]:
# Define data directories
DATA_DIR = "../../processed_data"
TRAIN_CSV = os.path.join(DATA_DIR, "train_set.csv")
VAL_CSV = os.path.join(DATA_DIR, "val_set.csv")
TEST_CSV = os.path.join(DATA_DIR, "test_set.csv")

# Create output directory for saving models and logs
OUTPUT_DIR = f"../../models/cnn_lstm_{time.strftime('%Y%m%d_%H%M%S')}"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Model and training configuration
config = {
    "batch_size": 16,              # Smaller batch size for interactive exploration
    "num_epochs": 30,              # Number of training epochs
    "learning_rate": 0.001,        # Initial learning rate
    "weight_decay": 1e-4,          # Weight decay (L2 regularization)
    "use_landmarks": True,         # Whether to use hand landmark features
    "early_stopping": 10,          # Number of epochs to wait before early stopping
    "max_seq_len": 60,             # Maximum sequence length
    "model_params": {
        "frame_dim": (3, 224, 224),  # Input frame dimensions (C, H, W)
        "lstm_hidden_dim": 512,      # LSTM hidden dimension
        "lstm_layers": 2,            # Number of LSTM layers
        "dropout": 0.5,              # Dropout probability
        "bidirectional": True,       # Whether to use bidirectional LSTM
        "attention": True            # Whether to use attention mechanism
    }
}

# Save configuration to file
with open(os.path.join(OUTPUT_DIR, "config.json"), "w") as f:
    json.dump(config, f, indent=4)

print(f"Configuration saved to {os.path.join(OUTPUT_DIR, 'config.json')}")
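
One detail to keep in mind when reloading the saved configuration: JSON has no tuple type, so `frame_dim` comes back as a list. The sketch below uses a reduced config and a temporary directory (both assumptions for illustration) to show the round trip and the conversion back to a tuple.

```python
import json
import os
import tempfile

# Reduced stand-in for the notebook's config dictionary
config = {"batch_size": 16, "model_params": {"frame_dim": (3, 224, 224), "dropout": 0.5}}

with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "config.json")
    with open(config_path, "w") as f:
        json.dump(config, f, indent=4)
    with open(config_path) as f:
        loaded = json.load(f)

# The tuple was serialized as a JSON array, so it is now a list
frame_dim_type_after_load = type(loaded["model_params"]["frame_dim"]).__name__
print(frame_dim_type_after_load)

# Restore the tuple before passing the parameters to code that expects one
loaded["model_params"]["frame_dim"] = tuple(loaded["model_params"]["frame_dim"])
print(loaded == config)
```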

## 2. Data Preparation

Now, let's explore our preprocessed data. We'll load the CSV files that contain information about our training, validation, and test sets.

In [None]:
def load_and_explore_data(csv_path):
    """Load and explore the dataset from a CSV file."""
    if not os.path.exists(csv_path):
        print(f"Warning: {csv_path} does not exist. Please preprocess data first.")
        return None
    
    # Load the dataset CSV
    data_df = pd.read_csv(csv_path)
    
    # Display basic information
    print(f"Dataset contains {len(data_df)} samples")
    print(f"Number of unique classes: {len(data_df['label'].unique())}")
    
    # Display first few rows
    display(data_df.head())
    
    # Display class distribution
    class_dist = data_df['label'].value_counts()
    plt.figure(figsize=(10, 6))
    class_dist.plot(kind='bar')
    plt.title('Class Distribution')
    plt.xlabel('Class')
    plt.ylabel('Count')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
    # Show an example feature path
    print(f"Sample feature path: {data_df['feature_path'].iloc[0]}")
    
    return data_df

# Explore training set
print("Exploring training set:")
train_df = load_and_explore_data(TRAIN_CSV)

# Explore validation set
print("\nExploring validation set:")
val_df = load_and_explore_data(VAL_CSV)
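
If the class distribution plot shows a strong imbalance, one common response is to weight the loss by inverse class frequency. The following is a sketch under that assumption; the labels are made up, and `inverse_frequency_weights` is a hypothetical helper rather than something the project provides.

```python
from collections import Counter


def inverse_frequency_weights(labels):
    """Map each class to n_samples / (n_classes * class_count)."""
    counts = Counter(labels)
    n_samples = len(labels)
    n_classes = len(counts)
    # Sorted by class name so the order matches a sorted class list
    return {cls: n_samples / (n_classes * count) for cls, count in sorted(counts.items())}


# Made-up labels standing in for train_df['label']
labels = ["hello"] * 6 + ["thanks"] * 3 + ["yes"] * 1
weights = inverse_frequency_weights(labels)
for cls, weight in weights.items():
    print(f"{cls}: {weight:.3f}")
```

The resulting values, ordered by sorted class name, could be turned into a tensor and passed as the `weight` argument of `nn.CrossEntropyLoss`.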

### Exploring a Sample

Let's examine an individual sample from our dataset to understand the data structure.

In [None]:
def explore_sample(data_dir, feature_path):
    """Explore a single sample from the dataset."""
    full_path = os.path.join(data_dir, feature_path)
    
    if not os.path.exists(full_path):
        print(f"Warning: {full_path} does not exist.")
        return
    
    # Load the sample
    data = np.load(full_path, allow_pickle=True)
    
    # Print available keys
    print(f"Available keys: {list(data.keys())}")
    
    # Explore frames
    frames = data['frames']
    print(f"Frames shape: {frames.shape}")
    print(f"Number of frames: {frames.shape[0]}")
    print(f"Frame dimensions: {frames.shape[1:]}")
    
    # Visualize a few frames
    fig, axes = plt.subplots(1, 4, figsize=(16, 4))
    frame_indices = [0, min(5, frames.shape[0]-1), min(10, frames.shape[0]-1), min(frames.shape[0]-1, 15)]
    
    for i, idx in enumerate(frame_indices):
        if idx < frames.shape[0]:
            axes[i].imshow(frames[idx])
            axes[i].set_title(f"Frame {idx}")
            axes[i].axis('off')
    
    plt.tight_layout()
    plt.show()
    
    # Explore hand landmarks if available
    if 'hand_landmarks' in data:
        hand_landmarks = data['hand_landmarks']
        print(f"\nHand landmarks shape: {len(hand_landmarks)} frames of landmarks")
        
        # Show example of landmark data structure
        if len(hand_landmarks) > 0:
            landmark_frame = hand_landmarks[0]
            print(f"Landmark keys for a frame: {list(landmark_frame.keys())}")
            
            # Check if landmarks are available for left and right hands
            if landmark_frame['left'] is not None:
                print(f"Left hand landmarks shape: {landmark_frame['left'].shape}")
            else:
                print("Left hand landmarks not detected in this frame")
                
            if landmark_frame['right'] is not None:
                print(f"Right hand landmarks shape: {landmark_frame['right'].shape}")
            else:
                print("Right hand landmarks not detected in this frame")

# Explore a sample from the training set
if train_df is not None and len(train_df) > 0:
    sample_feature_path = train_df['feature_path'].iloc[0]
    print(f"Exploring sample: {sample_feature_path}")
    explore_sample(DATA_DIR, sample_feature_path)
else:
    print("Cannot explore sample: training data not available")
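
To make the expected file layout concrete, here is a sketch that writes and reloads a tiny synthetic sample. The layout is inferred from the exploration code above: a `frames` array of shape (T, H, W, C) and one `hand_landmarks` dictionary per frame. The 21 x 3 landmark shape per hand is an assumption.

```python
import os
import tempfile

import numpy as np

num_frames, height, width = 4, 8, 8
frames = np.zeros((num_frames, height, width, 3), dtype=np.uint8)

# One dict per frame; None marks a hand that was not detected.
hand_landmarks = np.empty(num_frames, dtype=object)
for t in range(num_frames):
    hand_landmarks[t] = {
        "left": np.zeros((21, 3), dtype=np.float32),
        "right": None if t % 2 else np.zeros((21, 3), dtype=np.float32),
    }

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "sample.npz")
    np.savez(path, frames=frames, hand_landmarks=hand_landmarks)
    # Object arrays are pickled, which is why allow_pickle=True is required
    with np.load(path, allow_pickle=True) as data:
        keys = sorted(data.keys())
        loaded_frames = data["frames"]
        loaded_landmarks = data["hand_landmarks"]

print(keys, loaded_frames.shape, len(loaded_landmarks))
```

Because `allow_pickle=True` can execute arbitrary code during loading, it should only be used on files from a trusted source.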

## 3. Dataset & DataLoader

Now, let's define our custom PyTorch Dataset class for handling sign language video data.

In [None]:
class SignVideoDataset(Dataset):
    """Dataset for loading sign language video data."""

    def __init__(
        self,
        data_csv,
        root_dir,
        transform=None,
        use_landmarks=True,
        max_seq_len=60
    ):
        """
        Initialize the dataset.
        
        Args:
            data_csv: Path to the CSV file with annotations
            root_dir: Directory containing the data files
            transform: Optional transform to apply to frames
            use_landmarks: Whether to load landmark data
            max_seq_len: Maximum sequence length
        """
        self.data_df = pd.read_csv(data_csv)
        self.root_dir = root_dir
        self.transform = transform
        self.use_landmarks = use_landmarks
        self.max_seq_len = max_seq_len

        # Create label to index mapping
        self.classes = sorted(self.data_df['label'].unique())
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        
        print(f"Initialized dataset with {len(self.data_df)} samples and {len(self.classes)} classes")

    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.data_df)

    def __getitem__(self, idx):
        """
        Load and return a sample from the dataset.
        
        Args:
            idx: Index of the sample
            
        Returns:
            Dictionary containing data for the requested sample
        """
        sample_info = self.data_df.iloc[idx]

        # Load preprocessed data
        feature_path = os.path.join(self.root_dir, sample_info['feature_path'])
        data = np.load(feature_path, allow_pickle=True)

        # Get frames and pad/truncate if needed
        frames = data['frames']
        seq_len = min(frames.shape[0], self.max_seq_len)

        # Create padded array
        padded_frames = np.zeros((self.max_seq_len, *frames.shape[1:]), dtype=frames.dtype)
        padded_frames[:seq_len] = frames[:seq_len]

        # Convert to tensor and permute to (seq_len, channels, height, width)
        frames_tensor = torch.FloatTensor(padded_frames).permute(0, 3, 1, 2)

        # Apply transform if available
        if self.transform:
            # Apply transform to each frame
            transformed_frames = []
            for i in range(seq_len):
                transformed_frames.append(self.transform(frames_tensor[i]))

            # Pad with zeros
            for i in range(seq_len, self.max_seq_len):
                transformed_frames.append(torch.zeros_like(transformed_frames[0]))

            frames_tensor = torch.stack(transformed_frames)

        # Load landmarks if needed
        landmarks_tensor = None
        if self.use_landmarks and 'hand_landmarks' in data:
            hand_landmarks = data['hand_landmarks']

            # Process and pad landmarks
            landmarks_list = []
            for i in range(min(len(hand_landmarks), self.max_seq_len)):
                # Extract left and right hand landmarks
                left = hand_landmarks[i]['left']
                right = hand_landmarks[i]['right']

                # Flatten and concatenate
                if left is not None and right is not None:
                    # Both hands visible
                    combined = np.concatenate([left.flatten(), right.flatten()])
                elif left is not None:
                    # Only left hand visible
                    combined = np.concatenate([left.flatten(), np.zeros_like(left.flatten())])
                elif right is not None:
                    # Only right hand visible
                    combined = np.concatenate([np.zeros(63), right.flatten()])
                else:
                    # No hands visible
                    combined = np.zeros(126)

                landmarks_list.append(combined)

            # Pad if needed
            while len(landmarks_list) < self.max_seq_len:
                landmarks_list.append(np.zeros(126))

            landmarks_tensor = torch.FloatTensor(np.array(landmarks_list))

        # Get label
        label = self.class_to_idx[sample_info['label']]
        label_tensor = torch.LongTensor([label])[0]

        # Create output dictionary
        output = {
            'frames': frames_tensor,
            'label': label_tensor,
            'seq_len': torch.LongTensor([seq_len])[0]
        }

        if landmarks_tensor is not None:
            output['landmarks'] = landmarks_tensor

        return output
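
The two core steps of `__getitem__`, flattening both hands into a 126-dimensional vector and zero-padding to a fixed length, can be sketched in isolation with NumPy. The helper names are hypothetical, and the 21 x 3 landmark shape per hand is an assumption consistent with the `np.zeros(63)` used above.

```python
import numpy as np

LANDMARK_DIM = 21 * 3  # assumed: 21 landmarks with (x, y, z) per hand


def flatten_hands(left, right):
    """Concatenate left and right hand landmarks, zero-filling a missing hand."""
    left_vec = np.zeros(LANDMARK_DIM) if left is None else np.asarray(left).reshape(-1)
    right_vec = np.zeros(LANDMARK_DIM) if right is None else np.asarray(right).reshape(-1)
    return np.concatenate([left_vec, right_vec])


def pad_or_truncate(sequence, max_len):
    """Return a zero-padded (max_len, ...) copy of sequence and its valid length."""
    sequence = np.asarray(sequence)
    seq_len = min(len(sequence), max_len)
    padded = np.zeros((max_len, *sequence.shape[1:]), dtype=sequence.dtype)
    padded[:seq_len] = sequence[:seq_len]
    return padded, seq_len


per_frame = [
    flatten_hands(np.ones((21, 3)), None),   # only left hand visible
    flatten_hands(None, np.ones((21, 3))),   # only right hand visible
    flatten_hands(None, None),               # no hands visible
]
padded, seq_len = pad_or_truncate(per_frame, max_len=5)
print(padded.shape, seq_len)
```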

### Create Datasets and DataLoaders

Now, let's create our datasets and dataloaders for training and validation.

In [None]:
# Create datasets
print("Creating training dataset...")
train_dataset = SignVideoDataset(
    TRAIN_CSV,
    DATA_DIR,
    use_landmarks=config["use_landmarks"],
    max_seq_len=config["max_seq_len"]
)

print("\nCreating validation dataset...")
val_dataset = SignVideoDataset(
    VAL_CSV,
    DATA_DIR,
    use_landmarks=config["use_landmarks"],
    max_seq_len=config["max_seq_len"]
)

# Create dataloaders
train_loader = DataLoader(
    train_dataset,
    batch_size=config["batch_size"],
    shuffle=True,
    num_workers=2  # Adjust based on your system
)

val_loader = DataLoader(
    val_dataset,
    batch_size=config["batch_size"],
    shuffle=False,
    num_workers=2  # Adjust based on your system
)
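
Because `SignVideoDataset` builds its label-to-index mapping from its own CSV, the training and validation datasets only agree on indices when both CSVs contain the same set of labels. A small check along the following lines can catch a mismatch early; `check_class_alignment` is a hypothetical helper and the class lists are made up.

```python
def check_class_alignment(train_classes, other_classes, split_name="val"):
    """Return True if both splits map labels to the same indices."""
    train_classes, other_classes = list(train_classes), list(other_classes)
    unseen = sorted(set(other_classes) - set(train_classes))
    missing = sorted(set(train_classes) - set(other_classes))
    if unseen:
        print(f"{split_name} has labels never seen in training: {unseen}")
    if missing:
        print(f"{split_name} is missing training labels: {missing}")
    return train_classes == other_classes


train_classes = ["hello", "thanks", "yes"]
val_classes = ["hello", "yes"]  # "thanks" absent, so "yes" would get index 1 instead of 2

aligned = check_class_alignment(train_classes, val_classes)
print(f"Label indices aligned: {aligned}")
```

In this notebook the call would be `check_class_alignment(train_dataset.classes, val_dataset.classes)`.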

### Examine a Batch of Data

Let's examine a batch of data to ensure everything is set up correctly.

In [None]:
def examine_batch(dataloader):
    """Examine a batch from a dataloader."""
    # Get a batch of data
    batch = next(iter(dataloader))
    
    # Print batch shapes
    print("Batch contents:")
    for key, value in batch.items():
        if isinstance(value, torch.Tensor):
            print(f"  {key}: shape={value.shape}, dtype={value.dtype}")
        else:
            print(f"  {key}: {type(value)}")
    
    # Visualize a few frames from the first sample
    frames = batch['frames'][0]  # First sample in batch
    seq_len = batch['seq_len'][0].item()
    
    # Display 4 frames from the sequence
    fig, axes = plt.subplots(1, 4, figsize=(16, 4))
    frame_indices = [0, min(seq_len//3, seq_len-1), min(2*seq_len//3, seq_len-1), seq_len-1]
    
    for i, idx in enumerate(frame_indices):
        # Convert from (C, H, W) to (H, W, C) and normalize for display
        frame = frames[idx].permute(1, 2, 0).numpy()
        # Normalize to [0, 1] range if needed
        if frame.max() > 1.0:
            frame = frame / 255.0
        axes[i].imshow(frame)
        axes[i].set_title(f"Frame {idx}")
        axes[i].axis('off')
    
    plt.tight_layout()
    plt.show()
    
    # If landmarks are available, visualize them too
    if 'landmarks' in batch:
        landmarks = batch['landmarks'][0]  # First sample in batch
        
        print(f"\nLandmarks shape: {landmarks.shape}")
        
        # Plot the sum of landmark positions over time as a simple visualization
        landmark_activity = torch.sum(torch.abs(landmarks), dim=1).numpy()
        
        plt.figure(figsize=(10, 4))
        plt.plot(landmark_activity)
        plt.title('Hand Landmark Activity Over Time')
        plt.xlabel('Frame')
        plt.ylabel('Total Landmark Activity')
        plt.grid(True)
        plt.show()

# Examine a batch from the training dataloader
print("Examining a batch from the training dataloader:")
examine_batch(train_loader)

## 4. Model Architecture

Now let's look at our model architecture. We'll be using a CNN-LSTM model, optionally with hand pose features.

In [None]:
# Determine the model type based on config
model_name = "cnn_lstm_handpose" if config["use_landmarks"] else "cnn_lstm"

# Create the model
num_classes = len(train_dataset.classes)
model = create_model(
    model_name,
    num_classes=num_classes,
    **config["model_params"]
)

# Print model architecture
print(f"Model type: {model_name}")
print(f"Number of classes: {num_classes}")
print(f"\nModel architecture:\n{model}")

# Print number of parameters
num_params = sum(p.numel() for p in model.parameters())
num_trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"\nTotal parameters: {num_params:,}")
print(f"Trainable parameters: {num_trainable_params:,}")
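
The project's `CNNLSTM` implementation is not shown in this notebook. To illustrate the general idea of a CNN encoding each frame and an LSTM modelling the sequence, here is a deliberately tiny sketch. `TinyCNNLSTM` and all of its layer sizes are assumptions for illustration and are not the model created by `create_model`.

```python
import torch
import torch.nn as nn


class TinyCNNLSTM(nn.Module):
    """Illustrative CNN-LSTM classifier; not the project's CNNLSTM class."""

    def __init__(self, num_classes, feature_dim=32, lstm_hidden_dim=64):
        super().__init__()
        # Per-frame feature extractor: (N, 3, H, W) -> (N, feature_dim)
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(16, feature_dim),
        )
        self.lstm = nn.LSTM(feature_dim, lstm_hidden_dim, batch_first=True, bidirectional=True)
        self.classifier = nn.Linear(2 * lstm_hidden_dim, num_classes)

    def forward(self, frames):
        batch, seq_len, c, h, w = frames.shape
        # Fold time into the batch dimension so the CNN sees individual frames
        feats = self.cnn(frames.reshape(batch * seq_len, c, h, w))
        feats = feats.reshape(batch, seq_len, -1)
        outputs, _ = self.lstm(feats)
        # Average over time (this simple version also averages padded frames)
        return self.classifier(outputs.mean(dim=1))


model_sketch = TinyCNNLSTM(num_classes=5)
dummy_frames = torch.randn(2, 6, 3, 32, 32)  # (batch, seq_len, C, H, W)
with torch.no_grad():
    logits = model_sketch(dummy_frames)
print(logits.shape)
```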

### Forward Pass Test

Let's test the forward pass through our model with a sample batch to ensure everything works correctly.

In [None]:
# Get a batch of data
batch = next(iter(train_loader))

# Move batch to device
frames = batch['frames'].to(device)
labels = batch['label'].to(device)

# Print input shapes
print(f"Input frames shape: {frames.shape}")
print(f"Input labels shape: {labels.shape}")

# Move model to device
model = model.to(device)

# Forward pass
with torch.no_grad():
    try:
        if config["use_landmarks"]:
            landmarks = batch['landmarks'].to(device)
            print(f"Input landmarks shape: {landmarks.shape}")
            outputs = model(frames, landmarks)
        else:
            outputs = model(frames)
        
        # Print output shapes
        if isinstance(outputs, dict):
            for key, value in outputs.items():
                print(f"Output {key} shape: {value.shape}")
        else:
            print(f"Output shape: {outputs.shape}")
        
        print("\n✓ Forward pass successful!")
    except Exception as e:
        print(f"Error during forward pass: {str(e)}")

## 5. Training Configuration

Now, let's set up our training configuration, including the loss function, optimizer, and learning rate scheduler.

In [None]:
# Define loss function
criterion = nn.CrossEntropyLoss()
print(f"Loss function: {criterion}")

# Define optimizer
optimizer = optim.Adam(
    model.parameters(),
    lr=config["learning_rate"],
    weight_decay=config["weight_decay"]
)
print(f"Optimizer: {optimizer}")

# Define learning rate scheduler
# Note: `verbose=True` is omitted because the argument is deprecated in recent PyTorch releases
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5
)
print(f"LR Scheduler: {scheduler}")

# Define TensorBoard writer
writer = SummaryWriter(log_dir=os.path.join(OUTPUT_DIR, 'logs'))
print(f"TensorBoard logs will be saved to {os.path.join(OUTPUT_DIR, 'logs')}")
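
To build intuition for `factor=0.5` and `patience=5`, the rule can be simulated in plain Python. This is a simplified re-implementation for illustration only: it assumes the default relative threshold of `1e-4` and ignores cooldown and the minimum learning rate.

```python
def simulate_reduce_on_plateau(val_losses, lr=0.001, factor=0.5, patience=5, threshold=1e-4):
    """Return the learning rate in effect after each epoch's validation loss."""
    best = float("inf")
    bad_epochs = 0
    lrs = []
    for loss in val_losses:
        if loss < best * (1 - threshold):
            # Meaningful improvement: remember it and reset the counter
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        if bad_epochs > patience:
            # More than `patience` epochs without improvement: shrink the rate
            lr *= factor
            bad_epochs = 0
        lrs.append(lr)
    return lrs


# A loss that never improves after the first epoch
lrs = simulate_reduce_on_plateau([1.0] * 8)
print(lrs)
```

With these settings the rate is halved only on the sixth consecutive epoch without improvement, so a run of 30 epochs allows at most a handful of reductions.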

## 6. Training Loop

Now, let's define our training and validation functions, and implement the main training loop.

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train the model for one epoch."""
    model.train()
    epoch_loss = 0.0
    correct = 0
    total = 0

    with tqdm(train_loader, desc="Training") as pbar:
        for batch in pbar:
            # Move batch to device
            frames = batch['frames'].to(device)
            labels = batch['label'].to(device)

            # Forward pass
            optimizer.zero_grad()

            # Handle different model input types
            if 'landmarks' in batch and hasattr(model, 'forward') and 'landmarks' in model.forward.__code__.co_varnames:
                landmarks = batch['landmarks'].to(device)
                outputs = model(frames, landmarks)
            else:
                outputs = model(frames)

            # Get logits
            if isinstance(outputs, dict):
                logits = outputs['logits']
            else:
                logits = outputs

            # Calculate loss
            loss = criterion(logits, labels)

            # Backward pass
            loss.backward()
            optimizer.step()

            # Update metrics
            epoch_loss += loss.item()
            _, predicted = torch.max(logits.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # Update progress bar
            pbar.set_postfix({
                'loss': f"{loss.item():.4f}",
                'acc': f"{100 * correct / total:.2f}%"
            })

    # Calculate epoch metrics
    avg_loss = epoch_loss / len(train_loader)
    accuracy = correct / total

    return avg_loss, accuracy


def validate(model, val_loader, criterion, device):
    """Validate the model on the validation set."""
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        with tqdm(val_loader, desc="Validation") as pbar:
            for batch in pbar:
                # Move batch to device
                frames = batch['frames'].to(device)
                labels = batch['label'].to(device)

                # Forward pass
                if 'landmarks' in batch and hasattr(model, 'forward') and 'landmarks' in model.forward.__code__.co_varnames:
                    landmarks = batch['landmarks'].to(device)
                    outputs = model(frames, landmarks)
                else:
                    outputs = model(frames)

                # Get logits
                if isinstance(outputs, dict):
                    logits = outputs['logits']
                else:
                    logits = outputs

                # Calculate loss
                loss = criterion(logits, labels)

                # Update metrics
                val_loss += loss.item()
                _, predicted = torch.max(logits.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                
                # Store predictions and labels for confusion matrix
                all_preds.extend(predicted.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

                # Update progress bar
                pbar.set_postfix({
                    'loss': f"{loss.item():.4f}",
                    'acc': f"{100 * correct / total:.2f}%"
                })

    # Calculate epoch metrics
    avg_loss = val_loss / len(val_loader)
    accuracy = correct / total

    return avg_loss, accuracy, all_preds, all_labels


def save_checkpoint(model, optimizer, scheduler, epoch, val_loss, val_acc, output_dir, is_best=False):
    """Save a checkpoint of the model."""
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'val_loss': val_loss,
        'val_acc': val_acc
    }
    
    if scheduler is not None:
        checkpoint['scheduler_state_dict'] = scheduler.state_dict()
    
    # Save latest checkpoint
    checkpoint_path = os.path.join(output_dir, 'checkpoint_latest.pth')
    torch.save(checkpoint, checkpoint_path)
    
    # Save best model
    if is_best:
        best_path = os.path.join(output_dir, 'model_best.pth')
        torch.save(checkpoint, best_path)
        
        # Save model architecture as text
        with open(os.path.join(output_dir, 'model_architecture.txt'), 'w') as f:
            f.write(str(model))
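
The training and validation functions decide whether to pass landmarks by looking at `model.forward.__code__.co_varnames`, which lists local variables as well as parameters. A sketch of a stricter check based on `inspect.signature` follows; `accepts_landmarks` and the two stand-in classes are hypothetical.

```python
import inspect


def accepts_landmarks(model):
    """Return True if model.forward declares a parameter named 'landmarks'."""
    return "landmarks" in inspect.signature(model.forward).parameters


class FramesOnly:
    def forward(self, frames):
        landmarks = None  # a local variable, not a parameter
        return frames


class FramesAndLandmarks:
    def forward(self, frames, landmarks):
        return frames


frames_only = FramesOnly()
print(accepts_landmarks(frames_only))                            # False
print("landmarks" in frames_only.forward.__code__.co_varnames)   # True (false positive)
print(accepts_landmarks(FramesAndLandmarks()))                   # True
```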

### Main Training Loop

Now let's implement the main training loop that will train our model for the specified number of epochs.

In [None]:
# Training parameters
num_epochs = config['num_epochs']
early_stopping = config['early_stopping']

# Initialize tracking variables
best_val_loss = float('inf')
best_val_acc = 0.0
epochs_without_improvement = 0
history = {
    'train_loss': [],
    'train_acc': [],
    'val_loss': [],
    'val_acc': []
}

# Move model to device
model = model.to(device)

print(f"Starting training for {num_epochs} epochs")
print(f"Early stopping patience: {early_stopping} epochs")
print(f"Outputs will be saved to: {OUTPUT_DIR}")

# Training loop
for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")
    
    # Train for one epoch
    start_time = time.time()
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    
    # Validate
    val_loss, val_acc, val_preds, val_labels = validate(model, val_loader, criterion, device)
    
    # Update learning rate if scheduler is available
    if scheduler is not None:
        if isinstance(scheduler, optim.lr_scheduler.ReduceLROnPlateau):
            scheduler.step(val_loss)
        else:
            scheduler.step()
    
    # Log metrics
    writer.add_scalar('Loss/train', train_loss, epoch)
    writer.add_scalar('Loss/val', val_loss, epoch)
    writer.add_scalar('Accuracy/train', train_acc, epoch)
    writer.add_scalar('Accuracy/val', val_acc, epoch)
    
    # Update history
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)
    
    # Log epoch summary
    epoch_time = time.time() - start_time
    print(f"Epoch {epoch+1}/{num_epochs} completed in {epoch_time:.2f}s")
    print(f"Train Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f}, Accuracy: {val_acc:.4f}")
    
    # Check if this is the best model
    is_best = False
    if val_acc > best_val_acc:
        print(f"Validation accuracy improved from {best_val_acc:.4f} to {val_acc:.4f}")
        best_val_acc = val_acc
        best_val_loss = val_loss
        epochs_without_improvement = 0
        is_best = True
    else:
        epochs_without_improvement += 1
        print(f"No improvement for {epochs_without_improvement} epochs")
    
    # Save checkpoint
    save_checkpoint(
        model, optimizer, scheduler, epoch,
        val_loss, val_acc, OUTPUT_DIR, is_best
    )
    
    # Early stopping
    if early_stopping > 0 and epochs_without_improvement >= early_stopping:
        print(f"Early stopping after {epoch+1} epochs")
        break

# Close tensorboard writer
writer.close()

# Log training summary
print(f"\nTraining completed. Best validation accuracy: {best_val_acc:.4f}")

# Save training history
history_path = os.path.join(OUTPUT_DIR, 'training_history.json')
with open(history_path, 'w') as f:
    json.dump({k: [float(v) for v in vals] for k, vals in history.items()}, f, indent=4)
    
print(f"Training history saved to {history_path}")
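
The early stopping bookkeeping in the loop above can also be packaged into a small object, which keeps the loop body shorter. The class below is a sketch with a hypothetical name and interface; unlike the loop, it treats the first recorded value as an improvement regardless of its size.

```python
class EarlyStopping:
    """Track a validation metric and signal when it has stopped improving."""

    def __init__(self, patience, mode="max"):
        if mode not in ("max", "min"):
            raise ValueError("mode must be 'max' or 'min'")
        self.patience = patience
        self.mode = mode
        self.best = None
        self.epochs_without_improvement = 0

    def step(self, value):
        """Record one epoch; return True if training should stop."""
        improved = (
            self.best is None
            or (self.mode == "max" and value > self.best)
            or (self.mode == "min" and value < self.best)
        )
        if improved:
            self.best = value
            self.epochs_without_improvement = 0
        else:
            self.epochs_without_improvement += 1
        # patience <= 0 disables early stopping, as in the loop above
        return self.patience > 0 and self.epochs_without_improvement >= self.patience


stopper = EarlyStopping(patience=3, mode="max")
val_accuracies = [0.50, 0.62, 0.61, 0.62, 0.60, 0.70]  # made-up values
stopped_at = None
for epoch, acc in enumerate(val_accuracies):
    if stopper.step(acc):
        stopped_at = epoch
        break
print(stopped_at, stopper.best)
```

Note that the run stops before reaching the final 0.70, which is the usual trade-off of a short patience.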

## 7. Evaluation and Visualization

Now, let's visualize our training results and evaluate the model's performance.

In [None]:
def plot_training_history(history):
    """Plot the training history."""
    # Create a figure with two subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # Plot training and validation loss
    ax1.plot(history['train_loss'], label='Train Loss')
    ax1.plot(history['val_loss'], label='Validation Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.set_title('Training and Validation Loss')
    ax1.legend()
    ax1.grid(True)
    
    # Plot training and validation accuracy
    ax2.plot(history['train_acc'], label='Train Accuracy')
    ax2.plot(history['val_acc'], label='Validation Accuracy')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy')
    ax2.set_title('Training and Validation Accuracy')
    ax2.legend()
    ax2.grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # Also save the figure to file
    fig.savefig(os.path.join(OUTPUT_DIR, 'training_history.png'), dpi=300, bbox_inches='tight')
    print(f"Training history plot saved to {os.path.join(OUTPUT_DIR, 'training_history.png')}")

# Plot the training history
plot_training_history(history)

### Confusion Matrix

Let's evaluate the model on the validation set and create a confusion matrix to visualize performance across different classes.

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

# Evaluate the model on the validation set
print("Evaluating model on validation set...")
val_loss, val_acc, val_preds, val_labels = validate(model, val_loader, criterion, device)
print(f"Validation Loss: {val_loss:.4f}, Accuracy: {val_acc:.4f}\n")

# Get class names
class_names = val_dataset.classes

# If there are too many classes, sample a subset for visualization
max_classes_to_show = 20
val_preds = np.asarray(val_preds)
val_labels = np.asarray(val_labels)
if len(class_names) > max_classes_to_show:
    unique_labels = np.unique(val_labels)
    if len(unique_labels) > max_classes_to_show:
        # Sort the sample so tick labels line up with the matrix rows and columns
        class_indices = np.sort(np.random.choice(unique_labels, max_classes_to_show, replace=False))
    else:
        class_indices = unique_labels
else:
    class_indices = np.arange(len(class_names))

# Get class names for the selected indices
class_names_subset = [class_names[i] for i in class_indices]

# Create confusion matrix over exactly the selected classes.
# Passing labels= keeps the matrix aligned with class_names_subset even when
# a class is missing from the validation set or a prediction falls outside the subset.
cm = confusion_matrix(val_labels, val_preds, labels=class_indices)

# Normalize each row, guarding against classes with no samples
row_sums = cm.sum(axis=1, keepdims=True)
cm_norm = cm.astype('float') / np.maximum(row_sums, 1)

# Plot confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(cm_norm, annot=True, fmt='.2f', cmap='Blues', 
            xticklabels=class_names_subset, yticklabels=class_names_subset)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix (Normalized)')
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, 'confusion_matrix.png'), dpi=300, bbox_inches='tight')
plt.show()

# Print classification report
print("Classification Report:")
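# labels= keeps target_names aligned even if some classes are absent from the validation set
print(classification_report(
    val_labels, val_preds,
    labels=np.arange(len(class_names)),
    target_names=class_names,
    digits=3,
    zero_division=0
))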
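
The diagonal of a row-normalized confusion matrix is the per-class recall. The plain-Python sketch below shows how the counts and the normalization relate; the labels are made up, and a class with no true samples is reported as NaN rather than causing a division by zero.

```python
def confusion_counts(true_labels, pred_labels, num_classes):
    """Build a num_classes x num_classes matrix: rows are true, columns predicted."""
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for t, p in zip(true_labels, pred_labels):
        matrix[t][p] += 1
    return matrix


def per_class_recall(matrix):
    """Return the diagonal divided by each row total (NaN for empty rows)."""
    recalls = []
    for row_idx, row in enumerate(matrix):
        total = sum(row)
        recalls.append(row[row_idx] / total if total else float("nan"))
    return recalls


true_labels = [0, 0, 1, 1, 1, 2]
pred_labels = [0, 1, 1, 1, 0, 2]
matrix = confusion_counts(true_labels, pred_labels, num_classes=4)  # class 3 never occurs
recalls = per_class_recall(matrix)
for cls, recall in enumerate(recalls):
    print(f"class {cls}: recall={recall:.3f}")
```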

## 8. Inference and Export

Finally, let's test the model on a sample and export it for deployment.

In [None]:
def predict_sample(model, dataset, sample_idx, device):
    """Make a prediction for a single sample."""
    # Set model to evaluation mode
    model.eval()
    
    # Get the sample
    sample = dataset[sample_idx]
    frames = sample['frames'].unsqueeze(0).to(device)  # Add batch dimension
    true_label = sample['label'].item()
    true_class = dataset.classes[true_label]
    
    # Forward pass
    with torch.no_grad():
        if 'landmarks' in sample and hasattr(model, 'forward') and 'landmarks' in model.forward.__code__.co_varnames:
            landmarks = sample['landmarks'].unsqueeze(0).to(device)
            outputs = model(frames, landmarks)
        else:
            outputs = model(frames)
        
        # Get logits
        if isinstance(outputs, dict):
            logits = outputs['logits']
            attention = outputs.get('attention_weights', None)
        else:
            logits = outputs
            attention = None
    
    # Get predicted class
    _, predicted_idx = torch.max(logits, 1)
    predicted_label = predicted_idx.item()
    predicted_class = dataset.classes[predicted_label]
    
    # Get probabilities
    probs = torch.nn.functional.softmax(logits, dim=1)[0]
    
    # Print results
    print(f"True class: {true_class} (idx: {true_label})")
    print(f"Predicted class: {predicted_class} (idx: {predicted_label})")
    print(f"Confidence: {probs[predicted_label]:.4f}")
    
    # Visualize frames with attention if available
    frames_np = frames[0].cpu().numpy()
    seq_len = sample['seq_len'].item()
    
    # Select frames to visualize
    num_frames = min(8, seq_len)
    frame_indices = np.linspace(0, seq_len-1, num_frames, dtype=int)
    
    # Create figure
    fig, axes = plt.subplots(1, num_frames, figsize=(16, 4))
    
    for i, idx in enumerate(frame_indices):
        # Display frame
        frame = frames_np[idx].transpose(1, 2, 0)  # C,H,W -> H,W,C
        # Normalize if needed
        if frame.max() > 1.0:
            frame = frame / 255.0
        axes[i].imshow(frame)
        
        # Add attention weight as title if available
        if attention is not None:
            att_weight = attention[0, idx].item()
            axes[i].set_title(f"Att: {att_weight:.3f}")
        else:
            axes[i].set_title(f"Frame {idx}")
        
        axes[i].axis('off')
    
    plt.suptitle(f"True: {true_class}, Pred: {predicted_class} ({probs[predicted_label]:.4f})")
    plt.tight_layout()
    plt.show()
    
    return {
        'true_label': true_label,
        'true_class': true_class,
        'predicted_label': predicted_label,
        'predicted_class': predicted_class,
        'confidence': probs[predicted_label].item(),
        'probabilities': probs.cpu().numpy()
    }

# Test the model on a few random samples from the validation set
print("Testing model on random samples from validation set:")
num_samples = 3
for i in range(num_samples):
    sample_idx = np.random.randint(0, len(val_dataset))
    print(f"\nSample {i+1}/{num_samples} (index {sample_idx}):")
    predict_sample(model, val_dataset, sample_idx, device)
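
`predict_sample` reports only the most likely class. For visually similar signs it can be informative to look at the top few candidates. The sketch below ranks classes by softmax probability in plain Python; the logits and class names are made up.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    shift = max(logits)
    exps = [math.exp(x - shift) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def top_k(logits, class_names, k=3):
    """Return the k most probable (class_name, probability) pairs."""
    probs = softmax(logits)
    ranked = sorted(zip(class_names, probs), key=lambda pair: pair[1], reverse=True)
    return ranked[:k]


class_names = ["hello", "no", "thanks", "yes"]
logits = [2.0, -1.0, 0.5, 1.5]
for name, prob in top_k(logits, class_names, k=3):
    print(f"{name}: {prob:.3f}")
```

With the dictionary returned by `predict_sample`, the same ranking could be obtained by sorting its `probabilities` array.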

### Export the Model

Finally, let's export the model for deployment.

In [None]:
# Save the trained model
model_path = os.path.join(OUTPUT_DIR, 'model_final.pth')
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'class_names': train_dataset.classes,  # label mapping the model was built with
    'config': config
}, model_path)
print(f"Model saved to {model_path}")

# Export model in TorchScript format for deployment
try:
    # Prepare a sample input
    sample = next(iter(val_loader))
    sample_frames = sample['frames'].to(device)
    
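    # Trace the model (torch.jit.trace records the operations run on this example input)
    model.eval()
    with torch.no_grad():
        # strict=False lets tracing proceed if the model returns a dict of outputs
        if config["use_landmarks"]:
            sample_landmarks = sample['landmarks'].to(device)
            traced_model = torch.jit.trace(model, (sample_frames, sample_landmarks), strict=False)
        else:
            traced_model = torch.jit.trace(model, sample_frames, strict=False)
    
    # Save the traced model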
    script_path = os.path.join(OUTPUT_DIR, 'model_scripted.pt')
    traced_model.save(script_path)
    print(f"TorchScript model saved to {script_path}")
except Exception as e:
    print(f"Error exporting TorchScript model: {str(e)}")
    print("You may need to modify the model to make it exportable")
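
Once exported, a TorchScript file can be loaded without the Python class that defined the model. The sketch below demonstrates the save and load round trip with a tiny stand-in network, since the project model is not available here; the stand-in and its input shape are assumptions.

```python
import os
import tempfile

import torch
import torch.nn as nn

# Tiny stand-in model so the example runs on its own
stand_in = nn.Sequential(nn.Flatten(), nn.Linear(12, 3)).eval()
example_input = torch.randn(2, 3, 2, 2)

with torch.no_grad():
    traced = torch.jit.trace(stand_in, example_input)

with tempfile.TemporaryDirectory() as tmp_dir:
    script_path = os.path.join(tmp_dir, "model_scripted.pt")
    traced.save(script_path)
    # Loading needs only torch, not the code that defined the model
    loaded = torch.jit.load(script_path, map_location="cpu")

loaded.eval()
with torch.no_grad():
    original_out = stand_in(example_input)
    loaded_out = loaded(example_input)

print(torch.allclose(original_out, loaded_out))
```

A traced model replays the operations recorded for the example input, so data-dependent control flow inside the model is not captured.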

## Conclusion

In this notebook, we've walked through the complete process of training a sign language recognition model using a CNN-LSTM architecture with optional hand pose features. Here's what we covered:

1. **Setup and Data Preparation**: We configured our environment, loaded and explored the preprocessed sign language dataset.

2. **Dataset & DataLoader**: We created a custom PyTorch dataset for handling sign language video data with optional hand landmarks.

3. **Model Architecture**: We explored the CNN-LSTM model architecture, which combines CNN for spatial feature extraction and LSTM for temporal sequence processing.

4. **Training**: We trained the model, monitoring its performance with TensorBoard and implementing early stopping to prevent overfitting.

5. **Evaluation**: We evaluated the model's performance using accuracy metrics and confusion matrices to visualize class-wise performance.

6. **Export**: Finally, we exported the model in both PyTorch and TorchScript formats for deployment.

### Next Steps

To further improve the model, you could:

1. **Data Augmentation**: Implement techniques like random cropping, rotation, or temporal augmentation to increase the effective size of the training dataset.

2. **Architecture Improvements**: Experiment with more sophisticated CNN backbones (ResNet, EfficientNet) or sequence models (Transformer).

3. **Hyperparameter Tuning**: Use techniques like grid search or Bayesian optimization to find the best hyperparameters.

4. **Transfer Learning**: Leverage pre-trained models on large-scale video datasets to improve feature extraction.

5. **Deployment**: Deploy the model in a real-time application using the exported TorchScript model.

Thank you for following along with this Sign2Text training notebook!