In [1]:
import os

# Set the working directory
working_directory = "/Users/saeah/.cache/kagglehub/datasets/meowmeowmeowmeowmeow/gtsrb-german-traffic-sign/versions/1"
os.chdir(working_directory)

In [2]:
import os
print(os.getcwd())

/Users/saeah/.cache/kagglehub/datasets/meowmeowmeowmeowmeow/gtsrb-german-traffic-sign/versions/1


In [3]:
import torch

# Check if MPS is available
if torch.backends.mps.is_available():
    print(f"MPS is available. Using GPU.")
else:
    print("MPS is not available. Using CPU.")

MPS is available. Using GPU.


In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

# Ensure reproducibility
RANDOM_SEED = 1
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Hyperparameters
batch_size = 128
num_classes = 43  # Number of traffic sign classes
epochs = 40
learning_rate = 0.001
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

### DATA AUGMENTATION
# Transformations for training (with augmentation)
train_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)),  # Small rotations and translations
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # Color variations
    transforms.RandomErasing(p=0.2, scale=(0.02, 0.2), ratio=(0.3, 3.3), value=0),  # Simulating occlusions
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Transformations for testing (NO augmentations, only normalization)
test_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Custom Dataset class for GTSRB
class GTSRBDataset(Dataset):
    def __init__(self, csv_file, transform=None):
        self.data = pd.read_csv(csv_file)

        # Reorder columns: Move 'ClassId' and 'Path' to the first two positions
        columns = self.data.columns.tolist()
        new_order = ['ClassId', 'Path'] + [col for col in columns if col not in ['ClassId', 'Path']]
        self.data = self.data[new_order]

        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = self.data.iloc[idx, 1]  # Use path directly from CSV
        image = Image.open(img_path).convert("RGB")
        label = int(self.data.iloc[idx, 0])

        if self.transform:
            image = self.transform(image)

        return image, label

# Load datasets
train_dataset = GTSRBDataset(csv_file="Obscured_Train.csv", transform=train_transform)
test_dataset = GTSRBDataset(csv_file="Obscured_Test.csv", transform=test_transform)





# BALANCE THE DATASET

from collections import Counter
from torch.utils.data import WeightedRandomSampler

# Get class distribution
class_counts = train_dataset.data['ClassId'].value_counts().to_dict()
max_samples = max(class_counts.values())  # Maximum images in a single class

# Create a new balanced dataset by oversampling underrepresented classes
balanced_data = []

for class_id, count in class_counts.items():
    class_samples = train_dataset.data[train_dataset.data['ClassId'] == class_id]
    
    # If the class has fewer samples, randomly duplicate its entries
    if count < max_samples:
        extra_samples = class_samples.sample(n=max_samples - count, replace=True, random_state=RANDOM_SEED)
        class_samples = pd.concat([class_samples, extra_samples])
    
    balanced_data.append(class_samples)

# Concatenate all balanced data
balanced_df = pd.concat(balanced_data).reset_index(drop=True)

# Update the dataset
train_dataset.data = balanced_df

# Create new DataLoader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)









# train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# Fully Connected Neural Network Model with 2 layers
class TwoFCLayers(nn.Module):
    def __init__(self):
        super(TwoFCLayers, self).__init__()
        self.fc1 = nn.Linear(32*32*3, 1024)  # Flatten 32x32x3 image to 1024 hidden units
        # self.dropout1 = nn.Dropout(0.3)  # Dropout with 50% probability
        self.fc2 = nn.Linear(1024, 1024)     # Hidden layer
        # self.dropout2 = nn.Dropout(0.3)  # Dropout with 50% probability
        self.fc3 = nn.Linear(1024, num_classes)  # Output layer

    def forward(self, x):
        x = x.view(-1, 32*32*3)  # Flatten the input (batch_size, 32x32x3)
        x = torch.relu(self.fc1(x))  # Apply ReLU activation function
        # x = self.dropout1(x)
        x = torch.relu(self.fc2(x))  # Apply ReLU activation function
        # x = self.dropout2(x)
        x = self.fc3(x)  # Output layer (no activation)
        return x

# Initialize model
model = TwoFCLayers().to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
# optimizer = optim.SGD(model.parameters(), lr=learning_rate)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, weight_decay=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.1)

# Lists to store loss and accuracy per epoch
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []

# Training and evaluation function
def train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, epochs, device):

    
    for epoch in range(epochs):
        model.train()
        correct_train = 0
        total_train = 0
        train_loss = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Track accuracy
            _, predicted = outputs.max(1)
            correct_train += (predicted == labels).sum().item()
            total_train += labels.size(0)
            train_loss += loss.item()

        train_accuracy = correct_train / total_train * 100
        train_losses.append(train_loss / len(train_loader))  # Average train loss

        # Validation/Test phase
        model.eval()
        correct_test = 0
        total_test = 0
        test_loss = 0

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                # Forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                # Track accuracy
                _, predicted = outputs.max(1)
                correct_test += (predicted == labels).sum().item()
                total_test += labels.size(0)
                test_loss += loss.item()

        test_accuracy = correct_test / total_test * 100
        test_losses.append(test_loss / len(test_loader))  # Average test loss

        # Store values for plotting
        train_accuracies.append(train_accuracy)
        test_accuracies.append(test_accuracy)

        # print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}% | Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")
        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss / len(train_loader):.4f}, Train Accuracy: {train_accuracy:.2f}% | Test Loss: {test_loss / len(test_loader):.4f}, Test Accuracy: {test_accuracy:.2f}%")

    print(f"Final Train Accuracy: {train_accuracy:.2f}% | Final Test Accuracy: {test_accuracy:.2f}%")
    return model, train_losses, test_losses, train_accuracies, test_accuracies

# Train the model
model, train_losses, test_losses, train_accuracies, test_accuracies = train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, epochs, device)

# Save the trained model
model_path = "saved_models/gtsrb_model_fcnn_obscured_train.pth"
os.makedirs("saved_models", exist_ok=True)
torch.save(model.state_dict(), model_path)
print(f"Model saved at {model_path}")

In [None]:
import matplotlib.pyplot as plt
# Plot accuracy and loss curves
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(range(1, epochs+1), train_accuracies, label='Train Accuracy')
plt.plot(range(1, epochs+1), test_accuracies, label='Test Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.title('Training and Testing Accuracy')

plt.subplot(1, 2, 2)
plt.plot(range(1, epochs+1), train_losses, label='Train Loss')
plt.plot(range(1, epochs+1), test_losses, label='Test Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Testing Loss')

plt.show()


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Collect true and predicted labels for the test dataset
true_labels = []
predicted_labels = []

model.eval()  # Ensure the model is in evaluation mode
with torch.no_grad():  # Disable gradient calculations
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Get model predictions
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)

        # Append labels
        true_labels.extend(labels.cpu().numpy())
        predicted_labels.extend(predicted.cpu().numpy())

# Generate confusion matrix
cm = confusion_matrix(true_labels, predicted_labels, labels=list(range(num_classes)))

# Set a custom figure size
fig, ax = plt.subplots(figsize=(20, 20))  # Adjust the size as needed

# Display the confusion matrix with explicit axes
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(range(num_classes)))
disp.plot(cmap=plt.cm.Blues, ax=ax, xticks_rotation='vertical')  # Pass the custom axes

# Add title and show the plot
plt.title("Confusion Matrix - FCNN Model (Obscured Image Data)")
plt.show()