# 🧠 Temporal LSTM Proctoring Model Training

Train an LSTM model for temporal behavior analysis - detecting cheating patterns over sequences of frames.

**Based on**: AutoOEP/Temporal/temporal_trainer.py

## Why Temporal Analysis?

Static models analyze individual frames, but cheating behavior often involves patterns:
- Frequent looking away then back
- Gradual head movements toward notes
- Suspicious hand movements over time

The LSTM learns these temporal patterns from sequences of features.

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import json
from datetime import datetime
from collections import deque

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report
)

import matplotlib.pyplot as plt
import seaborn as sns

import warnings
# Suppress every Python warning for the rest of the session to keep the
# notebook output readable. NOTE: this also hides library warnings that may
# point at real problems (e.g. undefined metrics); disable when debugging.
warnings.filterwarnings('ignore')

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
print("✅ Libraries imported")

## 1. Configuration

In [None]:
# Project directories, resolved relative to the notebook's working directory.
BASE_PATH = os.path.dirname(os.getcwd())  # parent of the cwd (expected: ml/)
DATA_PATH = os.path.join(BASE_PATH, 'data', 'proctoring')
MODEL_PATH = os.path.join(BASE_PATH, 'models', 'proctoring')

# The model output directory is created eagerly so later saves cannot fail on it.
os.makedirs(MODEL_PATH, exist_ok=True)

# Hyper-parameters, grouped by stage (data -> model -> training -> inference).
CONFIG = dict(
    # Data
    window_size=15,        # frames per sequence
    overlap=5,             # frames shared by consecutive sequences
    test_size=0.2,

    # Model
    model_type='lstm',     # 'lstm' or 'gru'
    hidden_size=128,
    num_layers=2,
    dropout=0.3,

    # Training
    epochs=80,
    batch_size=32,
    learning_rate=0.001,
    early_stopping_patience=15,

    # Inference
    threshold=0.4,
    random_state=42,
)

# Feature columns (must match static model)
FEATURE_COLUMNS = [
    'verification_result',
    'num_faces',
    'iris_pos',
    'iris_ratio',
    'mouth_zone',
    'mouth_area',
    'x_rotation',
    'y_rotation',
    'z_rotation',
    'radial_distance',
    'gaze_direction',
    'gaze_zone',
    'watch',
    'headphone',
    'closedbook',
    'earpiece',
    'cell phone',
    'openbook',
    'chits',
    'sheet',
    'H-Distance',
    'F-Distance',
]

print(f"Window size: {CONFIG['window_size']} frames")
print(f"Features: {len(FEATURE_COLUMNS)}")

## 2. Model Definition

In [None]:
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a one-unit sigmoid head for sequence scoring.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of every LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability used between LSTM layers (only when more
            than one layer is stacked) and before the linear head.
    """

    def __init__(self, input_size, hidden_size=128, num_layers=2, dropout=0.3):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Inter-layer dropout is only requested for a multi-layer stack.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Score a batch shaped (batch, seq_len, features); returns shape (batch,)."""
        step_outputs, _ = self.lstm(x)
        # The top layer's output at the final time step summarizes the sequence.
        final_step = step_outputs[:, -1, :]
        score = self.fc(self.dropout(final_step))
        return self.sigmoid(score).squeeze(-1)


