In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.utils.data import DataLoader
from dataset import FaceDataset
from sklearn.metrics import classification_report
import time

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.utils.data import DataLoader
from dataset import FaceDataset
from sklearn.metrics import classification_report
import time


# Paths
train_path = "../data/train"
test_path = "../data/test"


# Transformations
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),  # Randomly flip images horizontally
    transforms.RandomRotation(degrees=15),   # Rotate images by up to 15 degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # Color adjustments
    transforms.RandomResizedCrop(size=(128, 128), scale=(0.8, 1.0)),  # Random cropping and resizing
    transforms.ToTensor(),  # Convert image to tensor
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize the image
])

test_transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Datasets and Dataloaders
train_dataset = FaceDataset(train_path, transform=train_transform)
test_dataset = FaceDataset(test_path, transform=test_transform)

batch_size = 32 * 12
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Load pretrained first and second stage models
first_stage_model = models.resnet50(weights=True)
first_stage_model.fc = nn.Linear(first_stage_model.fc.in_features, 3)
first_stage_model.load_state_dict(torch.load("models/best_three_class.pt"))
second_stage_model = models.resnet50(weights=True)
second_stage_model.fc = nn.Linear(second_stage_model.fc.in_features, 2)
second_stage_model.load_state_dict(torch.load("models/best_two_class.pt"))

# Freeze layers before FC layer in both models
for param in first_stage_model.parameters():
    param.requires_grad = False
for param in second_stage_model.parameters():
    param.requires_grad = False

# Remove the FC layers to use the feature embeddings
first_stage_model = nn.Sequential(*list(first_stage_model.children())[:-1])  # Remove FC layer
second_stage_model = nn.Sequential(*list(second_stage_model.children())[:-1])  # Remove FC layer

# Define the combined model
class CombinedModel(nn.Module):
    def __init__(self, first_stage_model, second_stage_model):
        super(CombinedModel, self).__init__()
        self.first_stage_model = first_stage_model
        self.second_stage_model = second_stage_model

        self.fc = nn.Sequential(
            nn.Linear(2048 + 2048, 1024),  # Reduce feature dimensionality
            nn.ReLU(),  # Non-linear activation
            nn.Dropout(0.5),  # Regularization
            nn.Linear(1024, 256),  # Further reduction
            nn.ReLU(),
            nn.Linear(256, 4)  # Final output layer for 4 classes
        )

    def forward(self, x):
        first_features = self.first_stage_model(x)
        first_features = first_features.view(first_features.size(0), -1)  # Flatten
        second_features = self.second_stage_model(x)
        second_features = second_features.view(second_features.size(0), -1)  # Flatten

        combined_features = torch.cat((first_features, second_features), dim=1)
        output = self.fc(combined_features)
        return output

# Initialize the combined model
combined_model = CombinedModel(first_stage_model, second_stage_model)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
combined_model = combined_model.to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss(weight=torch.tensor([0.5, 1.5, 2.0, 0.5]).to(device), ignore_index=-1)
optimizer = torch.optim.Adam(combined_model.fc.parameters(), lr=0.0001, weight_decay=1e-4)  # Only train new FC layer





  first_stage_model.load_state_dict(torch.load("models/best_first.pt"))
  second_stage_model.load_state_dict(torch.load("models/best_second.pt"))


In [4]:
# Training function
def train_and_evaluate_model(model, train_loader, test_loader, criterion, optimizer, device, num_epochs=100):
    best_accuracy = 0
    for epoch in range(num_epochs):
        start_time = time.time()
        model.train()
        running_loss = 0.0
        correct_train = 0
        total_train = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            # print(outputs, labels)

            # Compute loss
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()

        train_accuracy = 100 * correct_train / total_train

        # Evaluate the model
        model.eval()
        all_labels = []
        all_predictions = []
        correct = 0
        total = 0
        val_running_loss = 0.0

        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)

                # Forward pass
                outputs = model(images)
                loss = criterion(outputs, labels)

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                val_running_loss += loss.item()
                all_labels.extend(labels.cpu().numpy())
                all_predictions.extend(predicted.cpu().numpy())

        accuracy = 100 * correct / total

        if best_accuracy < accuracy:
            torch.save(model.state_dict(), "models/best_combined.pt")
            best_accuracy = accuracy

        report = classification_report(all_labels, all_predictions,
                                       target_names=[f"Class {i}" for i in range(4)], zero_division=0)
        epoch_time = time.time() - start_time
        print(f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {running_loss / len(train_loader):.4f}, "
              f"Training Accuracy: {train_accuracy:.2f}%, Val Loss: {val_running_loss / len(test_loader):.4f}, "
              f"Test Accuracy: {accuracy:.2f}%, Time: {epoch_time:.2f} seconds")
        print(f"Test Classification Report:\n{report}")


In [5]:
train_and_evaluate_model(combined_model, train_loader, test_loader, criterion, optimizer, device, num_epochs=100)

Epoch [1/100], Training Loss: 0.4946, Training Accuracy: 77.46%, Val Loss: 0.9500, Test Accuracy: 77.17%, Time: 84.44 seconds
Test Classification Report:
              precision    recall  f1-score   support

     Class 0       1.00      0.83      0.91       124
     Class 1       0.66      0.77      0.71       123
     Class 2       0.61      0.69      0.65       121
     Class 3       0.91      0.79      0.84       127

    accuracy                           0.77       495
   macro avg       0.79      0.77      0.78       495
weighted avg       0.80      0.77      0.78       495



KeyboardInterrupt: 