# Augmentation

In [None]:
import os
import torch
import numpy as np
from PIL import Image, ImageFilter, ImageEnhance
from torchvision import transforms
from sklearn.model_selection import train_test_split

def augment(n_images_per_class: int, 
            resolution: int, 
            seed: int,
            input_path = "raw_database",
            output_path = "augmented-db"):
    """Build an augmented image dataset and save it as train/test tensors.

    Every sub-directory of ``input_path`` is treated as one class; its label is
    the directory's position in the ``os.listdir`` listing. For each class,
    ``n_images_per_class`` augmented images are generated by spreading the
    quota as evenly as possible over the class's source images. Each generated
    image is written to ``output_path/<class>/`` and its applied
    transformations are logged to ``output_path/data.txt``.

    Finally all images are split 80/20 and saved to ``output_path`` as
    ``X_train.pt``, ``X_test.pt``, ``Y_train.pt`` and ``Y_test.pt``.

    Args:
        n_images_per_class: number of augmented images to produce per class.
        resolution: side length (pixels) of the square output images.
        seed: seed for the NumPy generator that drives the augmentation and
            the train/test split.
        input_path: directory containing one sub-directory per class.
        output_path: directory that receives the images, log and tensors.

    Raises:
        ValueError: if a class directory contains no images.
    """
    rng = np.random.default_rng(seed=seed)

    # Only directories are classes; stray files (e.g. ".DS_Store") used to
    # crash the per-class os.listdir below.
    classes = [entry for entry in os.listdir(input_path)
               if os.path.isdir(os.path.join(input_path, entry))]

    # makedirs also handles a nested output_path and already existing folders.
    os.makedirs(output_path, exist_ok=True)
    for Class in classes:
        os.makedirs(os.path.join(output_path, Class), exist_ok=True)

    n_total_images = len(classes)*n_images_per_class
    X = torch.zeros(n_total_images,3,resolution,resolution)
    Y = torch.zeros(n_total_images)
    transf = transforms.ToTensor()

    with open(os.path.join(output_path,"data.txt"),"w") as file:
        i = 0
        for label, Class in enumerate(classes):
            images = os.listdir(os.path.join(input_path,Class))
            n = len(images)
            if n == 0:
                raise ValueError(f"class directory '{Class}' contains no images")

            # Spread the quota: every image gets `base` variants and `extra`
            # randomly chosen images get one more.
            base, extra = divmod(n_images_per_class, n)
            n_transf_per_image = [base]*(n - extra) + [base + 1]*extra
            rng.shuffle(n_transf_per_image)

            for image_name, n_transf in zip(images, n_transf_per_image):
                img = Image.open(os.path.join(input_path,Class,image_name)).convert("RGB")
                # Strip the real extension; slicing off 5 characters was only
                # right for ".jpeg" and made e.g. "a1.jpg"/"a2.jpg" collide.
                stem = os.path.splitext(image_name)[0]
                for j in range(n_transf):
                    new_img, data = produce_image(img, resolution, rng)
                    new_image_name = f"{stem}_{j}.jpeg"
                    new_img.save(os.path.join(output_path,Class,new_image_name))
                    X[i] = transf(new_img)
                    Y[i] = label
                    i += 1

                    # One log record: file name, one line per transformation,
                    # then a blank separator line.
                    file.write(new_image_name+"\n")
                    for dat in data:
                        file.write(dat+"\n")
                    file.write("\n")

                    if i % 100 == 0:
                        print(f"class {label+1}/{len(classes)}, total {i}/{n_total_images}")

    # Derive the split seed from the same generator so the whole run is
    # reproducible from `seed` (the split used to be unseeded).
    split_seed = int(rng.integers(0, 2**32 - 1))
    X_train, X_test, Y_train, Y_test = train_test_split(X,Y,train_size=0.8,random_state=split_seed)
    torch.save(X_train,os.path.join(output_path,"X_train.pt"))
    torch.save(X_test,os.path.join(output_path,"X_test.pt"))
    torch.save(Y_train,os.path.join(output_path,"Y_train.pt"))
    torch.save(Y_test,os.path.join(output_path,"Y_test.pt"))
            
    
def produce_image(img, resolution, rng):
    """Apply a random selection of 1-4 distinct transformations to ``img``.

    The selected transformations run in two passes: those flagged ``True``
    (rotate, crop) run on the original image; the image is then center-cropped
    to a square and resized to ``resolution`` x ``resolution``; finally those
    flagged ``False`` run on the resized image.

    Returns a two-item list: the resulting image and the list of description
    strings of the applied transformations (first pass first).
    """
    # (transformation, runs_before_resize)
    pipeline = [
        (rotate, True),
        (crop, True),
        (hor_flip, False),
        (ver_flip, False),
        (blur, False),
        (brightness, False),
        (contrast, False),
        (color, False),
    ]

    how_many = rng.choice([1,2,3,4],p=[0.2,0.4,0.3,0.1])
    picked = rng.choice(list(range(len(pipeline))),
                        size=how_many,
                        replace=False)

    def run_pass(image, before_resize):
        """Run the picked transformations whose flag equals ``before_resize``."""
        notes = []
        for idx in picked:
            func, flag = pipeline[idx]
            if flag == before_resize:
                image, note = func(image, rng)
                notes.append(note)
        return image, notes

    img, data = run_pass(img, True)

    shortest_side = min(img.size)
    squared = transforms.functional.center_crop(img, shortest_side)
    img = squared.resize((resolution, resolution))

    img, late_notes = run_pass(img, False)
    data.extend(late_notes)

    return [img, data]


def hor_flip(img, rng):
    """Mirror ``img`` left-to-right.

    ``rng`` is not used by this transformation.

    Returns a two-item list: the flipped image and its description string.
    """
    flipped = img.transpose(Image.FLIP_LEFT_RIGHT)
    description = "Horizontal Flip"
    return [flipped, description]

def ver_flip(img, rng):
    """Mirror ``img`` top-to-bottom.

    ``rng`` is not used by this transformation.

    Returns a two-item list: the flipped image and its description string.
    """
    flipped = img.transpose(Image.FLIP_TOP_BOTTOM)
    description = "Vertical Flip"
    return [flipped, description]

def crop(img, rng):
    """Cut a randomly placed square window out of ``img``.

    The window's side is ``scale`` times the image's shorter side, with
    ``scale`` drawn from ``rng.uniform(0.5, 0.75)``. Its center is shifted from
    the image center by ``offsetX``/``offsetY`` (each drawn from
    ``rng.uniform(-1, 1)``) times half of the slack left on the shorter side.

    Returns a two-item list: the cropped image and a description string.
    """
    width, height = img.size
    short_side = min(width, height)

    scale = rng.uniform(0.5,0.75)
    offsetX = rng.uniform(-1,1)
    offsetY = rng.uniform(-1,1)

    maxOffset = (1-scale)*short_side
    size = short_side*scale

    center_x = width/2 + offsetX*maxOffset/2
    center_y = height/2 + offsetY*maxOffset/2
    half = size/2

    # (left, upper, right, lower)
    box = (center_x - half, center_y - half, center_x + half, center_y + half)
    return [img.crop(box), f"Crop: scale = {scale}, offsetX = {offsetX}, offsetY = {offsetY}"]

def rotate(img, rng):
    """Rotate ``img`` by an angle drawn from ``rng.uniform(-45, 45)``.

    The rotation is done with ``expand=False``.

    Returns a two-item list: the rotated image and a description string.
    """
    angle = rng.uniform(-45,45)
    rotated = img.rotate(angle, expand=False)
    description = f"Rotate: angle = {angle}"
    return [rotated, description]

def blur(img, rng):
    """Apply a Gaussian blur whose radius is drawn from ``rng.integers(1, 3)``.

    Returns a two-item list: the blurred image and a description string.
    """
    value = rng.integers(1,3)
    gaussian = ImageFilter.GaussianBlur(radius = value)
    blurred = img.filter(gaussian)
    return [blurred, f"Gaussian Blur: value = {value}"]

def brightness(img, rng):
    """Scale the brightness of ``img`` by a factor from ``rng.uniform(0.3, 1.7)``.

    Returns a two-item list: the adjusted image and a description string.
    """
    value = rng.uniform(0.3,1.7)
    adjusted = ImageEnhance.Brightness(img).enhance(value)
    description = f"Brightness: value = {value}"
    return [adjusted, description]

def contrast(img, rng):
    """Scale the contrast of ``img`` by a factor from ``rng.uniform(0.3, 1.7)``.

    Returns a two-item list: the adjusted image and a description string.
    """
    value = rng.uniform(0.3,1.7)
    enhancer = ImageEnhance.Contrast(img)
    img = enhancer.enhance(value)
    # The description used to be misspelled "Constrast".
    return [img, f"Contrast: value = {value}"]

def color(img, rng):
    """Scale the color saturation of ``img`` by a factor from ``rng.uniform(0.3, 1.7)``.

    Returns a two-item list: the adjusted image and a description string.
    """
    value = rng.uniform(0.3,1.7)
    adjusted = ImageEnhance.Color(img).enhance(value)
    description = f"Color: value = {value}"
    return [adjusted, description]

# model

In [None]:
from config import nn,np


class SimplifiedVGG(nn.Module):
    """VGG-style CNN with batch normalization and a small fully connected head.

    ``features`` holds five stages of 3x3 conv + BatchNorm + ReLU layers
    (2, 2, 3, 3, 3 convs with 64, 128, 256, 512, 512 channels), each followed
    by a 2x2 max-pool. The result is pooled to 1x1 and fed to ``classifier``,
    a 512 -> 2048 -> 1024 -> 512 -> ``num_classes`` MLP.

    Args:
        num_classes: size of the output logit vector.
        dropout_rate: dropout probability used after each hidden layer of the
            classifier. Previously this argument was ignored and 0.2 was
            hard-coded; pass ``dropout_rate=0.2`` to reproduce that behavior.
    """

    def __init__(self, num_classes=10, dropout_rate=0.3):
        super().__init__()

        # (output channels, number of conv layers) for each stage. Layers are
        # appended in the same order as before so the nn.Sequential indices,
        # and therefore the state_dict keys, are unchanged.
        stages = [(64, 2), (128, 2), (256, 3), (512, 3), (512, 3)]
        layers = []
        in_channels = 3
        for out_channels, n_convs in stages:
            for _ in range(n_convs):
                layers += [
                    nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                    nn.BatchNorm2d(out_channels),
                    nn.ReLU(inplace=True),
                ]
                in_channels = out_channels
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.features = nn.Sequential(*layers)

        # Parameter-free pooling to 1x1 so the classifier also accepts inputs
        # whose feature map is larger than 1x1; it is the identity when the
        # feature map already is 1x1.
        self.avgpool = nn.AdaptiveAvgPool2d(1)

        self.classifier = nn.Sequential(
            nn.Linear(512 * 1 * 1, 2048), nn.BatchNorm1d(2048), nn.ReLU(inplace=True), nn.Dropout(dropout_rate),
            nn.Linear(2048, 1024), nn.BatchNorm1d(1024), nn.ReLU(inplace=True), nn.Dropout(dropout_rate),
            nn.Linear(1024, 512), nn.BatchNorm1d(512), nn.ReLU(inplace=True), nn.Dropout(dropout_rate),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Return the class logits for a batch of 3-channel images ``x``."""
        x = self.features(x)
        x = self.avgpool(x)
        x = x.flatten(1)
        x = self.classifier(x)
        return x


# basic configs

In [None]:
# Shared experiment settings.
# NOTE(review): presumably these are the values the other cells import from
# `config` — confirm.

# Number of target classes.
NUM_CLASSES = 10
# Number of cross-validation folds.
K_FOLDS = 3
# Early-stopping patience, in epochs without validation-loss improvement.
PATIENCE = 7
# Maximum number of training epochs per fold.
NUM_EPOCHS = 50

# Hyperparameter tuning

In [None]:
import numpy as np
import os
from train import Trainer


def run_hyperparameter_tuning():
    """Try a fixed set of configurations per optimizer and store the best one.

    For each optimizer type ("Adam", "SGD") every candidate
    learning-rate/batch-size pair is trained with ``Trainer``; the candidate
    with the highest value returned by ``train(return_best_val_acc=True)`` is
    kept. The winner and its accuracy are saved with ``np.save`` to
    ``PARAMS_PATH/best_params_<optimizer>.npy`` and printed.
    """
    os.makedirs(PARAMS_PATH, exist_ok=True)

    adam_candidates = [
        {"learning_rate": 0.001, "batch_size": 64},
        {"learning_rate": 0.0005, "batch_size": 64},
        {"learning_rate": 0.0005, "batch_size": 128},
    ]
    sgd_candidates = [
        {"learning_rate": 0.01, "batch_size": 32},
        {"learning_rate": 0.005, "batch_size": 64},
        {"learning_rate": 0.001, "batch_size": 128},
    ]
    param_space = {"Adam": adam_candidates, "SGD": sgd_candidates}

    for opt_type in param_space:
        best_acc, best_config = 0, None

        for config in param_space[opt_type]:
            print(f"\nTrying Hyperparameters: {config} with Optimizer: {opt_type}")
            val_acc = Trainer(
                learning_rate=config["learning_rate"],
                batch_size=config["batch_size"],
                optimizer_type=opt_type,
            ).train(return_best_val_acc=True)

            # Keep the current best unless this candidate is strictly better.
            if not val_acc > best_acc:
                continue
            best_acc, best_config = val_acc, config

        result = {"best_acc": best_acc, "best_config": best_config}
        target = os.path.join(PARAMS_PATH, f"best_params_{opt_type}.npy")
        np.save(target, result)

        print(f"\n Best Hyperparameters for {opt_type}: {best_config} with Accuracy: {best_acc:.2f}%")


# training

In [None]:
def mixup_data(x, y, alpha=0.2):
    """Blend each sample of a batch with a randomly chosen partner (mixup).

    Args:
        x: batch of inputs; the first dimension is the batch dimension.
        y: labels aligned with ``x`` along the first dimension.
        alpha: parameter of the ``Beta(alpha, alpha)`` distribution the mixing
            coefficient is drawn from. ``alpha <= 0`` disables mixing
            (the coefficient is fixed to 1).

    Returns:
        ``(mixed_x, y_a, y_b, lam)`` where
        ``mixed_x = lam * x + (1 - lam) * x[perm]``, ``y_a`` is ``y``,
        ``y_b`` is ``y[perm]`` and ``lam`` is the mixing coefficient.
    """
    # np.random.beta rejects non-positive parameters, so treat them as
    # "no mixing" instead of crashing.
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1.0

    batch_size = x.size(0)
    # Create the permutation on the inputs' device.
    index = torch.randperm(batch_size, device=x.device)

    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index.to(y.device)]
    return mixed_x, y_a, y_b, lam


\begin{align*}
\widetilde{y}_i &= \lambda\,\mathbf{1}_{y_i} + (1 - \lambda)\,\mathbf{1}_{y_j}, \\[6pt]
\widetilde{y}_i &:\ \text{mixed (soft) label for sample }i,\\
\mathbf{1}_{y_i} &:\ \text{one-hot vector of length }C\text{ with a 1 at index }y_i,\\
\mathbf{1}_{y_j} &:\ \text{one-hot vector of length }C\text{ with a 1 at index }y_j,\\
y_i &:\ \text{ground-truth class label of sample }i,\\
y_j &:\ \text{ground-truth class label of the sample paired to }i,\\
\lambda &:\ \text{mixing coefficient sampled from }\mathrm{Beta}(\alpha,\alpha).
\end{align*}




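Why mixup helps:

1. Learns better patterns: the model is trained on blends of two samples, so it cannot simply memorize individual images.
2. Handles noise better: with soft labels, a single mislabeled sample has less influence on the loss.
3. Improves generalization: the interpolated samples act as an additional regularizer.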

In [None]:
from config import torch, np, optim, DEVICE, NUM_CLASSES, NUM_EPOCHS, K_FOLDS, PATIENCE, KFold, DataLoader, Subset, load_dataset
from model import SimplifiedVGG
import torchvision.transforms as transforms
import random
import os


SEED = 42


class Trainer:
    """K-fold cross-validation trainer for ``SimplifiedVGG`` with mixup.

    Args:
        learning_rate: optimizer learning rate (the peak rate for SGD's
            one-cycle schedule).
        batch_size: batch size used for loading and for the per-fold loaders.
        optimizer_type: ``"Adam"`` (AdamW + exponential decay) or ``"SGD"``
            (Nesterov SGD + one-cycle schedule).
    """

    def __init__(self, learning_rate=0.001, batch_size=32, optimizer_type="Adam"):
        self.batch_size = batch_size
        self.device = DEVICE
        self.num_classes = NUM_CLASSES
        self.num_epochs = NUM_EPOCHS
        self.k_folds = K_FOLDS
        self.patience = PATIENCE
        self.learning_rate = learning_rate
        self.optimizer_type = optimizer_type
        # Only the training loader is kept; its dataset is re-split per fold.
        self.train_loader, _ = load_dataset.load_dataset(batch_size=self.batch_size, path="/scratch/username")
        self.kf = KFold(n_splits=self.k_folds, shuffle=True, random_state=SEED)

    def get_optimizer(self, model, steps_per_epoch=None):
        """Build the optimizer and LR scheduler for ``model``.

        Args:
            model: module whose parameters are optimized.
            steps_per_epoch: number of optimizer steps per epoch, used to size
                the one-cycle schedule for SGD. Defaults to the length of the
                full training loader.

        Returns:
            ``(optimizer, scheduler)``.

        Raises:
            ValueError: if ``self.optimizer_type`` is neither "Adam" nor "SGD".
        """
        if steps_per_epoch is None:
            steps_per_epoch = len(self.train_loader)

        if self.optimizer_type == "Adam":
            optimizer = optim.AdamW(model.parameters(), lr=self.learning_rate, weight_decay=1e-4)
            scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
        elif self.optimizer_type == "SGD":
            optimizer = optim.SGD(model.parameters(), lr=self.learning_rate, momentum=0.9, weight_decay=5e-4, nesterov=True)
            scheduler = optim.lr_scheduler.OneCycleLR(
                optimizer,
                max_lr=self.learning_rate,
                steps_per_epoch=steps_per_epoch,
                epochs=self.num_epochs
            )
        else:
            raise ValueError("Unsupported optimizer type")
        return optimizer, scheduler

    def train(self, return_best_val_acc=False):
        """Train one fresh model per fold with early stopping on validation loss.

        Per fold, the per-epoch metrics are saved to
        ``training_history_<optimizer>_fold_<fold>.npy``. The weights with the
        lowest validation loss seen across all folds of this call are saved to
        ``best_model_<optimizer>.pth``.

        Args:
            return_best_val_acc: if true, return the highest of the folds'
                final-epoch validation accuracies; otherwise return ``None``.
        """
        fold_results = []
        dataset = self.train_loader.dataset
        checkpoint_path = f"best_model_{self.optimizer_type}.pth"
        # Best validation loss over ALL folds. Without it each fold reset the
        # threshold and overwrote the checkpoint, so the file always held the
        # last fold's model. (Folds validate on different subsets, so this
        # comparison is a heuristic.)
        best_overall_val_loss = np.inf

        for fold, (train_idx, val_idx) in enumerate(self.kf.split(range(len(dataset)))):
            print(f"\nFold {fold+1}/{self.k_folds}")

            best_val_loss = np.inf
            epochs_no_improve = 0
            history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

            train_subset = Subset(dataset, train_idx)
            val_subset = Subset(dataset, val_idx)
            train_loader = DataLoader(train_subset, batch_size=self.batch_size, shuffle=True, num_workers=4)
            val_loader = DataLoader(val_subset, batch_size=self.batch_size, shuffle=False, num_workers=4)

            model = SimplifiedVGG(num_classes=self.num_classes).to(self.device)
            criterion = torch.nn.CrossEntropyLoss(label_smoothing=0.01)
            # Size the schedule from this fold's loader, not the full dataset.
            optimizer, scheduler = self.get_optimizer(model, steps_per_epoch=len(train_loader))
            # OneCycleLR is defined over steps_per_epoch * epochs steps and must
            # be advanced after every batch; stepping it once per epoch left the
            # learning rate stuck at the very start of the warm-up.
            step_per_batch = isinstance(scheduler, optim.lr_scheduler.OneCycleLR)

            for epoch in range(self.num_epochs):
                model.train()
                running_loss, correct_train, total_train = 0.0, 0, 0

                for images, labels in train_loader:
                    images, labels = images.to(self.device), labels.to(self.device).long()
                    optimizer.zero_grad()
                    images, labels_a, labels_b, lam = mixup_data(images, labels, alpha=0.2)
                    outputs = model(images)
                    loss = lam * criterion(outputs, labels_a) + (1 - lam) * criterion(outputs, labels_b)
                    loss.backward()
                    optimizer.step()
                    if step_per_batch:
                        scheduler.step()

                    running_loss += loss.item()
                    _, predicted = outputs.max(1)
                    # Accuracy is measured against the unmixed labels although
                    # the inputs are mixed, so it is only an approximation.
                    correct_train += (predicted == labels).sum().item()
                    total_train += labels.size(0)

                model.eval()
                correct_val, total_val, val_loss = 0, 0, 0.0
                with torch.no_grad():
                    for images, labels in val_loader:
                        images, labels = images.to(self.device), labels.to(self.device).long()
                        outputs = model(images)
                        loss = criterion(outputs, labels)
                        val_loss += loss.item()
                        _, predicted = outputs.max(1)
                        correct_val += (predicted == labels).sum().item()
                        total_val += labels.size(0)

                val_accuracy = 100 * correct_val / total_val
                print(f"Epoch [{epoch+1}/{self.num_epochs}], Val Loss: {val_loss/len(val_loader):.4f}, Val Accuracy: {val_accuracy:.2f}%")

                epoch_train_loss = running_loss / len(train_loader)
                epoch_train_acc = 100 * correct_train / total_train
                epoch_val_loss = val_loss / len(val_loader)

                # Record the epoch before the early-stopping check so the
                # epoch that triggers the stop is part of the history too.
                history["train_loss"].append(epoch_train_loss)
                history["val_loss"].append(epoch_val_loss)
                history["train_acc"].append(epoch_train_acc)
                history["val_acc"].append(val_accuracy)

                if epoch_val_loss < best_val_loss:
                    best_val_loss = epoch_val_loss
                    epochs_no_improve = 0
                    if epoch_val_loss < best_overall_val_loss:
                        best_overall_val_loss = epoch_val_loss
                        torch.save(model.state_dict(), checkpoint_path)
                        print(f"Val Loss Improved. Model saved at epoch {epoch+1}")
                    else:
                        print(f"Val Loss Improved at epoch {epoch+1}, but an earlier fold's checkpoint is still better; not overwritten")
                else:
                    epochs_no_improve += 1
                    print(f"No improvement for {epochs_no_improve} epoch(s)")

                if epochs_no_improve >= self.patience:
                    print(f"\nEarly stopping at epoch {epoch+1} due to no improvement for {self.patience} epochs.")
                    break

                if scheduler is not None and not step_per_batch:
                    scheduler.step()

            fold_results.append(val_accuracy)
            np.save(f"training_history_{self.optimizer_type}_fold_{fold}.npy", history)
            print(f"Training history saved for fold {fold}.")

        avg_val_acc = np.mean(fold_results)
        print(f"\n Average Validation Accuracy ({self.optimizer_type}): {avg_val_acc:.2f}%")
        print(f"Estimated Test Error: {100 - avg_val_acc:.2f}%")

        if return_best_val_acc:
            return max(fold_results)

# Evaluation

In [None]:
from config import torch, sns, plt, os, np, confusion_matrix, DEVICE, NUM_CLASSES, load_dataset, PLOT_PATH, EVAL_PATH
from model import SimplifiedVGG
from sklearn.metrics import classification_report, accuracy_score, roc_curve, auc, precision_recall_curve, f1_score
from collections import Counter
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json

def evaluate_model(model_name, visualize=True):
    """Evaluate the checkpoint ``best_model_<model_name>.pth`` on the test set.

    Prints the test accuracy and the classification report, writes both to
    ``EVAL_PATH/classification_report_model_<model_name>.txt`` and, if
    ``visualize`` is true, calls ``visualize_results``.

    Args:
        model_name: suffix of the checkpoint file (the optimizer name).
        visualize: whether to also produce the plots.

    Returns:
        The test accuracy as a fraction in [0, 1].
    """
    model = SimplifiedVGG(num_classes=NUM_CLASSES).to(DEVICE)
    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only
    # machine (the prediction script already loads it this way).
    state_dict = torch.load(f"best_model_{model_name}.pth", map_location=DEVICE)
    model.load_state_dict(state_dict)
    model.eval()

    _, test_loader = load_dataset.load_dataset(batch_size=32, path="/scratch/username")

    all_preds, all_labels, all_probs = [], [], []
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE).long()
            outputs = model(images)
            probs = torch.softmax(outputs, dim=1)
            _, predicted = outputs.max(1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    print(f"\nEvaluation for Optimizer: {model_name}")
    print(f"Test Accuracy: {acc * 100:.2f}%")
    report = classification_report(all_labels, all_preds, digits=4)
    print("\nClassification Report:")
    print(report)

    os.makedirs(EVAL_PATH, exist_ok=True)
    with open(os.path.join(EVAL_PATH, f"classification_report_model_{model_name}.txt"), "w") as f:
        f.write(f"Test Accuracy: {acc * 100:.2f}%\n")
        f.write(report)

    if visualize:
        visualize_results(all_labels, all_preds, all_probs, test_loader, model_name, acc)

    return acc


def visualize_results(all_labels, all_preds, all_probs, test_loader, model_name, acc):
    """Save the confusion-matrix heatmap for one evaluated model.

    The figure is written to ``PLOT_PATH/confusion_matrix_<model_name>.png``.

    Args:
        all_labels: true class indices.
        all_preds: predicted class indices.
        all_probs: unused; kept for interface compatibility.
        test_loader: unused; kept for interface compatibility.
        model_name: name used in the title and the file name.
        acc: unused; kept for interface compatibility.
    """
    os.makedirs(PLOT_PATH, exist_ok=True)
    classes = list(range(NUM_CLASSES))

    # Passing the full label list keeps the matrix NUM_CLASSES x NUM_CLASSES
    # even if a class never occurs in the labels or the predictions.
    cm = confusion_matrix(all_labels, all_preds, labels=classes)
    plt.figure(figsize=(10, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=classes, yticklabels=classes)
    plt.title(f"Confusion Matrix - {model_name}")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    sns.despine()
    plt.tight_layout()
    plt.savefig(os.path.join(PLOT_PATH, f"confusion_matrix_{model_name}.png"))
    plt.close()




if __name__ == "__main__":

    def _save_line_plot(values, title, ylabel, out_file):
        """Plot ``values`` against their epoch index and save the figure to ``out_file``."""
        plt.figure(figsize=(10, 6))
        sns.lineplot(x=range(len(values)), y=values)
        plt.title(title)
        plt.xlabel("Epochs")
        plt.ylabel(ylabel)
        plt.tight_layout()
        plt.savefig(out_file)
        plt.close()

    # --- Test-set evaluation of the checkpoint of each optimizer ---
    results = {}

    for opt in ["Adam", "SGD"]:
        acc = evaluate_model(model_name=opt, visualize=True)
        results[opt] = acc * 100

    print("\nEvaluation Results:")
    for opt, acc in results.items():
        print(f"{opt}: {acc:.2f}% Test Accuracy")

    best = max(results, key=results.get)
    print(f"\nBest Optimizer: {best} with Accuracy: {results[best]:.2f}%")

    os.makedirs(PLOT_PATH, exist_ok=True)
    plt.figure(figsize=(8, 6))
    sns.barplot(x=list(results.keys()), y=list(results.values()), palette="viridis")
    plt.title("Test Accuracy Comparison")
    for i, v in enumerate(results.values()):
        plt.text(i, v + 1, f"{v:.2f}%", ha="center", fontsize=12)
    sns.despine()
    plt.tight_layout()
    plt.savefig(os.path.join(PLOT_PATH, "evaluation_results_chart.png"))
    plt.close()

    # --- Training curves from the saved per-fold histories ---
    # Same output folder as before ("plots"), but under its own name so the
    # imported PLOT_PATH is no longer rebound halfway through the script.
    history_plot_dir = "plots"
    os.makedirs(history_plot_dir, exist_ok=True)

    optimizers = ["Adam", "SGD"]
    # NOTE(review): hard-coded fold indices; presumably must match the number
    # of folds used for training — confirm.
    folds = [0, 1, 2]

    for opt in optimizers:
        all_acc = []
        all_loss = []

        for fold in folds:
            file = f"training_history_{opt}_fold_{fold}.npy"
            if not os.path.exists(file):
                print(f"Missing file: {file}")
                continue

            # The history is a dict stored with np.save, hence allow_pickle
            # and .item() to get the dict back.
            history = np.load(file, allow_pickle=True).item()
            acc = history["train_acc"]
            loss = history["train_loss"]

            all_acc.append(acc)
            all_loss.append(loss)

            _save_line_plot(acc,
                            f"Training Accuracy - {opt} - Fold{fold}",
                            "Accuracy (%)",
                            os.path.join(history_plot_dir, f"training_accuracy_{opt}_fold_{fold}.png"))
            _save_line_plot(loss,
                            f"Training Loss - {opt} - Fold{fold}",
                            "Loss",
                            os.path.join(history_plot_dir, f"training_loss_{opt}_fold_{fold}.png"))

        if all_acc and all_loss:
            # Folds can stop early at different epochs; truncate to the
            # shortest run so the curves can be averaged element-wise.
            min_len = min(len(a) for a in all_acc)
            all_acc = [a[:min_len] for a in all_acc]
            all_loss = [l[:min_len] for l in all_loss]

            avg_acc = np.mean(all_acc, axis=0)
            avg_loss = np.mean(all_loss, axis=0)

            # The averaged plots cover all folds, so their titles no longer
            # carry the index of the last processed fold.
            _save_line_plot(avg_acc,
                            f"Training Accuracy Average - {opt}",
                            "Accuracy (%)",
                            os.path.join(history_plot_dir, f"training_accuracy_{opt}_avg.png"))
            _save_line_plot(avg_loss,
                            f"Training Loss Average - {opt}",
                            "Loss",
                            os.path.join(history_plot_dir, f"training_loss_{opt}_avg.png"))

# Predicting Images

In [None]:
import torch
from torchvision import transforms
from PIL import Image,ImageDraw, ImageFont
import os
from model import SimplifiedVGG
from config import DEVICE, NUM_CLASSES

# Side length (pixels) the input images are resized to before prediction.
# NOTE(review): presumably must equal the resolution the model was trained on — confirm.
IMAGE_SIZE = 56
# Checkpoint loaded for prediction.
MODEL_PATH = "best_model_Adam.pth"
# Display names, indexed by predicted class index.
# NOTE(review): the order must match the label order used when the dataset was built — confirm.
CLASS_NAMES = ["bike","bottle","chair","fork","knife","mug/cup","plant","shoe","spoon","T-shirt"]

# Converts a PIL image to a tensor; no normalization is applied.
tensor_transform = transforms.Compose([
    transforms.ToTensor()
    # transforms.Normalize(mean=[0.5]*3, std=[0.5]*3)
])

def predict_image():
    """Classify every image in ``images/`` and save an annotated overview.

    Each image is center-cropped to a square, resized to
    ``IMAGE_SIZE`` x ``IMAGE_SIZE`` and fed to the model loaded from
    ``MODEL_PATH``. The result, ``prediction.jpeg``, contains one 250-pixel
    high row per image: the processed image on the left and all class
    probabilities, sorted in descending order, next to it.
    """
    img_paths = os.listdir("images")
    n = len(img_paths)
    if n == 0:
        # Nothing to batch or draw.
        print("No images found in images/; nothing to predict")
        return

    processed_img = []
    input_tensor = torch.zeros(n,3,IMAGE_SIZE,IMAGE_SIZE)
    for i, img_path in enumerate(img_paths):
        # convert() returns an independent image, so the file handle can be
        # closed right away instead of staying open for every image.
        with Image.open(os.path.join("images", img_path)) as source:
            raw = source.convert("RGB")
        squared = transforms.functional.center_crop(raw, min(raw.size))
        processed_img.append(squared.resize((IMAGE_SIZE,IMAGE_SIZE)))
        input_tensor[i] = tensor_transform(processed_img[i])

    model = SimplifiedVGG(num_classes=NUM_CLASSES).to(DEVICE)
    model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
    model.eval()

    with torch.no_grad():
        # The batch must be on the same device as the model; it used to stay
        # on the CPU, which fails when DEVICE is a GPU.
        outputs = model(input_tensor.to(DEVICE))
        probs = torch.softmax(outputs, dim=1).cpu()

    output_img = Image.new("RGB",(250,250*n),color="white")
    draw = ImageDraw.Draw(output_img)
    try:
        font = ImageFont.truetype("ARIAL.ttf",15)
    except OSError:
        # Arial is not installed everywhere; fall back to PIL's built-in font.
        font = ImageFont.load_default()

    for i in range(n):
        sorted_probs, class_order = torch.sort(probs[i],descending=True)
        text = "\n".join(f"{CLASS_NAMES[j]}: {prob:.2%}"
                         for prob, j in zip(sorted_probs.tolist(), class_order.tolist()))
        draw.text((100,i*250),text,(0,0,0),font)
        output_img.paste(processed_img[i],(0,i*250))

    output_img.save("prediction.jpeg")
    print("Prediction saved as prediction.jpeg")

predict_image()