class GRUModel(nn.Module):
    """Stacked GRU followed by a one-unit sigmoid head (alternative to the LSTM).

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of every GRU layer.
        num_layers: Number of stacked GRU layers.
        dropout: Dropout probability used between GRU layers (only when more
            than one layer is stacked) and before the linear head.
    """

    def __init__(self, input_size, hidden_size=128, num_layers=2, dropout=0.3):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Inter-layer dropout is only requested for a multi-layer stack.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.gru = nn.GRU(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Score a batch shaped (batch, seq_len, features); returns shape (batch,)."""
        step_outputs, _ = self.gru(x)
        # The top layer's output at the final time step summarizes the sequence.
        final_step = step_outputs[:, -1, :]
        score = self.fc(self.dropout(final_step))
        return self.sigmoid(score).squeeze(-1)


# Cell-completion marker (emoji restored from mis-encoded bytes).
print("✅ Model classes defined")

## 3. Data Preparation

In [None]:
class SequenceDataset(Dataset):
    """Dataset pairing each feature sequence with its label as float32 tensors."""

    def __init__(self, sequences, labels):
        # Both inputs are converted once, up front, so item access is a cheap index.
        self.sequences = torch.FloatTensor(sequences)
        self.labels = torch.FloatTensor(labels)

    def __len__(self):
        """Number of sequences (size of the leading dimension)."""
        return self.sequences.shape[0]

    def __getitem__(self, idx):
        """Return the ``(sequence, label)`` pair stored at position ``idx``."""
        sample = self.sequences[idx]
        target = self.labels[idx]
        return sample, target


def create_sequences(df, feature_columns, window_size, overlap):
    """Create sliding-window sequences and per-window labels from a dataframe.

    Rows are grouped by the ``video`` column when it exists (otherwise the
    whole frame is one group) and ordered by ``timestamp`` when it exists.
    Windows never span two groups. A window is labelled 1 if any of its rows
    has ``is_cheating == 1``, else 0.

    Args:
        df: DataFrame holding ``feature_columns`` and an ``is_cheating`` column.
        feature_columns: Names of the columns used as model features.
        window_size: Number of rows in each sequence.
        overlap: Number of rows shared by consecutive sequences; must be
            smaller than ``window_size``.

    Returns:
        ``(sequences, labels)``: arrays of shape
        ``(n, window_size, n_features)`` and ``(n,)``. When no group is long
        enough for a full window, ``n`` is 0 but the trailing dimensions are
        preserved so callers can still read ``shape[-1]``.

    Raises:
        ValueError: If ``window_size`` is not positive or ``overlap`` is not
            smaller than ``window_size`` (the stride would be <= 0).
    """
    step = window_size - overlap
    if window_size <= 0 or step <= 0:
        raise ValueError(
            f"window_size must be positive and overlap must be smaller than "
            f"window_size (got window_size={window_size}, overlap={overlap})"
        )

    feature_columns = list(feature_columns)

    # Group by video to maintain temporal continuity
    if 'video' in df.columns:
        groups = df.groupby('video')
    else:
        groups = [('all', df)]

    sequences = []
    labels = []

    for _, video_df in groups:
        if 'timestamp' in video_df.columns:
            # Stable sort keeps the original order of rows with equal timestamps.
            video_df = video_df.sort_values('timestamp', kind='stable')
        features = video_df[feature_columns].values
        targets = video_df['is_cheating'].values

        for start in range(0, len(features) - window_size + 1, step):
            stop = start + window_size
            sequences.append(features[start:stop])
            # Label is 1 if any frame in window is cheating
            labels.append(int(np.any(targets[start:stop] == 1)))

    if not sequences:
        # Keep a well-formed 3-D shape even with zero windows.
        empty_x = np.empty((0, window_size, len(feature_columns)), dtype=np.float64)
        empty_y = np.empty((0,), dtype=np.int64)
        return empty_x, empty_y

    return np.array(sequences), np.array(labels)


def load_and_prepare_data(data_path, feature_columns, config):
    """Load every CSV in ``data_path`` and build scaled train/test sequences.

    The train/test split is made per video when a ``video`` column with at
    least two distinct values exists, so windows from one video never land in
    both splits. Otherwise the rows are split contiguously (first part train,
    last part test) so temporal order is preserved and neighbouring frames do
    not leak across the split.

    Args:
        data_path: Directory containing the feature CSV files.
        feature_columns: Candidate feature column names; columns absent from
            the data are skipped and reported.
        config: Dict providing ``window_size``, ``overlap``, ``test_size`` and
            ``random_state``.

    Returns:
        ``(X_train, y_train, X_test, y_test, scaler)``. The ``X`` arrays are
        standardized with a ``StandardScaler`` fitted on the training
        sequences only. A tuple of five ``None`` values is returned when no
        usable data is available (missing directory, no CSVs, no label or
        feature columns, or too few rows to form a sequence in either split).
    """
    failure = (None, None, None, None, None)

    if not os.path.isdir(data_path):
        print(f"❌ Data directory not found: {data_path}")
        return failure

    # Sorted so the concatenation order (and therefore the seeded video
    # shuffle below) does not depend on the filesystem's listing order.
    csv_files = sorted(f for f in os.listdir(data_path) if f.endswith('.csv'))
    if not csv_files:
        print(f"❌ No CSV files in {data_path}")
        return failure

    dfs = [pd.read_csv(os.path.join(data_path, f)) for f in csv_files]
    combined_df = pd.concat(dfs, ignore_index=True)

    print(f"Total rows: {len(combined_df)}")

    if 'is_cheating' not in combined_df.columns:
        print("❌ Label column 'is_cheating' not found in the data")
        return failure

    # Filter to available features
    available = [c for c in feature_columns if c in combined_df.columns]
    missing = [c for c in feature_columns if c not in combined_df.columns]
    if missing:
        print(f"Missing feature columns (skipped): {missing}")
    if not available:
        print("❌ None of the requested feature columns are present")
        return failure

    combined_df = combined_df.fillna(0)
    test_size = config['test_size']

    video_ids = None
    if 'video' in combined_df.columns:
        video_ids = np.array(combined_df['video'].unique())

    if video_ids is not None and len(video_ids) >= 2:
        # Local generator: same seeded shuffle, without touching numpy's
        # global random state.
        rng = np.random.RandomState(config['random_state'])
        rng.shuffle(video_ids)
        split_idx = int(len(video_ids) * (1 - test_size))
        # Guarantee at least one video on each side of the split.
        split_idx = min(max(split_idx, 1), len(video_ids) - 1)

        train_df = combined_df[combined_df['video'].isin(video_ids[:split_idx])]
        test_df = combined_df[combined_df['video'].isin(video_ids[split_idx:])]
    else:
        # Contiguous split: sampling rows at random would scramble frame
        # order and interleave train and test frames of the same recording.
        split_idx = int(len(combined_df) * (1 - test_size))
        train_df = combined_df.iloc[:split_idx]
        test_df = combined_df.iloc[split_idx:]

    print(f"Train: {len(train_df)} rows, Test: {len(test_df)} rows")

    # Create sequences
    X_train, y_train = create_sequences(train_df, available, config['window_size'], config['overlap'])
    X_test, y_test = create_sequences(test_df, available, config['window_size'], config['overlap'])

    print(f"Train sequences: {len(X_train)}, Test sequences: {len(X_test)}")

    if len(X_train) == 0 or len(X_test) == 0:
        print("❌ Not enough rows to build sequences for both train and test")
        return failure

    print(f"Sequence shape: {X_train.shape}")

    # Scale features: flatten (n, window, features) to rows of features, fit
    # on the training rows only, then restore the 3-D shape.
    n_features = X_train.shape[-1]
    scaler = StandardScaler()
    scaler.fit(X_train.reshape(-1, n_features))

    X_train_scaled = scaler.transform(X_train.reshape(-1, n_features)).reshape(X_train.shape)
    X_test_scaled = scaler.transform(X_test.reshape(-1, n_features)).reshape(X_test.shape)

    return X_train_scaled, y_train, X_test_scaled, y_test, scaler


# Build the scaled train/test sequence arrays (all None when no data is found).
prepared = load_and_prepare_data(DATA_PATH, FEATURE_COLUMNS, CONFIG)
X_train, y_train, X_test, y_test, scaler = prepared

# Report the share of positive (cheating) windows in each split.
if X_train is not None:
    print(f"\nClass balance (train): {np.mean(y_train):.2%} positive")
    print(f"Class balance (test): {np.mean(y_test):.2%} positive")

## 4. Training

In [None]:
def train_model(model, train_loader, val_loader, config, device):
    """Train ``model`` with BCE loss, LR scheduling and early stopping.

    Args:
        model: Module mapping a ``(batch, seq_len, features)`` tensor to one
            probability per sequence (shape ``(batch,)``).
        train_loader: Iterable of ``(sequences, labels)`` training batches.
        val_loader: Iterable of ``(sequences, labels)`` validation batches.
        config: Dict providing ``learning_rate``, ``epochs`` and
            ``early_stopping_patience``.
        device: Torch device the batches are moved to.

    Returns:
        ``(model, history)``: the model with the weights of the epoch that had
        the lowest validation loss, and a dict with per-epoch ``train_loss``,
        ``val_loss`` and ``val_auc`` lists. ``val_auc`` is NaN for epochs
        where the validation labels contain a single class.

    Raises:
        ValueError: If either loader yields no batches.
    """
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError("train_loader and val_loader must each contain at least one batch")

    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=config['learning_rate'])
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

    history = {'train_loss': [], 'val_loss': [], 'val_auc': []}
    best_val_loss = float('inf')
    patience_counter = 0
    best_model_state = None

    for epoch in range(config['epochs']):
        # Training
        model.train()
        train_loss = 0.0

        for sequences, labels in train_loader:
            sequences, labels = sequences.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            loss.backward()
            # Clip the global gradient norm to 1.0 to tame exploding gradients.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)

        # Validation
        model.eval()
        val_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for sequences, labels in val_loader:
                sequences, labels = sequences.to(device), labels.to(device)
                outputs = model(sequences)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                all_preds.extend(outputs.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)

        # ROC AUC is undefined when only one class is present; record NaN
        # instead of aborting the whole training run.
        if np.unique(all_labels).size < 2:
            val_auc = float('nan')
        else:
            val_auc = roc_auc_score(all_labels, all_preds)

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_auc'].append(val_auc)

        scheduler.step(val_loss)

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() tensors alias the live parameters, and dict.copy()
            # is shallow, so each tensor must be cloned to get a real snapshot.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            patience_counter = 0
        else:
            patience_counter += 1

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{config['epochs']} | "
                  f"Train Loss: {train_loss:.4f} | "
                  f"Val Loss: {val_loss:.4f} | "
                  f"Val AUC: {val_auc:.4f}")

        if patience_counter >= config['early_stopping_patience']:
            print(f"\nEarly stopping at epoch {epoch+1}")
            break

    # Load best model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, history

In [None]:
if X_train is not None:
    # Create DataLoaders (only the training batches are shuffled)
    train_dataset = SequenceDataset(X_train, y_train)
    test_dataset = SequenceDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=CONFIG['batch_size'], shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=CONFIG['batch_size'], shuffle=False)

    # Create model
    input_size = X_train.shape[-1]

    # Explicit registry: an unrecognized model_type raises instead of
    # silently falling back to the GRU.
    model_classes = {'lstm': LSTMModel, 'gru': GRUModel}
    model_type = CONFIG['model_type']
    if model_type not in model_classes:
        raise ValueError(
            f"Unknown model_type {model_type!r}; expected one of {sorted(model_classes)}"
        )

    model = model_classes[model_type](
        input_size=input_size,
        hidden_size=CONFIG['hidden_size'],
        num_layers=CONFIG['num_layers'],
        dropout=CONFIG['dropout']
    ).to(device)

    print(f"Model: {CONFIG['model_type'].upper()}")
    print(f"Parameters: {sum(p.numel() for p in model.parameters()):,}")

    # Train
    # NOTE(review): the test loader doubles as the validation set, so early
    # stopping and LR scheduling are tuned on the same data later used for
    # evaluation; the reported test metrics are therefore optimistic. Consider
    # carving a separate validation split out of the training videos.
    print("\nTraining...")
    model, history = train_model(model, train_loader, test_loader, CONFIG, device)
    print("\n✅ Training complete")

## 5. Evaluation

In [None]:
def evaluate_model(model, test_loader, threshold, device):
    """Evaluate the model on a labelled loader and print a metrics report.

    Args:
        model: Trained module returning one probability per sequence.
        test_loader: Iterable of ``(sequences, labels)`` batches.
        threshold: Probability at or above which a sequence is classified as
            cheating.
        device: Torch device the input batches are moved to.

    Returns:
        ``(metrics, y_proba, y_true)``: a dict with ``accuracy``,
        ``precision``, ``recall``, ``f1`` and ``auc_roc``; the predicted
        probabilities; and the true labels as an integer array. ``auc_roc``
        is NaN when the labels contain a single class.
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for sequences, labels in test_loader:
            sequences = sequences.to(device)
            outputs = model(sequences)
            all_preds.extend(outputs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    y_proba = np.array(all_preds)
    y_pred = (y_proba >= threshold).astype(int)
    # Integer labels so they line up with the explicit labels=[0, 1] below.
    y_true = np.array(all_labels).astype(int)

    # ROC AUC is undefined when only one class is present in the labels.
    if np.unique(y_true).size < 2:
        auc_roc = float('nan')
    else:
        auc_roc = roc_auc_score(y_true, y_proba)

    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        # zero_division=0 makes the "no positive predictions/labels" case an
        # explicit 0.0 instead of relying on suppressed warnings.
        'precision': precision_score(y_true, y_pred, zero_division=0),
        'recall': recall_score(y_true, y_pred, zero_division=0),
        'f1': f1_score(y_true, y_pred, zero_division=0),
        'auc_roc': auc_roc
    }

    print("=" * 50)
    print("TEMPORAL MODEL EVALUATION")
    print("=" * 50)
    for name, value in metrics.items():
        print(f"{name.upper():15}: {value:.4f}")

    print("\nClassification Report:")
    # labels=[0, 1] keeps both rows even if a class is absent; without it the
    # two target_names would not match a single observed class and raise.
    print(classification_report(
        y_true, y_pred,
        labels=[0, 1],
        target_names=['Not Cheating', 'Cheating'],
        zero_division=0
    ))

    return metrics, y_proba, y_true


if X_train is not None:
    metrics, y_proba, y_true = evaluate_model(model, test_loader, CONFIG['threshold'], device)

    # Training curves: losses on the left panel, validation AUC on the right.
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    loss_ax, auc_ax = axes

    loss_ax.plot(history['train_loss'], label='Train')
    loss_ax.plot(history['val_loss'], label='Validation')
    loss_ax.set(xlabel='Epoch', ylabel='Loss', title='Training Loss')
    loss_ax.legend()

    auc_ax.plot(history['val_auc'])
    auc_ax.set(xlabel='Epoch', ylabel='AUC-ROC', title='Validation AUC')

    plt.tight_layout()
    plt.show()

## 6. Save Model

In [None]:
def save_temporal_model(model, scaler, config, metrics, model_path):
    """Save the model checkpoint and a JSON metadata file into ``model_path``.

    Writes ``temporal_lstm.pt`` (state dict, scaler statistics, config and
    input size) and ``temporal_metadata.json`` (architecture settings,
    threshold, training timestamp and metrics).

    Args:
        model: Trained torch module whose ``state_dict`` is stored.
        scaler: Fitted scaler exposing ``mean_`` and ``scale_``.
        config: Dict providing ``model_type``, ``window_size``,
            ``hidden_size``, ``num_layers`` and ``threshold``; an optional
            ``input_size`` overrides the value derived from the scaler.
        metrics: Mapping of metric name to a numeric value.
        model_path: Output directory; created if it does not exist.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # The function should not depend on the caller having created the folder.
    os.makedirs(model_path, exist_ok=True)

    # Fall back to the number of features the scaler was fitted on rather
    # than a hard-coded feature count.
    fitted_features = int(np.asarray(scaler.mean_).shape[0])

    # Save model with scaler
    model_file = os.path.join(model_path, 'temporal_lstm.pt')

    # NOTE: the scaler statistics are stored as NumPy arrays, so loading this
    # checkpoint with torch.load may require weights_only=False.
    save_dict = {
        'model_state_dict': model.state_dict(),
        'scaler_mean': scaler.mean_,
        'scaler_scale': scaler.scale_,
        'config': config,
        'input_size': config.get('input_size', fitted_features)
    }

    torch.save(save_dict, model_file)
    print(f"✅ Model saved: {model_file}")

    # Save metadata
    metadata = {
        'model_type': config['model_type'],
        'window_size': config['window_size'],
        'hidden_size': config['hidden_size'],
        'num_layers': config['num_layers'],
        'threshold': config['threshold'],
        'training_date': timestamp,
        # Cast to plain floats: NumPy scalars are not JSON serializable.
        'metrics': {k: float(v) for k, v in metrics.items()}
    }

    metadata_file = os.path.join(model_path, 'temporal_metadata.json')
    with open(metadata_file, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2)
    print(f"✅ Metadata saved: {metadata_file}")


if X_train is not None:
    # Record the actual feature count so the checkpoint carries the model's
    # input width.
    CONFIG['input_size'] = X_train.shape[-1]
    save_temporal_model(model, scaler, CONFIG, metrics, MODEL_PATH)
    print("\n🎉 Temporal model training complete!")

## 7. Inference Example

In [None]:
def predict_sequence(model, scaler, sequence, device, threshold=0.4):
    """
    Score a single window of frames with the trained model.

    Args:
        model: Trained sequence model returning one probability per sequence
        scaler: Fitted scaler whose ``transform`` is applied to the raw frames
        sequence: numpy array of shape (window_size, num_features), unscaled
        device: Torch device the input tensor is moved to
        threshold: Probability at or above which the window counts as cheating

    Returns:
        probability: Cheating probability (0-1)
        is_cheating: Boolean prediction
    """
    # Inference only: switch off dropout before scoring.
    model.eval()

    # Scale the frames, then add a leading batch dimension of size one.
    scaled_frames = scaler.transform(sequence)
    batch = torch.FloatTensor(scaled_frames)[None].to(device)

    with torch.no_grad():
        scores = model(batch)
    prob = scores.item()

    is_cheating = prob >= threshold
    return prob, is_cheating


# Example usage
# Guarded on a non-empty test split so X_test[0] cannot raise IndexError.
if X_train is not None and len(X_test) > 0:
    # Get a test sequence
    test_seq = X_test[0]  # Already scaled

    # For demo, unscale it first: predict_sequence expects raw features and
    # applies the scaler itself.
    test_seq_unscaled = scaler.inverse_transform(test_seq)

    prob, is_cheating = predict_sequence(model, scaler, test_seq_unscaled, device)
    print(f"Cheating probability: {prob:.2%}")
    print(f"Prediction: {'CHEATING' if is_cheating else 'NOT CHEATING'}")