# **EduSense: Intelligent Confusion Detection System**
## YOLO + Vision Transformer + Kolmogorov-Arnold Networks

**King Khalid University - College of Computer Science**  
**Graduation Project 2025**

---

### Team Members:
- Saeed Mohammed S Asiri (444810913)
- Fahad Abdullah Ali AL-Qahtani (444802593)
- Khalid Mushabbab Al-Dahwan (444803647)
- Ahmad Turki Al Sultan (444803284)
- Basil Hasan Al Muawwadh (442811409)

**Supervisor:** Dr. Anand Deva Durai C

---

### Architecture Pipeline:
```
Video Frame → YOLOv8 (Face Detection) → ViT (Feature Extraction) →
Temporal Aggregation → KAN (Confusion Classification) → Confusion Curve
```

## 📦 **1. Installation & Setup**

In [None]:
# Install required packages
!pip install -q ultralytics transformers opencv-python scipy matplotlib seaborn tqdm
!pip install -q torch torchvision --upgrade

print("✅ All packages installed successfully!")

In [None]:
# Import libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import cv2
import numpy as np
from scipy.interpolate import BSpline
from scipy.signal import savgol_filter, find_peaks

from transformers import ViTModel, ViTImageProcessor
from ultralytics import YOLO

import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm
from PIL import Image
import warnings
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA Version: {torch.version.cuda}")

In [None]:
import kagglehub
import shutil

path = kagglehub.dataset_download("olgaparfenova/daisee")
print("Downloaded to:", path)

src = path
dst = "/content/DAiSEE"

shutil.copytree(src, dst, dirs_exist_ok=True)

In [None]:
import os

print(os.listdir("/content/DAiSEE"))
print(os.listdir("/content/DAiSEE/DAiSEE"))


In [None]:
import os

DATA_ROOT = "/content/DAiSEE/DAiSEE"

print("Root:", os.listdir(DATA_ROOT))

# DataSet folder: holds the split folders (e.g. Train), not the subjects themselves
dataset_path = os.path.join(DATA_ROOT, "DataSet")
splits = sorted(os.listdir(dataset_path))
print("DataSet folder:", splits)
print("Number of split folders:", len(splits))

# Example: count subject folders inside the first split
first_split = os.path.join(dataset_path, splits[0])
print("Example split:", splits[0])
print("Number of subject folders inside:", len(os.listdir(first_split)))


In [None]:
train_path = os.path.join(DATA_ROOT, "DataSet", "Train")

print("Train subjects:", len(os.listdir(train_path)))

first_subject = os.listdir(train_path)[0]
first_subject_path = os.path.join(train_path, first_subject)

print("Example subject:", first_subject)
print("Clips inside:", len(os.listdir(first_subject_path)))


In [None]:
import pandas as pd  # needed for read_csv; not imported in the earlier cells

labels_path = os.path.join(DATA_ROOT, "Labels", "TrainLabels.csv")
labels = pd.read_csv(labels_path)

print(labels.head())
print("Total labels:", len(labels))
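
DAiSEE annotates each affective state on an ordinal 0-3 scale, while the `nn.BCELoss` used later in this notebook expects targets in [0, 1]. The following is a minimal sketch of one way to binarize the confusion level. The `Confusion` column name, the cutoff of 2, and the helper `binarize_confusion` are assumptions for illustration and should be checked against the output of `labels.head()`.

```python
import csv
import io

def binarize_confusion(level, threshold=2):
    """Map an ordinal confusion level (0-3) to a binary target (0.0 or 1.0)."""
    return 1.0 if int(level) >= threshold else 0.0

# Tiny in-memory stand-in for TrainLabels.csv (illustrative rows, not real data)
sample_csv = io.StringIO(
    "ClipID,Confusion\n"
    "clip_a.avi,0\n"
    "clip_b.avi,1\n"
    "clip_c.avi,2\n"
    "clip_d.avi,3\n"
)

targets = {row["ClipID"]: binarize_confusion(row["Confusion"])
           for row in csv.DictReader(sample_csv)}
print(targets)
```

A different cutoff (or a regression target such as `level / 3`) may suit the project better; the point is only that raw 0-3 levels cannot be fed to `BCELoss` directly.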


## 🎯 **2. Component 1: YOLOv8 Face Detector**

In [None]:
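class YOLOFaceDetector:
    """
    YOLOv8-based real-time face detection.
    Detects faces, returns bounding boxes and crops.

    Note: the default 'yolov8n.pt' checkpoint is trained on COCO and detects
    generic objects (person, chair, and so on), not faces. Pass a face-specific
    YOLOv8 checkpoint as model_path to obtain face boxes.
    """
    
    def __init__(self, model_path='yolov8n.pt', conf_threshold=0.5, device='cuda'):
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold
        self.device = device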
        
    def detect_faces(self, frame, return_crops=True, padding=0.2):
        """
        Detect faces in frame.
        
        Args:
            frame: Input frame (H, W, 3) BGR
            return_crops: Return cropped faces
            padding: Padding around bbox (0.2 = 20%)
        
        Returns:
            List of detections with bboxes and crops
        """
        # Pass the stored device so inference runs where the caller asked
        results = self.model(frame, conf=self.conf_threshold, device=self.device, verbose=False)
        
        detections = []
        
        for result in results:
            boxes = result.boxes
            
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                conf = float(box.conf[0].cpu().numpy())
                
                detection = {
                    'bbox': [int(x1), int(y1), int(x2), int(y2)],
                    'confidence': conf
                }
                
                # Add padding
                if return_crops:
                    bbox_padded = self._add_padding(frame, detection['bbox'], padding)
                    detection['face_crop'] = self._crop_face(frame, bbox_padded)
                    detection['bbox_padded'] = bbox_padded
                
                detections.append(detection)
        
        return detections
    
    def _add_padding(self, frame, bbox, padding):
        x1, y1, x2, y2 = bbox
        h, w = frame.shape[:2]
        
        face_w = x2 - x1
        face_h = y2 - y1
        pad_w = int(face_w * padding)
        pad_h = int(face_h * padding)
        
        x1_pad = max(0, x1 - pad_w)
        y1_pad = max(0, y1 - pad_h)
        x2_pad = min(w, x2 + pad_w)
        y2_pad = min(h, y2 + pad_h)
        
        return [x1_pad, y1_pad, x2_pad, y2_pad]
    
    def _crop_face(self, frame, bbox):
        x1, y1, x2, y2 = bbox
        return frame[y1:y2, x1:x2].copy()
    
    def get_largest_face(self, detections):
        """Return largest face (primary student)"""
        if not detections:
            return None
        
        areas = [(d['bbox'][2] - d['bbox'][0]) * (d['bbox'][3] - d['bbox'][1]) for d in detections]
        return detections[np.argmax(areas)]


# Test YOLO
print("Initializing YOLOv8 Face Detector...")
yolo_detector = YOLOFaceDetector(device=device)
print("✅ YOLO ready!")
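
To make the padding step in `_add_padding` concrete, here is a small standalone sketch of the same arithmetic. `pad_bbox` is a hypothetical free-function version written only for illustration; the frame size of 640x480 is an assumption.

```python
def pad_bbox(bbox, frame_w, frame_h, padding=0.2):
    """Grow a box by `padding` of its width/height per side, clamped to the frame."""
    x1, y1, x2, y2 = bbox
    pad_w = int((x2 - x1) * padding)
    pad_h = int((y2 - y1) * padding)
    return [max(0, x1 - pad_w), max(0, y1 - pad_h),
            min(frame_w, x2 + pad_w), min(frame_h, y2 + pad_h)]

# A 100x100 box in the middle of the frame grows by 20 px on every side
centered = pad_bbox([100, 100, 200, 200], frame_w=640, frame_h=480)
# A box touching the top-left corner can only grow right and down
at_corner = pad_bbox([0, 0, 100, 100], frame_w=640, frame_h=480)
print(centered)   # [80, 80, 220, 220]
print(at_corner)  # [0, 0, 120, 120]
```

Clamping means crops near the frame border are no longer centered on the face, which is worth keeping in mind when comparing features across frames.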

## 🧠 **3. Component 2: Vision Transformer Feature Extractor**

In [None]:
class ViTFeatureExtractor:
    """
    Vision Transformer for extracting facial features.
    Uses pre-trained ViT from HuggingFace.
    """
    
    def __init__(self, model_name='google/vit-base-patch16-224-in21k', device='cuda'):
        self.device = torch.device(device)
        
        print(f"Loading Vision Transformer: {model_name}")
        self.processor = ViTImageProcessor.from_pretrained(model_name)
        self.model = ViTModel.from_pretrained(model_name).to(self.device)
        self.model.eval()
        
        self.embedding_dim = self.model.config.hidden_size
        print(f"✅ ViT loaded | Embedding dim: {self.embedding_dim}")
    
    def extract_features(self, face_image):
        """
        Extract features from face crop.
        
        Args:
            face_image: Face crop (H, W, 3) BGR or PIL Image
        
        Returns:
            features: (embedding_dim,) numpy array
        """
        # Convert BGR to RGB
        if isinstance(face_image, np.ndarray):
            face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
            face_image = Image.fromarray(face_image)
        
        # Preprocess
        inputs = self.processor(images=face_image, return_tensors="pt")
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        
        # Extract features
        with torch.no_grad():
            outputs = self.model(**inputs)
            # Use [CLS] token
            cls_token = outputs.last_hidden_state[:, 0, :].cpu().numpy()
        
        return cls_token.squeeze()
    
    def extract_features_batch(self, face_images, batch_size=8):
        """
        Batch feature extraction for efficiency.
        
        Args:
            face_images: List of face crops
            batch_size: Batch size
        
        Returns:
            features: (num_faces, embedding_dim) array
        """
        all_features = []
        
        for i in range(0, len(face_images), batch_size):
            batch = face_images[i:i+batch_size]
            
            # Convert to PIL
            batch_pil = []
            for img in batch:
                if isinstance(img, np.ndarray):
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = Image.fromarray(img)
                batch_pil.append(img)
            
            # Process batch
            inputs = self.processor(images=batch_pil, return_tensors="pt")
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            
            with torch.no_grad():
                outputs = self.model(**inputs)
                cls_tokens = outputs.last_hidden_state[:, 0, :].cpu().numpy()
            
            all_features.append(cls_tokens)
        
        return np.vstack(all_features)


# Initialize ViT
vit_extractor = ViTFeatureExtractor(device=device)
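
The extractor keeps only index 0 of `last_hidden_state`, the [CLS] token. As a quick sanity check on the shapes involved, the sketch below counts the tokens a patch-based ViT produces. `vit_sequence_length` is a hypothetical helper; the 224-pixel input and 16-pixel patches are taken from the model name used above.

```python
def vit_sequence_length(image_size=224, patch_size=16):
    """Number of tokens a ViT sees: one per image patch plus the [CLS] token."""
    patches_per_side = image_size // patch_size
    return patches_per_side * patches_per_side + 1

seq_len = vit_sequence_length()
print(seq_len)  # 197, so last_hidden_state has shape (batch, 197, 768)
```

Slicing `[:, 0, :]` therefore discards the 196 patch tokens and keeps a single 768-dimensional summary per face crop.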

## 🔬 **4. Component 3: Kolmogorov-Arnold Network (KAN)**

In [None]:
class KANLayer(nn.Module):
    """
    Kolmogorov-Arnold Network Layer with learnable B-spline basis functions.
    
    Unlike traditional neural networks with fixed activations (ReLU, Sigmoid),
    KAN learns the activation functions themselves as B-splines.
    """
    
    def __init__(self, in_features, out_features, num_basis=8, spline_order=3, grid_range=(-1, 1)):
        super().__init__()
        
        self.in_features = in_features
        self.out_features = out_features
        self.num_basis = num_basis
        self.spline_order = spline_order
        self.grid_range = grid_range
        
        # Learnable spline coefficients: (in_features, out_features, num_basis)
        self.spline_coeffs = nn.Parameter(
            torch.randn(in_features, out_features, num_basis) * 0.1
        )
        
        # Create B-spline knot vector
        num_knots = num_basis + spline_order + 1
        internal_knots = num_basis - spline_order + 1
        
        knots = np.concatenate([
            np.full(spline_order, grid_range[0]),
            np.linspace(grid_range[0], grid_range[1], internal_knots),
            np.full(spline_order, grid_range[1])
        ])
        
        self.register_buffer('knots', torch.tensor(knots, dtype=torch.float32))
    
    def forward(self, x):
        """
        Forward pass.
        
        Args:
            x: (batch_size, in_features)
        
        Returns:
            (batch_size, out_features)
        """
        batch_size = x.size(0)
        
        # Normalize input to [-1, 1] using tanh
        x_normalized = torch.tanh(x)
        
        # Evaluate B-spline basis
        basis_values = self._evaluate_bspline_basis(x_normalized)
        
        # Apply coefficients: basis (b,i,k) × coeffs (i,o,k) → output (b,o)
        output = torch.einsum('bik,iok->bo', basis_values, self.spline_coeffs)
        
        return output
    
    def _evaluate_bspline_basis(self, x):
        """
        Evaluate the basis functions for each input value.
        
        Note: this is a simplified stand-in. It uses L2-normalized monomials
        x^0 .. x^(num_basis-1) instead of true B-splines, so the knot vector
        registered in __init__ is not used here.
        
        Args:
            x: (batch_size, in_features) values in [-1, 1]
        
        Returns:
            (batch_size, in_features, num_basis)
        """
        batch_size, in_features = x.shape
        device = x.device
        
        # Initialize basis matrix
        basis = torch.zeros(batch_size, in_features, self.num_basis, device=device)
        
        # Clamp to grid range
        x_clamped = torch.clamp(x, self.grid_range[0], self.grid_range[1])
        
        # Simplified polynomial basis (for efficiency)
        # A proper Cox-de Boor recursion over self.knots would replace this loop
        for k in range(self.num_basis):
            # Polynomial powers: x^0, x^1, x^2, ...
            basis[:, :, k] = x_clamped ** k
        
        # Normalize basis functions
        basis = F.normalize(basis, p=2, dim=2)
        
        return basis


# Test KAN Layer
print("Testing KAN Layer...")
test_kan = KANLayer(in_features=10, out_features=5, num_basis=8)
test_input = torch.randn(4, 10)
test_output = test_kan(test_input)
print(f"Input shape: {test_input.shape} → Output shape: {test_output.shape}")
print("✅ KAN Layer working!")
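
The comment in `_evaluate_bspline_basis` defers a proper Cox-de Boor recursion. Below is a minimal pure-Python sketch of that recursion for a single scalar input, assuming the same clamped knot layout that `KANLayer.__init__` builds. `clamped_knots` and `bspline_basis` are hypothetical helpers for illustration; a vectorized torch version would be needed inside the layer itself.

```python
def clamped_knots(num_basis=8, degree=3, lo=-1.0, hi=1.0):
    """Knot vector matching KANLayer: `degree` repeats, a uniform grid, `degree` repeats."""
    n_inner = num_basis - degree + 1
    inner = [lo + (hi - lo) * j / (n_inner - 1) for j in range(n_inner)]
    return [lo] * degree + inner + [hi] * degree

def bspline_basis(x, knots, degree):
    """Return all B-spline basis values N_{i,degree}(x) via Cox-de Boor recursion."""
    n_intervals = len(knots) - 1
    # Degree 0: indicator of the half-open knot interval containing x
    basis = [1.0 if knots[i] <= x < knots[i + 1] else 0.0 for i in range(n_intervals)]
    if x == knots[-1]:
        # Close the last non-empty interval so the right endpoint is covered
        for i in range(n_intervals - 1, -1, -1):
            if knots[i] < knots[i + 1]:
                basis[i] = 1.0
                break
    # Raise the degree one step at a time; zero-width spans contribute nothing
    for p in range(1, degree + 1):
        raised = []
        for i in range(n_intervals - p):
            left_den = knots[i + p] - knots[i]
            right_den = knots[i + p + 1] - knots[i + 1]
            left = (x - knots[i]) / left_den * basis[i] if left_den > 0 else 0.0
            right = (knots[i + p + 1] - x) / right_den * basis[i + 1] if right_den > 0 else 0.0
            raised.append(left + right)
        basis = raised
    return basis

knots = clamped_knots()
values = bspline_basis(0.3, knots, degree=3)
print(len(values), round(sum(values), 6))  # 8 basis functions that sum to 1
```

Unlike the monomial stand-in, each cubic B-spline is non-zero on only a few knot spans, so a coefficient influences the learned function locally rather than over the whole input range.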

In [None]:
class ConfusionDetectorKAN(nn.Module):
    """
    Complete Confusion Detection Model.
    
    Architecture:
        ViT Features (768) → Optional LSTM → KAN Layers → Confusion Score (0-1)
    """
    
    def __init__(self, input_dim=768, hidden_dims=[256, 128, 64], 
                 use_lstm=True, lstm_hidden=256, dropout=0.3, 
                 num_basis=8, spline_order=3):
        super().__init__()
        
        self.use_lstm = use_lstm
        self.input_dim = input_dim
        
        # Optional LSTM for temporal modeling
        if use_lstm:
            self.lstm = nn.LSTM(
                input_size=input_dim,
                hidden_size=lstm_hidden,
                num_layers=2,
                batch_first=True,
                bidirectional=True,
                dropout=dropout
            )
            kan_input_dim = lstm_hidden * 2  # Bidirectional
        else:
            kan_input_dim = input_dim
        
        # KAN layers
        self.kan_layers = nn.ModuleList()
        
        prev_dim = kan_input_dim
        for hidden_dim in hidden_dims:
            self.kan_layers.append(
                KANLayer(prev_dim, hidden_dim, num_basis, spline_order)
            )
            prev_dim = hidden_dim
        
        # Final output layer
        self.kan_output = KANLayer(prev_dim, 1, num_basis, spline_order)
        
        # Dropout
        self.dropout = nn.Dropout(dropout)
        
        # Output activation
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        """
        Forward pass.
        
        Args:
            x: Input tensor
               - If use_lstm=True: (batch, seq_len, 768)
               - If use_lstm=False: (batch, 768)
        
        Returns:
            confusion_score: (batch, 1) in [0, 1]
        """
        # LSTM encoding
        if self.use_lstm:
            lstm_out, _ = self.lstm(x)  # (batch, seq_len, 512)
            x = lstm_out[:, -1, :]  # Take last timestep (batch, 512)
        
        # KAN layers
        for kan_layer in self.kan_layers:
            x = kan_layer(x)
            x = self.dropout(x)
        
        # Output
        x = self.kan_output(x)
        confusion_score = self.sigmoid(x)
        
        return confusion_score


# Test full model
print("\nTesting Complete Confusion Detector...")
kan_model = ConfusionDetectorKAN(
    input_dim=768,
    hidden_dims=[256, 128, 64],
    use_lstm=True
).to(device)

# Test with temporal sequence
test_seq = torch.randn(4, 30, 768).to(device)  # (batch=4, seq_len=30, features=768)
test_pred = kan_model(test_seq)
print(f"Input: {test_seq.shape} → Output: {test_pred.shape}")
print(f"Prediction range: [{test_pred.min():.3f}, {test_pred.max():.3f}]")
print("✅ Full KAN model working!")

# Model summary
total_params = sum(p.numel() for p in kan_model.parameters())
trainable_params = sum(p.numel() for p in kan_model.parameters() if p.requires_grad)
print(f"\nModel Parameters:")
print(f"  Total: {total_params:,}")
print(f"  Trainable: {trainable_params:,}")
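
As a cross-check on the parameter count printed above, the total can be worked out by hand. The sketch below assumes the default configuration shown in this notebook (bidirectional 2-layer LSTM with hidden size 256, KAN widths 512 → 256 → 128 → 64 → 1, 8 basis coefficients per edge) and PyTorch's LSTM layout of two bias vectors per gate block; `kan_layer_params` and `lstm_params` are hypothetical helpers.

```python
def kan_layer_params(in_features, out_features, num_basis=8):
    """One coefficient per (input, output, basis function) triple."""
    return in_features * out_features * num_basis

def lstm_params(input_size, hidden_size, num_layers=2, bidirectional=True):
    """Parameter count for a stacked LSTM with input, recurrent, and two bias terms."""
    directions = 2 if bidirectional else 1
    total = 0
    for layer in range(num_layers):
        layer_in = input_size if layer == 0 else hidden_size * directions
        # 4 gates, each with input weights, recurrent weights, and two bias vectors
        per_direction = 4 * (layer_in * hidden_size
                             + hidden_size * hidden_size
                             + 2 * hidden_size)
        total += per_direction * directions
    return total

dims = [512, 256, 128, 64, 1]
kan_total = sum(kan_layer_params(a, b) for a, b in zip(dims, dims[1:]))
lstm_total = lstm_params(768, 256)
print(kan_total, lstm_total, kan_total + lstm_total)  # 1376768 3678208 5054976
```

Under these assumptions the LSTM holds roughly 73% of the weights, so the temporal encoder, not the KAN head, dominates model size.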

## ⏱️ **5. Component 4: Temporal Window Aggregator**

In [None]:
from collections import deque

class TemporalWindowAggregator:
    """
    Aggregate ViT features into temporal windows for KAN processing.
    
    Creates sliding windows of features (e.g., 3 seconds = 90 frames at 30fps)
    """
    
    def __init__(self, window_size=3.0, overlap=0.5, fps=30, feature_dim=768):
        self.window_size = window_size
        self.overlap = overlap
        self.fps = fps
        self.feature_dim = feature_dim
        
        # Window parameters
        self.frames_per_window = int(window_size * fps)
        self.stride = int(self.frames_per_window * (1 - overlap))
        
        # Buffer
        self.feature_buffer = deque(maxlen=int(self.frames_per_window * 2))
    
    def add_frame(self, timestamp, features):
        """Add features for a single frame."""
        self.feature_buffer.append({
            'timestamp': timestamp,
            'features': features
        })
    
    def get_windows(self):
        """
        Extract all complete temporal windows.
        
        Returns:
            List of window dicts with features and timestamps
        """
        windows = []
        buffer_list = list(self.feature_buffer)
        
        for i in range(0, len(buffer_list) - self.frames_per_window + 1, self.stride):
            window_frames = buffer_list[i : i + self.frames_per_window]
            
            features_array = np.stack([f['features'] for f in window_frames])
            
            windows.append({
                'start_time': window_frames[0]['timestamp'],
                'end_time': window_frames[-1]['timestamp'],
                'mid_time': (window_frames[0]['timestamp'] + window_frames[-1]['timestamp']) // 2,
                'features': features_array,  # (num_frames, 768)
                'num_frames': len(window_frames)
            })
        
        return windows


# Test aggregator
print("Testing Temporal Aggregator...")
aggregator = TemporalWindowAggregator(window_size=3.0, overlap=0.5, fps=30)

# Simulate adding frames
for i in range(100):
    timestamp = i * 33  # ~30 fps (33ms per frame)
    features = np.random.randn(768)
    aggregator.add_frame(timestamp, features)

windows = aggregator.get_windows()
print(f"Generated {len(windows)} windows")
print(f"First window: {windows[0]['start_time']}ms - {windows[0]['end_time']}ms")
print(f"Window features shape: {windows[0]['features'].shape}")
print("✅ Temporal aggregator working!")
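
The window arithmetic in `get_windows` can be checked in isolation. This sketch reproduces only the index computation; `window_starts` is a hypothetical helper and takes the number of buffered frames as input.

```python
def window_starts(num_frames, window_size=3.0, overlap=0.5, fps=30):
    """Start indices of all complete sliding windows over `num_frames` frames."""
    frames_per_window = int(window_size * fps)          # 90 frames for 3 s at 30 fps
    stride = int(frames_per_window * (1 - overlap))     # 45 frames for 50% overlap
    return list(range(0, num_frames - frames_per_window + 1, stride))

print(window_starts(100))  # [0]: only one 90-frame window fits in 100 frames
print(window_starts(180))  # [0, 45, 90]: a full buffer yields three windows
print(window_starts(50))   # []: fewer than 90 frames gives no window
```

With the test above (100 frames), exactly one window is expected, covering frames 0 to 89.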

## 🔗 **6. Complete End-to-End Pipeline**

In [None]:
class ConfusionDetectionPipeline:
    """
    Complete end-to-end confusion detection pipeline.
    
    Usage:
        pipeline = ConfusionDetectionPipeline(yolo_detector, vit_extractor, kan_model)
        for frame, timestamp in video:
            confusion_score = pipeline.process_frame(frame, timestamp)
    """
    
    def __init__(self, yolo_detector, vit_extractor, kan_model, 
                 window_size=3.0, fps=30, device='cuda'):
        self.yolo = yolo_detector
        self.vit = vit_extractor
        self.kan = kan_model
        self.device = device
        
        # Temporal aggregator
        self.aggregator = TemporalWindowAggregator(
            window_size=window_size,
            fps=fps,
            feature_dim=vit_extractor.embedding_dim
        )
        
        # Results storage
        self.confusion_scores = []
        
        # Set KAN to eval mode
        self.kan.eval()
    
    def process_frame(self, frame, timestamp):
        """
        Process a single frame.
        
        Args:
            frame: Video frame (H, W, 3) BGR
            timestamp: Timestamp in milliseconds
        
        Returns:
            confusion_score: 0-1 value or None if no face
        """
        # Step 1: Detect face
        detections = self.yolo.detect_faces(frame, return_crops=True)
        
        if not detections:
            return None
        
        # Use largest face
        face_data = self.yolo.get_largest_face(detections)
        face_crop = face_data['face_crop']
        
        # Step 2: Extract ViT features
        features = self.vit.extract_features(face_crop)
        
        # Step 3: Add to temporal buffer
        self.aggregator.add_frame(timestamp, features)
        
        # Step 4: Get windows and predict
        windows = self.aggregator.get_windows()
        
        confusion_score = None
        
        if windows:
            # Get latest window
            latest_window = windows[-1]
            
            # Convert to tensor
            window_features = torch.tensor(
                latest_window['features'], 
                dtype=torch.float32
            ).unsqueeze(0).to(self.device)  # (1, seq_len, 768)
            
            # KAN inference
            with torch.no_grad():
                confusion_score = self.kan(window_features).item()
            
            # Store
            self.confusion_scores.append({
                'timestamp': latest_window['mid_time'],
                'score': confusion_score,
                'window_start': latest_window['start_time'],
                'window_end': latest_window['end_time']
            })
        
        return confusion_score
    
    def get_confusion_curve(self):
        """Get complete confusion curve (timestamps, scores)."""
        if not self.confusion_scores:
            return [], []
        
        timestamps = [cs['timestamp'] for cs in self.confusion_scores]
        scores = [cs['score'] for cs in self.confusion_scores]
        
        return timestamps, scores
    
    def detect_confusion_peaks(self, prominence=0.2, distance=10):
        """
        Detect peaks in confusion curve.
        
        Args:
            prominence: Minimum prominence of peaks
            distance: Minimum distance between peaks (in data points)
        
        Returns:
            List of peak dicts with timestamps and scores
        """
        timestamps, scores = self.get_confusion_curve()
        
        if len(scores) < 3:
            return []
        
        # Find peaks
        peaks, properties = find_peaks(
            scores, 
            prominence=prominence, 
            distance=distance
        )
        
        peak_events = []
        for peak_idx in peaks:
            peak_events.append({
                'timestamp': timestamps[peak_idx],
                'score': scores[peak_idx],
                'index': peak_idx
            })
        
        return peak_events


print("✅ Complete pipeline ready!")

## 📊 **7. Dataset Preparation & Training**

In [None]:
class VideoConfusionDataset(Dataset):
    """
    Dataset for training confusion detection.
    Extracts temporal sequences from videos.
    """
    
    def __init__(self, video_paths, labels, yolo_detector, vit_extractor, 
                 max_frames=30, frame_skip=5):
        self.video_paths = video_paths
        self.labels = labels
        self.yolo = yolo_detector
        self.vit = vit_extractor
        self.max_frames = max_frames
        self.frame_skip = frame_skip
    
    def __len__(self):
        return len(self.video_paths)
    
    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        label = self.labels[idx]
        
        # Extract sequence
        sequence = self._extract_sequence(video_path)
        
        return torch.tensor(sequence).float(), torch.tensor([label]).float()
    
    def _extract_sequence(self, video_path):
        """Extract ViT features from video."""
        cap = cv2.VideoCapture(video_path)
        embeddings = []
        frame_idx = 0
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            if frame_idx % self.frame_skip == 0:
                # Detect face
                detections = self.yolo.detect_faces(frame, return_crops=True)
                
                if detections:
                    face = self.yolo.get_largest_face(detections)['face_crop']
                    features = self.vit.extract_features(face)
                    embeddings.append(features)
            
            frame_idx += 1
            
            if len(embeddings) >= self.max_frames:
                break
        
        cap.release()
        
        # Handle empty or short sequences
        if len(embeddings) == 0:
            embeddings = [np.zeros(self.vit.embedding_dim)]
        
        # Pad to max_frames
        sequence = np.array(embeddings)
        if len(sequence) < self.max_frames:
            pad_size = self.max_frames - len(sequence)
            pad = np.zeros((pad_size, self.vit.embedding_dim))
            sequence = np.vstack([sequence, pad])
        
        return sequence


print("✅ Dataset class ready!")
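
`_extract_sequence` zero-pads short clips at the end, while `ConfusionDetectorKAN.forward` reads the last LSTM timestep, so a padded clip is summarized from a padding step. One possible remedy, sketched here in plain Python, is to return the true length alongside the padded sequence so the model can index the last real step. `pad_with_length` is a hypothetical helper and not part of the dataset class above.

```python
def pad_with_length(embeddings, max_frames, dim):
    """Zero-pad a list of feature vectors to max_frames and report the true length."""
    valid = min(len(embeddings), max_frames)
    padded = [list(vec) for vec in embeddings[:valid]]
    padded += [[0.0] * dim for _ in range(max_frames - valid)]
    return padded, valid

# Two real 2-dim feature vectors padded to four timesteps
seq, valid_len = pad_with_length([[1.0, 2.0], [3.0, 4.0]], max_frames=4, dim=2)
last_real = seq[valid_len - 1]   # the step the model should summarize from
print(valid_len, last_real, seq[-1])
```

In a torch model the same idea is usually expressed by gathering `lstm_out` at index `valid_len - 1` per sample or by packing the sequences; clips with no detected face (`valid_len == 0`) need separate handling.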

In [None]:
def train_confusion_detector(model, train_loader, val_loader, 
                             num_epochs=10, learning_rate=0.001, device='cuda'):
    """
    Train confusion detection model.
    
    Args:
        model: ConfusionDetectorKAN
        train_loader: Training DataLoader
        val_loader: Validation DataLoader
        num_epochs: Number of training epochs
        learning_rate: Learning rate
        device: Device to train on
    
    Returns:
        Trained model, training history
    """
    model = model.to(device)
    
    # Optimizer and loss
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCELoss()
    
    # Training history
    history = {
        'train_loss': [],
        'val_loss': [],
        'val_accuracy': []
    }
    
    best_val_loss = float('inf')
    
    for epoch in range(num_epochs):
        # Training
        model.train()
        train_loss = 0.0
        
        train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")
        for sequences, labels in train_bar:
            sequences = sequences.to(device)
            labels = labels.to(device)
            
            # Forward pass
            predictions = model(sequences)
            loss = criterion(predictions, labels)
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            train_bar.set_postfix({'loss': loss.item()})
        
        train_loss /= len(train_loader)
        
        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            val_bar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]")
            for sequences, labels in val_bar:
                sequences = sequences.to(device)
                labels = labels.to(device)
                
                predictions = model(sequences)
                loss = criterion(predictions, labels)
                
                val_loss += loss.item()
                
                # Accuracy (threshold at 0.5)
                pred_labels = (predictions > 0.5).float()
                correct += (pred_labels == labels).sum().item()
                total += labels.size(0)
        
        val_loss /= len(val_loader)
        val_accuracy = correct / total
        
        # Update history
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_accuracy'].append(val_accuracy)
        
        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"  Train Loss: {train_loss:.4f}")
        print(f"  Val Loss: {val_loss:.4f}")
        print(f"  Val Accuracy: {val_accuracy:.4f}")
        
        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_confusion_detector.pth')
            print("  ✅ Best model saved!")
        
        print()
    
    return model, history


print("✅ Training function ready!")
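
The training loop minimizes binary cross-entropy between the sigmoid output and a 0/1 target. For intuition, the per-sample loss is `-(y*log(p) + (1-y)*log(1-p))`, which the sketch below evaluates in plain Python. The `eps` clipping is an assumption of this sketch to keep the logarithm finite; `bce` is a hypothetical helper, not the PyTorch implementation.

```python
import math

def bce(prediction, target, eps=1e-7):
    """Binary cross-entropy for one prediction in (0, 1) and one target in [0, 1]."""
    p = min(max(prediction, eps), 1 - eps)  # keep log() finite at 0 and 1
    return -(target * math.log(p) + (1 - target) * math.log(1 - p))

print(round(bce(0.5, 1.0), 4))  # 0.6931: an undecided model pays ln(2)
print(round(bce(0.9, 1.0), 4))  # small loss for a confident, correct score
print(round(bce(0.1, 1.0), 4))  # large loss for a confident, wrong score
```

A validation loss hovering near 0.693 therefore indicates predictions close to 0.5, which is a useful reference point when reading the training history.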

## 🎬 **8. Demo: Process Video & Generate Confusion Curve**

In [None]:
def process_video_demo(video_path, pipeline, output_path='confusion_curve.png'):
    """
    Process a video and generate confusion curve.
    
    Args:
        video_path: Path to video file
        pipeline: ConfusionDetectionPipeline instance
        output_path: Path to save confusion curve plot
    """
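    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Could not open video: {video_path}")
    # Some containers report 0 FPS; fall back to 30 to avoid division by zero below
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))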
    
    print(f"Processing video: {video_path}")
    print(f"FPS: {fps}, Total frames: {total_frames}")
    
    frame_idx = 0
    
    with tqdm(total=total_frames, desc="Processing frames") as pbar:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            timestamp = int((frame_idx / fps) * 1000)  # milliseconds
            
            # Process frame
            confusion_score = pipeline.process_frame(frame, timestamp)
            
            frame_idx += 1
            pbar.update(1)
            
            if confusion_score is not None:
                pbar.set_postfix({'confusion': f"{confusion_score:.2f}"})
    
    cap.release()
    
    # Get confusion curve
    timestamps, scores = pipeline.get_confusion_curve()
    
    if not timestamps:
        print("No confusion data generated (no faces detected?)")
        # Keep the return shape consistent with the success path
        return [], [], []
    
    # Detect peaks
    peaks = pipeline.detect_confusion_peaks(prominence=0.2, distance=10)
    
    # Plot confusion curve
    plt.figure(figsize=(14, 6))
    
    # Convert timestamps to seconds
    time_sec = [t/1000 for t in timestamps]
    
    # Plot curve
    plt.plot(time_sec, scores, 'b-', linewidth=2, label='Confusion Score')
    
    # Mark peaks
    if peaks:
        peak_times = [p['timestamp']/1000 for p in peaks]
        peak_scores = [p['score'] for p in peaks]
        plt.scatter(peak_times, peak_scores, color='red', s=100, 
                   marker='o', zorder=5, label='Confusion Peaks')
    
    # Threshold line
    plt.axhline(y=0.7, color='r', linestyle='--', alpha=0.5, label='High Confusion Threshold')
    
    plt.xlabel('Time (seconds)', fontsize=12)
    plt.ylabel('Confusion Score', fontsize=12)
    plt.title('EduSense Confusion Detection Curve', fontsize=14, fontweight='bold')
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.ylim([0, 1])
    plt.tight_layout()
    
    # Save
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    print(f"\n✅ Confusion curve saved to: {output_path}")
    
    # Statistics
    print(f"\nConfusion Statistics:")
    print(f"  Total windows: {len(scores)}")
    print(f"  Average confusion: {np.mean(scores):.3f}")
    print(f"  Max confusion: {np.max(scores):.3f} at {time_sec[np.argmax(scores)]:.1f}s")
    print(f"  Detected peaks: {len(peaks)}")
    
    if peaks:
        print(f"\n  Peak timestamps (seconds):")
        for i, peak in enumerate(peaks[:5], 1):
            print(f"    {i}. {peak['timestamp']/1000:.1f}s (score: {peak['score']:.3f})")
    
    plt.show()
    
    return timestamps, scores, peaks


print("✅ Demo function ready!")

## 🚀 **9. Initialize Complete Pipeline**

In [None]:
# Initialize all components
print("Initializing EduSense Complete Pipeline...\n")

# Initialize pipeline
edusense_pipeline = ConfusionDetectionPipeline(
    yolo_detector=yolo_detector,
    vit_extractor=vit_extractor,
    kan_model=kan_model,
    window_size=3.0,
    fps=30,
    device=device
)

print("\n✅ EduSense Pipeline Ready!")
print("\nComponents:")
print("  ✓ YOLOv8 Face Detector")
print("  ✓ Vision Transformer (768-dim embeddings)")
print("  ✓ KAN Confusion Classifier")
print("  ✓ Temporal Window Aggregator (3-second windows)")
print("\nPipeline: Frame → YOLO → ViT → Windows → KAN → Confusion Score")

## 📹 **10. Test on Sample Video**

Upload a video or use a dataset video to test the complete pipeline.

In [None]:
# Example: Process a video
# Replace 'path/to/video.mp4' with your actual video path

# If you have a video file:
# video_path = '/content/sample_lecture.mp4'
# results = process_video_demo(video_path, edusense_pipeline)

# For now, print instructions
print("To test the pipeline:")
print("1. Upload a video file to Colab")
print("2. Set video_path = '/content/your_video.mp4'")
print("3. Run: process_video_demo(video_path, edusense_pipeline)")
print("\nThe system will:")
print("  - Detect faces in each frame (YOLO)")
print("  - Extract features (ViT)")
print("  - Aggregate into 3-second windows")
print("  - Predict confusion scores (KAN)")
print("  - Generate confusion curve with peaks")
print("  - Save visualization")

## 💾 **11. Save & Export Models**

In [None]:
# Save trained KAN model
def save_edusense_model(kan_model, save_path='edusense_kan_model.pth'):
    torch.save({
        'model_state_dict': kan_model.state_dict(),
        'model_config': {
            'input_dim': kan_model.input_dim,
            'use_lstm': kan_model.use_lstm,
        }
    }, save_path)
    print(f"✅ Model saved to: {save_path}")

# Load model
def load_edusense_model(load_path='edusense_kan_model.pth', device='cuda'):
    checkpoint = torch.load(load_path, map_location=device)
    
    model = ConfusionDetectorKAN(
        input_dim=checkpoint['model_config']['input_dim'],
        use_lstm=checkpoint['model_config']['use_lstm']
    ).to(device)
    
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()
    
    print(f"✅ Model loaded from: {load_path}")
    return model

# Save current model
save_edusense_model(kan_model, 'edusense_kan_initial.pth')

## 📊 **12. Visualizations & Interpretability**

In [None]:
def visualize_kan_splines(kan_model, layer_idx=0, num_plots=4):
    """
    Visualize learned spline functions in KAN layer.
    This shows interpretability of the model.
    """
    kan_layer = kan_model.kan_layers[layer_idx]
    
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    axes = axes.flatten()
    
    x_vals = np.linspace(-1, 1, 100)
    
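    # The figure has a fixed 2x2 grid, so never draw more plots than axes
    for plot_idx in range(min(num_plots, len(axes))):
        ax = axes[plot_idx]
        
        # Pick an input-output pair deterministically (wraps around the layer dims)
        in_idx = plot_idx % kan_layer.in_features
        out_idx = plot_idx % kan_layer.out_features
        
        # Get spline coefficients
        coeffs = kan_layer.spline_coeffs[in_idx, out_idx, :].detach().cpu().numpy()
        
        # Evaluate the learned function the same way KANLayer.forward does:
        # monomial basis x^k, L2-normalized across k, then weighted by coeffs
        powers = x_vals[:, None] ** np.arange(len(coeffs))  # (100, num_basis)
        powers /= np.linalg.norm(powers, axis=1, keepdims=True)
        y_vals = powers @ coeffs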
        
        ax.plot(x_vals, y_vals, 'b-', linewidth=2)
        ax.set_title(f'Spline Function: Input {in_idx} → Output {out_idx}', fontsize=10)
        ax.set_xlabel('Input Value')
        ax.set_ylabel('Activation')
        ax.grid(True, alpha=0.3)
    
    plt.suptitle(f'KAN Layer {layer_idx} - Learned Spline Activations', 
                 fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.savefig('kan_splines_visualization.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    print("✅ KAN splines visualization saved!")

# Visualize
visualize_kan_splines(kan_model, layer_idx=0)

## 🎓 **Summary**

### **What We Built:**

1. **YOLOv8 Face Detector** - Real-time face detection (30+ fps)
2. **Vision Transformer** - Extract 768-dim semantic features from faces
3. **Kolmogorov-Arnold Network** - Interpretable confusion classification
4. **Temporal Window Aggregator** - Create 3-second sliding windows
5. **Complete Pipeline** - End-to-end confusion detection system

### **Architecture:**
```
Video Frame (1280x720)
        ↓
    [YOLOv8]  ← Face Detection
        ↓
  Face Crop (224x224)
        ↓
    [ViT]  ← Feature Extraction (768-dim)
        ↓
  Temporal Windows (90 frames × 768 features)
        ↓
    [LSTM + KAN]  ← Confusion Classification
        ↓
  Confusion Score (0-1)
```

### **Key Features:**
- ✅ Real-time processing capable
- ✅ Interpretable predictions (KAN splines)
- ✅ Temporal modeling (LSTM + sliding windows)
- ✅ Peak detection for confusion hotspots
- ✅ Visualization & analysis tools

### **Next Steps:**
1. Train on labeled confusion dataset (DAiSEE or custom)
2. Optimize hyperparameters
3. Deploy in Individual Study Mode
4. Integrate with RAG pipeline for adaptive remediation
5. Build instructor dashboard for Class Analytics Mode

---

**King Khalid University - College of Computer Science**  
**EduSense Graduation Project 2025**