In [None]:
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader, random_split

class CarDataset(Dataset):
    """Image-classification dataset driven by an annotations DataFrame.

    Every row must provide an ``image`` column (file name joined onto
    ``root_dir``) and a ``Class`` column. Labels are shifted down by one at
    construction time (``Class - 1``), so a table whose classes start at 1
    yields labels that start at 0.

    Args:
        annotations_df: DataFrame with ``image`` and ``Class`` columns.
        root_dir: Directory the ``image`` file names are joined onto.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, annotations_df, root_dir, transform=None):
        # reset_index hands back a new frame, so the label shift below does
        # not touch the DataFrame owned by the caller.
        self.annotations = annotations_df.reset_index(drop=True)
        self.root_dir = root_dir
        self.transform = transform

        self.annotations['Class'] = self.annotations['Class'] - 1

    def __len__(self):
        """Return the number of annotated samples."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        ``image`` is the RGB image after ``transform`` (when one was given);
        ``label`` is a 0-dim ``torch.long`` tensor holding the shifted class.
        """
        # Fetch the row once instead of running .iloc twice per sample.
        row = self.annotations.iloc[idx]
        img_path = os.path.join(self.root_dir, row['image'])

        # The context manager releases the file handle as soon as the pixel
        # data has been converted, rather than leaving it to garbage collection.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        label = int(row['Class'])
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(label, dtype=torch.long)
        

In [None]:
import pandas as pd
# Read the annotation table (one row per image) from the CSV next to the notebook.
file_path = "car_boundingBox_class_and_image_number.csv"
car_dataset_df = pd.read_csv(file_path)

# Number of distinct values found in the 'Class' column.
class_labels = car_dataset_df['Class']
num_classes = class_labels.nunique()

# Bare expression so the notebook renders the count as the cell output.
num_classes

In [None]:
from torchvision import transforms

# Both pipelines start by resizing to 480x480 with bicubic interpolation.
_BICUBIC = transforms.InterpolationMode.BICUBIC

# Each RandomRotation has equal lower/upper bounds, so it always rotates by
# exactly that angle; RandomChoice picks one of the two directions.
_quarter_turn = transforms.RandomChoice([
    transforms.RandomRotation(degrees=(-90, -90)),
    transforms.RandomRotation(degrees=(90, 90)),
])

# Training pipeline: resize, quarter turn with probability 0.5,
# horizontal flip with probability 0.5, then conversion to a tensor.
_train_steps = [
    transforms.Resize((480, 480), interpolation=_BICUBIC),
    transforms.RandomApply([_quarter_turn], p=0.5),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
]
transform = transforms.Compose(_train_steps)

# Validation pipeline: no random augmentation, only resize + tensor conversion.
_validation_steps = [
    transforms.Resize((480, 480), interpolation=_BICUBIC),
    transforms.ToTensor(),
]
transform_validation = transforms.Compose(_validation_steps)

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingLR
import torch.nn as nn
import torch.optim as optim
from PIL import Image
import os
import random
from sklearn.model_selection import train_test_split

In [None]:
# Hold out 20% of the annotated rows for validation; the fixed random_state
# keeps the split reproducible between runs.
train_df, val_df = train_test_split(
    car_dataset_df,
    test_size=0.2,
    random_state=22,
)

# Both subsets read their images from the same directory.
_image_root = "cars_training_image/cars_train/"

# Training samples get the augmenting pipeline, validation samples the plain one.
train_ds = CarDataset(train_df, _image_root, transform)
val_ds = CarDataset(val_df, _image_root, transform_validation)

In [None]:
# Prefer the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Same batch size for both loaders; only the training loader reshuffles.
_batch_size = 6
train_loader = DataLoader(train_ds, batch_size=_batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=_batch_size, shuffle=False)

In [None]:
# Sanity check: pull at most three batches from the training loader and print
# their shapes and labels. zip() stops once range(3) is exhausted, so no
# fourth batch is fetched.
for i, (images, labels) in zip(range(3), train_loader):
    images = images.to(device)
    labels = labels.to(device)

    print(f"Batch {i+1}")
    print("Images shape:", images.shape)
    print("Labels:", labels)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

class EfficientNetV2LClassifier(nn.Module):
    """EfficientNetV2 backbone with a fresh linear classification head.

    NOTE: despite the "V2L" in the class name, the backbone built here is
    ``efficientnet_v2_s``. The name is kept so existing callers keep working.

    The module also owns the optimizer and loss used by the training cells,
    exposed as ``self.optimizer`` and ``self.criterion``.

    Args:
        num_classes: Output size of the new classification layer.
        lr: Learning rate for the AdamW optimizer.
        weight_decay: Weight decay for the AdamW optimizer.
        label_smoothing: Label-smoothing factor for the cross-entropy loss.
        pretrained: When True (default) start from the ``IMAGENET1K_V1``
            weights; when False build the backbone with ``weights=None``,
            e.g. before loading a saved ``state_dict``.
    """

    def __init__(self, num_classes=196, lr=1e-4, weight_decay=1e-4,
                 label_smoothing=0.1, pretrained=True):
        super().__init__()

        weights = models.EfficientNet_V2_S_Weights.IMAGENET1K_V1 if pretrained else None
        self.model = models.efficientnet_v2_s(weights=weights)

        # The stock head is indexable and its entry [1] supplies the feature
        # width. The whole head is replaced by a single Linear layer, so any
        # other layer it contained is dropped. Kept as-is so the parameter
        # layout of this class does not change.
        in_features = self.model.classifier[1].in_features
        self.model.classifier = nn.Linear(in_features, num_classes)

        self.optimizer = optim.AdamW(self.model.parameters(), lr=lr, weight_decay=weight_decay)

        self.criterion = nn.CrossEntropyLoss(label_smoothing=label_smoothing)

    def forward(self, x):
        """Run the wrapped network on ``x`` and return its output unchanged."""
        return self.model(x)
        

In [None]:
# Build the classifier, then move it onto the selected device.
model = EfficientNetV2LClassifier(num_classes=196)
model = model.to(device)

# The optimizer and loss are created inside the model; bind them to the
# module-level names the training cells use.
optimizer, criterion = model.optimizer, model.criterion

In [None]:
import torch.nn.functional as F
from torch.amp import autocast

def fgsm_attack(model, criterion, images, labels, mean, std_dev):
    """Craft single-step FGSM adversarial examples with a randomly drawn step size.

    The step size ``epsilon`` is sampled once per call from
    ``Normal(mean, std_dev)`` and floored at 0. Each pixel is then moved by
    ``epsilon`` in the direction of the sign of the loss gradient, and the
    result is clamped to ``[0, 1]``.

    Args:
        model: Callable mapping a batch of images to logits.
        criterion: Callable ``(outputs, labels) -> scalar loss``.
        images: Input batch; the caller's tensor is not modified.
        labels: Targets passed to ``criterion``.
        mean: Mean of the normal distribution ``epsilon`` is drawn from.
        std_dev: Standard deviation of that distribution.

    Returns:
        A detached tensor shaped like ``images`` with values in ``[0, 1]``.
    """
    # A negative draw would move the images *down* the loss gradient, which is
    # the opposite of an attack, so the step size is floored at zero.
    epsilon = max(0.0, torch.normal(mean=mean, std=std_dev, size=(1,)).item())

    # Fresh leaf tensor so the gradient is taken w.r.t. the pixels only.
    adv_images = images.clone().detach().requires_grad_(True)

    on_cuda = images.device.type == 'cuda'
    with torch.autocast(device_type=images.device.type, enabled=on_cuda):
        outputs = model(adv_images)
        loss = criterion(outputs, labels)

    # autograd.grad returns the input gradient directly and leaves the
    # parameters' .grad buffers untouched.
    grad = torch.autograd.grad(loss, adv_images)[0]

    perturbed = adv_images.detach() + epsilon * grad.sign()
    return torch.clamp(perturbed, 0, 1).detach()

In [None]:
def show_comparison_plot(base_images, mixup_images, adversarial_images, num_display=5):
    """Show base, mixup and adversarial images in a 3-row comparison grid.

    Row 0 holds the base images, row 1 the mixup images and row 2 the
    adversarial images; column ``i`` shows element ``i`` of each batch.

    Args:
        base_images: Indexable batch of image tensors.
        mixup_images: Indexable batch of image tensors.
        adversarial_images: Indexable batch of image tensors.
        num_display: Maximum number of columns to draw.
    """
    # Never index past the shortest of the three batches.
    num_images = min(
        num_display,
        len(base_images),
        len(mixup_images),
        len(adversarial_images),
    )
    if num_images <= 0:
        # Nothing to draw; a subplot grid with zero columns cannot be created.
        return

    # squeeze=False keeps `axes` two-dimensional even for a single column,
    # so axes[row, col] also works when only one image is displayed.
    fig, axes = plt.subplots(3, num_images, figsize=(num_images * 3, 9), squeeze=False)

    rows = (
        ("Base", base_images),
        ("Mixup", mixup_images),
        ("Adversarial", adversarial_images),
    )
    for row_idx, (row_label, batch) in enumerate(rows):
        for col_idx in range(num_images):
            ax = axes[row_idx, col_idx]
            ax.imshow(to_pil_image(batch[col_idx].detach().cpu()))
            ax.set_title(f"{row_label} {col_idx + 1}")
            ax.axis('off')

    plt.tight_layout()
    plt.show()
    # This function is called repeatedly during training; close the figure so
    # finished figures do not pile up in memory.
    plt.close(fig)

In [None]:
def mixup_data(x, y, alpha=0.2, lower=0.55, upper=0.85):
    """Blend each sample of a batch with a randomly chosen partner sample.

    When ``alpha > 0`` the mixing weight is drawn from ``Beta(alpha, alpha)``
    and limited to ``[lower, upper]``; otherwise the weight is 1.0 and the
    inputs come back unmixed.

    Returns:
        ``(mixed_x, y_a, y_b, lam)`` where ``mixed_x = lam * x + (1 - lam) *
        x[perm]``, ``y_a`` is ``y``, ``y_b`` is ``y[perm]`` and ``lam`` is the
        mixing weight that was used.
    """
    lam = 1.0
    if alpha > 0:
        drawn = np.random.beta(alpha, alpha)
        # Cap from above first, then from below.
        capped = drawn if drawn < upper else upper
        lam = capped if capped > lower else lower

    # Random pairing of every sample with another sample of the same batch.
    n_samples = x.shape[0]
    perm = torch.randperm(n_samples).to(x.device)

    partner_x = x[perm, :]
    mixed_x = lam * x + (1 - lam) * partner_x
    return mixed_x, y, y[perm], lam

def mixup_criterion(criterion, preds, y_a, y_b, lam):
    """Combine the losses against both mixup targets, weighted by ``lam``.

    Returns ``lam * criterion(preds, y_a) + (1 - lam) * criterion(preds, y_b)``.
    """
    loss_a = criterion(preds, y_a)
    loss_b = criterion(preds, y_b)
    return lam * loss_a + (1 - lam) * loss_b

In [None]:
import os
import time
import numpy as np
from datetime import timedelta
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchvision.transforms.functional import to_pil_image
import matplotlib.pyplot as plt

# Training configuration: at most `epochs` passes over the training data, with
# early stopping once the validation loss has failed to improve for
# `early_stop_patience` epochs in a row.
epochs = 33
best_val_loss = float('inf')
no_improve_epochs = 0
early_stop_patience = 3
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)
# Cosine learning-rate schedule spanning the full epoch budget; stepped once per epoch below.
scheduler = CosineAnnealingLR(optimizer, T_max=epochs)

for epoch in range(epochs):
    model.train()
    total_train_loss = 0
    epoch_start = time.monotonic()

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        batch_start = time.monotonic()

        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()

        # 1) Loss on the clean batch.
        preds_base = model(inputs)
        loss_base = criterion(preds_base, targets)

        # 2) Loss on a mixup version of the same batch.
        inputs_mixed, targets_a, targets_b, lam = mixup_data(inputs, targets, alpha=0.2, lower=0.55, upper=0.85)

        preds_mixed = model(inputs_mixed)
        loss_mixed = mixup_criterion(criterion, preds_mixed, targets_a, targets_b, lam)

        # 3) Loss on FGSM adversarial examples crafted from the clean batch.
        # The attack runs while the model is still in train() mode.
        adv_inputs = fgsm_attack(model, criterion, inputs, targets, mean=0.03, std_dev=0.01)

        preds_adv = model(adv_inputs)
        loss_adv = criterion(preds_adv, targets)

        # The three objectives are averaged with equal weight and optimized
        # in a single backward pass / optimizer step.
        total_loss = (loss_base + loss_mixed + loss_adv) / 3

        total_loss.backward()
        optimizer.step()

        total_train_loss += total_loss.item()

        batch_duration = timedelta(seconds=(time.monotonic() - batch_start))
        print(f"Epoch [{epoch+1}/{epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {total_loss.item():.4f}, Time: {batch_duration}")

        # Every 10th batch, plot up to five randomly chosen samples of the
        # current batch in their clean, mixup and adversarial versions.
        if batch_idx % 10 == 0:
            num_display = 5 

            actual_num_display = min(num_display, inputs.size(0))
            if actual_num_display > 0:
                # `random` is imported in an earlier cell of the notebook.
                indices = random.sample(range(inputs.size(0)), actual_num_display)
                selected_base = inputs[indices]
                selected_mixed = inputs_mixed[indices]
                selected_adv = adv_inputs[indices]
                show_comparison_plot(selected_base, selected_mixed, selected_adv, num_display=actual_num_display)

    scheduler.step()

    avg_train_loss = total_train_loss / len(train_loader)
    epoch_duration = timedelta(seconds=(time.monotonic() - epoch_start))
    print(f"Epoch [{epoch+1}/{epochs}] completed. Avg Loss: {avg_train_loss:.4f}. Time: {epoch_duration}")

    # Validation pass: clean inputs only, eval mode, no gradient tracking.
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            preds = model(inputs)
            val_loss = criterion(preds, targets)
            total_val_loss += val_loss.item()

    avg_val_loss = total_val_loss / len(val_loader)
    print(f"Epoch [{epoch+1}/{epochs}], Validation Loss: {avg_val_loss:.4f}")

    # Keep the weights with the lowest validation loss seen so far; the same
    # file is overwritten on every improvement.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        no_improve_epochs = 0
        best_model_path = os.path.join(save_dir, "early_stopping.pth")
        torch.save(model.state_dict(), best_model_path)
        print(f"Best model updated and saved to {best_model_path}")
    else:
        no_improve_epochs += 1
        print(f"No improvement for {no_improve_epochs} epoch(s).")
        if no_improve_epochs >= early_stop_patience:
            print("Early stopping due to no improvement.")
            # NOTE(review): this break leaves the loop before the per-epoch
            # checkpoint below, so the epoch that triggers early stopping
            # gets no model_epoch_*.pth file.
            break
            
    # Per-epoch checkpoint, written whether or not the validation loss improved.
    model_path = os.path.join(save_dir, f"model_epoch_{epoch+1}_adversarial.pth")
    torch.save(model.state_dict(), model_path)
    print(f"Model for epoch {epoch+1} saved as {model_path}.")

print("Training session finished.")