In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torchvision.models import efficientnet_b4, EfficientNet_B4_Weights
from torch.utils.data import DataLoader, Dataset # Ensure Dataset is imported
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
import numpy as np

# Run configuration: input resolution, batch size, epoch budget, dataset
# location and the device tensors are moved to.
IMG_SIZE = 224
BATCH_SIZE = 32
EPOCHS = 50
DATASET_PATH = r"C:\Users\priya\OneDrive\Desktop\Student Attentiveness dataset"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(DEVICE)


def _base_steps():
    """Return a fresh list of the resize -> tensor -> normalize steps."""
    return [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        # Training-only augmentation (e.g. RandomHorizontalFlip, RandomRotation)
        # would be inserted here, before ToTensor; none is applied at present.
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]


# Data transforms (NO data augmentation): both splits currently use the same
# steps, but each split gets its own Compose instance.
data_transforms = {
    split: transforms.Compose(_base_steps()) for split in ("train", "val")
}

# --- Robust Data Loading with separate transforms for subsets ---
class DatasetWithTransform(Dataset):
    """
    Wrap an indexable collection of ``(sample, label)`` pairs and apply a
    transform to the sample on access.

    This lets subsets created by ``random_split`` from one untransformed
    dataset each receive their own transform pipeline.

    Args:
        subset: Any object supporting ``subset[index] -> (x, y)`` and ``len()``.
        transform: Optional callable applied to ``x``; ``None`` leaves the
            sample untouched.
    """
    def __init__(self, subset, transform=None):
        self.subset = subset
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(transform(x), y)`` for the pair stored at ``index``."""
        x, y = self.subset[index]
        # Compare against None instead of relying on truthiness: a callable
        # that happens to evaluate as falsy (for example one defining
        # __len__ or __bool__) must still be applied.
        if self.transform is not None:
            x = self.transform(x)
        return x, y

    def __len__(self):
        """Return the number of pairs in the wrapped subset."""
        return len(self.subset)


# --- Data Loading ---
# ImageFolder needs a directory of per-class sub-folders, so a regular file at
# this path is rejected here as well instead of failing later inside torchvision.
if not os.path.isdir(DATASET_PATH):
    raise FileNotFoundError(f"Dataset path not found: {DATASET_PATH}")

# No transform is attached here; each split receives its own pipeline through
# the DatasetWithTransform wrapper further down.
train_dataset_full = datasets.ImageFolder(DATASET_PATH)

# Hold out 20% of the images for validation.
val_split = 0.2
train_size = int((1 - val_split) * len(train_dataset_full))
val_size = len(train_dataset_full) - train_size

# Seed the split so the same images land in train/val on every run; with an
# unseeded split the validation metrics of two runs are computed on different
# images and cannot be compared.
SPLIT_SEED = 42
train_subset, val_subset = torch.utils.data.random_split(
    train_dataset_full,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Now wrap the subsets with their specific transforms
train_dataset = DatasetWithTransform(train_subset, data_transforms['train'])
val_dataset = DatasetWithTransform(val_subset, data_transforms['val'])


# Only the training batches are shuffled; validation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(val_dataset)}")


# --- Load pretrained EfficientNet B4 Model ---
weights = EfficientNet_B4_Weights.DEFAULT
model = efficientnet_b4(weights=weights)

# Freeze feature extractor layers: no gradients are computed for the backbone.
model.features.requires_grad_(False)

# Input width of the stock classifier's final layer; the new head starts from it.
num_ftrs = model.classifier[-1].in_features

# Replacement head: two 4096-wide hidden layers with ReLU and dropout, then a
# single sigmoid unit producing the binary probability.
head_layers = [
    nn.Linear(num_ftrs, 4096),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.5),
    nn.Linear(4096, 4096),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.5),
    nn.Linear(4096, 1),
    nn.Sigmoid(),
]
model.classifier = nn.Sequential(*head_layers)
model = model.to(DEVICE)

# Loss and optimizer
# BCELoss expects probabilities, which the trailing Sigmoid provides.
criterion = nn.BCELoss()
# Only the head is handed to the optimizer because the backbone is frozen.
optimizer = optim.Adam(model.classifier.parameters(), lr=1e-4)

# Training loop: one optimisation pass over train_loader followed by one
# evaluation pass over val_loader per epoch; weights are saved once at the end.
for epoch in range(EPOCHS):
    # NOTE(review): model.train() also switches the frozen backbone's layers
    # to training mode — confirm this is intended.
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Train]")
    for images, labels in train_bar:
        images = images.to(DEVICE)
        # Float targets with a trailing unit dimension, matching the
        # single-unit output of the classifier head.
        labels = labels.float().to(DEVICE).unsqueeze(1)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Rounding the sigmoid output yields the hard 0/1 prediction.
        preds = torch.round(outputs)
        correct += torch.eq(preds, labels).sum().item()
        total += labels.shape[0]

    train_acc = correct / total
    print(f"Train Loss: {running_loss/len(train_loader):.4f}, Accuracy: {train_acc:.4f}")

    # Validation
    model.eval()
    val_loss = 0.0
    # Per-batch arrays: hard predictions, ground-truth labels, raw probabilities.
    all_val_preds, all_val_labels, all_val_probs = [], [], []

    with torch.no_grad():
        val_bar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Val]")
        for images, labels in val_bar:
            images = images.to(DEVICE)
            labels = labels.float().to(DEVICE).unsqueeze(1)

            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            preds = torch.round(outputs)

            all_val_preds.append(preds.cpu().numpy())
            all_val_labels.append(labels.cpu().numpy())
            all_val_probs.append(outputs.cpu().numpy())

    # Merge the per-batch arrays into flat 1-D arrays for epoch-level metrics.
    all_val_preds, all_val_labels, all_val_probs = (
        np.concatenate(chunks).flatten()
        for chunks in (all_val_preds, all_val_labels, all_val_probs)
    )

    # Epoch-level metrics; zero_division=0 reports 0 instead of warning when a
    # score's denominator is empty.
    val_acc = np.mean(all_val_preds == all_val_labels)
    val_precision = precision_score(all_val_labels, all_val_preds, zero_division=0)
    val_recall = recall_score(all_val_labels, all_val_preds, zero_division=0)
    val_f1 = f1_score(all_val_labels, all_val_preds, zero_division=0)

    # ROC AUC is computed from the raw probabilities, not the rounded labels.
    try:
        val_roc_auc = roc_auc_score(all_val_labels, all_val_probs)
    except ValueError:
        val_roc_auc = float('nan')
        print("Warning: ROC AUC cannot be calculated (only one class present in validation labels).")

    print(f"Val Loss: {val_loss/len(val_loader):.4f}, Accuracy: {val_acc:.4f}")
    print(f"Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}, Val F1-Score: {val_f1:.4f}, Val ROC AUC: {val_roc_auc:.4f}")


# Persist the weights from the final epoch (parameters only, not the module).
torch.save(model.state_dict(), "eye_attention_model_efficientnet_b4_no_aug.pth")
print("Training complete. Model saved as eye_attention_model_efficientnet_b4_no_aug.pth")

In [None]:
import cv2
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.models import efficientnet_b4, EfficientNet_B4_Weights
import os

# --- Configuration (MUST MATCH TRAINING CONFIG) ---
IMG_SIZE = 224  # side length frames are resized to before inference
MODEL_PATH = "eye_attention_model_efficientnet_b4_no_aug.pth"
# Prefer the GPU when CUDA is available, otherwise run on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
NUM_CLASSES = 1  # single output unit: binary classification

print(f"Using device: {DEVICE}")

# --- Model Definition (MUST MATCH TRAINING MODEL ARCHITECTURE) ---
def create_efficientnet_b4_model(num_classes=1, pretrained=True, freeze_features=False, dropout_rate=0.5):
    """
    Build an EfficientNet B4 with the custom classifier head used in training.

    Args:
        num_classes: Number of output units of the final linear layer.
        pretrained: When true, start from ``EfficientNet_B4_Weights.DEFAULT``;
            otherwise the backbone is created without pretrained weights.
        freeze_features: When true, disable gradients for ``model.features``.
            This only matters if the returned model is trained further.
        dropout_rate: Dropout probability of both dropout layers in the head.

    Returns:
        The model, already moved to ``DEVICE``.
    """
    backbone_weights = EfficientNet_B4_Weights.DEFAULT if pretrained else None
    net = efficientnet_b4(weights=backbone_weights)

    if freeze_features:
        net.features.requires_grad_(False)

    # The new head starts from the input width of the stock final layer.
    in_dim = net.classifier[-1].in_features
    hidden_dim = 4096

    # Same layer sequence as the training head so saved state-dict keys line up.
    net.classifier = nn.Sequential(
        nn.Linear(in_dim, hidden_dim),
        nn.ReLU(inplace=True),
        nn.Dropout(p=dropout_rate),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(inplace=True),
        nn.Dropout(p=dropout_rate),
        nn.Linear(hidden_dim, num_classes),
        nn.Sigmoid(),  # the model therefore outputs probabilities, not logits
    )
    net = net.to(DEVICE)
    return net

# --- Load the Trained Model ---
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"Trained model not found at: {MODEL_PATH}. Please ensure it's in the correct path.")

# pretrained=False: the checkpoint is a full state_dict of the same
# architecture, so load_state_dict overwrites every parameter and buffer.
# Fetching ImageNet weights first would only add a download that is thrown away.
model = create_efficientnet_b4_model(num_classes=NUM_CLASSES, pretrained=False, freeze_features=False, dropout_rate=0.5)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
model.eval() # Evaluation mode: dropout is disabled for inference

print(f"Model loaded from {MODEL_PATH} and set to evaluation mode.")

# --- Preprocessing Transforms for Inference (MUST MATCH Validation Transforms) ---
preprocess_transforms = transforms.Compose([
    # Input is a numpy frame that the caller has already converted from
    # OpenCV's BGR order to RGB; ToPILImage does not reorder channels itself.
    transforms.ToPILImage(),
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# --- Webcam Capture and Inference Loop ---
def run_webcam_inference():
    """
    Classify frames from the default webcam and show the result live.

    Each frame is converted to RGB, preprocessed, passed through the
    module-level ``model``, and the predicted status plus probability is
    drawn on the frame. The loop ends when 'q' is pressed or a frame cannot
    be read. The camera and all OpenCV windows are released on exit, also
    when an exception interrupts the loop.
    """
    cap = cv2.VideoCapture(0) # 0 for default webcam

    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return

    print("Webcam opened. Press 'q' to quit.")

    try:
        while True:
            ret, frame = cap.read() # Read a frame from the webcam
            if not ret:
                print("Failed to grab frame.")
                break

            # OpenCV delivers BGR; the preprocessing pipeline expects RGB.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Preprocess the frame and add the batch dimension.
            input_tensor = preprocess_transforms(frame_rgb).unsqueeze(0).to(DEVICE)

            with torch.no_grad():
                output = model(input_tensor)

            # The classifier head already ends in nn.Sigmoid, so the output is
            # the probability itself. Applying sigmoid a second time would
            # squash it into (0.5, 0.73) and make every frame cross the 0.5
            # threshold.
            probability = output.item()
            prediction = 1 if probability >= 0.5 else 0

            # NOTE(review): assumes class index 1 is the "attentive" class —
            # confirm against the training dataset's class-to-index mapping.
            label_text = "Attentive" if prediction == 1 else "Not Attentive"
            color = (0, 255, 0) if prediction == 1 else (0, 0, 255) # Green / red in BGR

            # Annotate the original BGR frame, which is what imshow expects.
            cv2.putText(frame, f"Status: {label_text} ({probability:.2f})", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2, cv2.LINE_AA)

            cv2.imshow('Gazenet Attention Detector', frame)

            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera and close windows, even after an error.
        cap.release()
        cv2.destroyAllWindows()

# Start the webcam demo only when this file is executed directly, not when it
# is imported as a module.
if __name__ == "__main__":
    run_webcam_inference()