<h1><b>FoodX-251 Image Classification</b></h1>

<b>Visual Information Processing and Management Exam

Sara Arizzi 845374</b>

<hr>

In [1]:
TRAIN = True  # Set to True if you want to retrain all the models
GDRIVE = True  # Set to False if you are running on local env

# Setup

## Import Libraries

In [2]:
%%capture
# ! pip install torchinfo

In [3]:
import os
import shutil
from zipfile import ZipFile
import numpy as np
import random
import pickle

# pytorch
import torch
from torch import nn
# from torchinfo import summary
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torchvision.models import mobilenet_v3_small, MobileNet_V3_Small_Weights
from torch.utils.data import DataLoader, random_split

# device
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# seeds
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

## Data Path

In [5]:
# Mount GDrive or set working dir
if GDRIVE:
  from google.colab import drive
  drive.mount("/content/drive")

  DATA_PATH = os.path.join(*["drive", "MyDrive", "Visual", "data"])
else:
  DATA_PATH = os.path.join(*["something", "something", "data"])
  TRAIN_PATH = ""
  VAL_PATH = ""

Mounted at /content/drive


In [6]:
# Copy and unzip train and val if GDrive
if GDRIVE:
  shutil.copyfile(os.path.join(*[DATA_PATH, "clean", "train.zip"]), "train.zip")
  shutil.copyfile(os.path.join(*[DATA_PATH, "clean", "val.zip"]), "val.zip")
  # shutil.copyfile(os.path.join(DATA_PATH, "val_degraded.zip"), "val_degraded.zip")

  with ZipFile("train.zip", "r") as zf:
    zf.extractall()

  with ZipFile("val.zip", "r") as zf:
    zf.extractall()

  # with ZipFile("val_degraded.zip", "r") as zf:
    # zf.extractall()

  TRAIN_PATH = os.path.join("content", "train")
  VAL_PATH = os.path.join("content", "val")
  # VAL_DEGRADED_PATH = os.path.join("content", "val_degraded")

KeyboardInterrupt: 

In [None]:
print(f"All data is located at: {DATA_PATH}\n"
      f"Trainset is at: {TRAIN_PATH}\n"
      f"Valset is at: {VAL_PATH}\n"
      # f"Valset degraded is at: {VAL_DEGRADED_PATH}\n"
      )

# Prepare Data

# Models

### Training functions

In [None]:
def accuracy_at_k(predictions, y_one_hot, k=1):
    """
    Calculate the top-k accuracy.

    Args:
    - predictions (torch.Tensor): Tensor of shape (batch_size, num_classes) containing the model's predicted scores.
    - y_one_hot (torch.Tensor): One-hot encoded ground truth tensor of shape (batch_size, num_classes).
    - k (int): Number of top predictions to consider for accuracy.

    Returns:
    - accuracy (float): The top-k accuracy score.
    """
    # Convert one-hot encoded ground truth to label indices
    y_true = torch.argmax(y_one_hot, dim=1)

    # Get the top-k predicted indices along the last dimension (num_classes)
    _, top_k_indices = torch.topk(predictions, k, dim=1)

    # Check if the true labels are in the top-k indices
    correct = top_k_indices.eq(y_true.view(-1, 1).expand_as(top_k_indices))

    # Calculate the accuracy
    accuracy = correct.float().sum() / predictions.size(0)

    return accuracy.item()

In [None]:
def train_step(model, data_loader, loss_fn, optimizer,
               device, lambda_value=0, regularization=0, scheduler=None):
    train_loss, train_acc1, train_acc3, train_acc5 = 0, 0, 0, 0

    for batch, (X, y) in enumerate(data_loader):
        # Send data to GPU
        X, y = X.to(device), y.type(torch.LongTensor).to(device)
        y_one_hot = torch.nn.functional.one_hot(y, num_classes=251).float()

        # Forward Pass
        y_pred = model(X)
        y_pred = y_pred.squeeze()

        # Calculate Loss
        loss = loss_fn(y_pred, y_one_hot) + lambda_value * regularization
        train_loss += loss.item()

        # Optimizer reset step
        optimizer.zero_grad()

        # Loss Backpropagation
        loss.backward(retain_graph=True)

        # Optimizer step
        optimizer.step()

        # Calculate accuracy
        train_acc1 += accuracy_at_k(y_pred, y_one_hot, 1)
        train_acc3 += accuracy_at_k(y_pred, y_one_hot, 3)
        train_acc5 += accuracy_at_k(y_pred, y_one_hot, 5)

        # Clean Cache
        torch.cuda.empty_cache()

    # Scheduler step
    lr = None
    if scheduler is not None:
        scheduler.step()
        lr = scheduler.get_last_lr()

    # Print loss and accuracy
    train_loss /= len(data_loader)
    train_acc1 /= len(data_loader)
    train_acc3 /= len(data_loader)
    train_acc5 /= len(data_loader)

    return train_loss, train_acc1, train_acc3, train_acc5, lr

In [None]:
def val_step(model, data_loader, loss_fn, device):
    val_loss, val_acc1, val_acc3, val_acc5 = 0, 0, 0, 0
    model.eval()

    with torch.inference_mode():
        for batch, (X, y) in enumerate(data_loader):
            # Send data to GPU
            X, y = X.to(device), y.type(torch.LongTensor).to(device)
            y_one_hot = torch.nn.functional.one_hot(y, num_classes=251).float()

            # Forward pass
            val_pred = model(X)
            val_pred = val_pred.squeeze()

            # Calculate loss
            loss = loss_fn(val_pred, y_one_hot)
            val_loss += loss.item()

            # Calculate accuracy
            val_acc1 += accuracy_at_k(val_pred, y_one_hot, 1)
            val_acc3 += accuracy_at_k(val_pred, y_one_hot, 3)
            val_acc5 += accuracy_at_k(val_pred, y_one_hot, 5)

            # Clean Cache
            torch.cuda.empty_cache()

    val_loss /= len(data_loader)
    val_acc1 /= len(data_loader)
    val_acc3 /= len(data_loader)
    val_acc5 /= len(data_loader)

    return val_loss, val_acc1, val_acc3, val_acc5

In [None]:
def train(model, train_loader, test_loader, optimizer, loss_fn,
          epochs, device, lambda_value=0, regularization=0, scheduler=None):
    results = {
        "train_loss": [],
        "train_acc1": [],
        "train_acc3": [],
        "train_acc5": [],
        "val_loss": [],
        "val_acc1": [],
        "val_acc3": [],
        "val_acc5": []
    }

    for epoch in range(epochs):
        train_loss, ta1, ta3, ta5, lr = train_step(model=model,
                                                   data_loader=train_loader,
                                                   loss_fn=loss_fn,
                                                   optimizer=optimizer,
                                                   scheduler=scheduler,
                                                   device=device,
                                                   lambda_value=lambda_value,
                                                   regularization=regularization
                                                   )
        val_loss, va1, va3, va5 = val_step(model=model,
                                           data_loader=test_loader,
                                           loss_fn=loss_fn,
                                           device=device)
        # Print out what's happening
        print(
            f"Epoch: {epoch} --> \t"
            f"train_loss: {train_loss:.4f} | "
            f"val_loss: {val_loss:.4f} | "
            f"ta@1: {ta1:.4f} | "
            f"ta@3: {ta3:.4f} | "
            f"ta@5: {ta5:.4f} | "
            f"va@1: {va1:.4f} | "
            f"va@3: {va3:.4f} | "
            f"va@5: {va5:.4f} | "
            f"LR: {lr} "
        )

        # Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc1"].append(ta1)
        results["train_acc3"].append(ta3)
        results["train_acc5"].append(ta5)
        results["val_loss"].append(val_loss)
        results["val_acc1"].append(va1)
        results["val_acc3"].append(va3)
        results["val_acc5"].append(va5)

    # Return the filled results at the end of the epochs
    return results

### MobileNet V3

In [None]:
def get_mob_net(weights=None):
  mod = mobilenet_v3_small(weights=weights)

  # Change to 251 output class
  mod.classifier[3] = nn.Linear(
      in_features=mod.classifier[3].in_features,
      out_features=251, bias=True)

  return mod



```python
summary(
  model=get_res_net(),
  input_size=(32, 3, 224, 224),
  col_names=["input_size", "output_size", "num_params", "trainable"],
  col_width=20,
  row_settings=["var_names"]
)
```



In [None]:
train_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224))
])

val_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224))
])
train_dataset = ImageFolder("train", transform=train_transforms)
val_dataset = ImageFolder("val", transform=val_transforms)

#### From Scratch

In [None]:
# Hyper Parameters
BATCH_SIZE = 256
EPOCHS = 100
LOSS_FN = nn.CrossEntropyLoss()
LEARNING_RATE = 1e-3

train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=True)

m = get_mob_net()
optimizer = torch.optim.Adam(m.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=1e-4)
m = m.to(DEVICE)

model_path = os.path.join(*[DATA_PATH, "saved_models", "mobilenetv3_from_scratch.pt"])
history_path = os.path.join(*[DATA_PATH, "saved_models", "history_mobilenetv3_from_scratch.pkl"])
if TRAIN:
  history = train(m, train_loader, val_loader, optimizer,
                LOSS_FN, EPOCHS, DEVICE, scheduler=scheduler)
  torch.save(m.state_dict(), model_path)
  pickle.dump(history, open(history_path, "wb"))
else:
  m.load_state_dict(torch.load(model_path, map_location=torch.device(DEVICE)))
  history = pickle.load(open(history_path, "rb"))

#### Fine-tuned

In [None]:
# Hyper Parameters
BATCH_SIZE = 256
EPOCHS = 10
LOSS_FN = nn.CrossEntropyLoss()
LEARNING_RATE = 1e-4

train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=True)

m = get_mob_net(weights=MobileNet_V3_Small_Weights.IMAGENET1K_V1)
optimizer = torch.optim.Adam(m.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=1e-5)
m = m.to(DEVICE)

model_path = os.path.join(*[DATA_PATH, "saved_models", "mobilenetv3_finetuned.pt"])
history_path = os.path.join(*[DATA_PATH, "saved_models", "history_mobilenetv3_finetuned.pkl"])
if False:
  history = train(m, train_loader, val_loader, optimizer,
                LOSS_FN, EPOCHS, DEVICE, scheduler=scheduler)
  torch.save(m.state_dict(), model_path)
  pickle.dump(history, open(history_path, "wb"))
else:
  m.load_state_dict(torch.load(model_path, map_location=torch.device(DEVICE)))
  history = pickle.load(open(history_path, "rb"))

Downloading: "https://download.pytorch.org/models/mobilenet_v3_small-047dcff4.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v3_small-047dcff4.pth
100%|██████████| 9.83M/9.83M [00:00<00:00, 52.1MB/s]


Epoch: 0 --> 	train_loss: 4.3920 | val_loss: 3.0274 | ta@1: 0.1393 | ta@3: 0.2530 | ta@5: 0.3169 | va@1: 0.3242 | va@3: 0.5160 | va@5: 0.6104 | LR: [9.779754323328192e-05] 
Epoch: 1 --> 	train_loss: 2.8595 | val_loss: 2.4030 | ta@1: 0.3690 | ta@3: 0.5570 | ta@5: 0.6372 | va@1: 0.4265 | va@3: 0.6335 | va@5: 0.7141 | LR: [9.140576474687264e-05] 
Epoch: 2 --> 	train_loss: 2.4952 | val_loss: 2.2419 | ta@1: 0.4431 | ta@3: 0.6327 | ta@5: 0.7046 | va@1: 0.4648 | va@3: 0.6627 | va@5: 0.7434 | LR: [8.14503363531613e-05] 
Epoch: 3 --> 	train_loss: 2.2965 | val_loss: 2.1441 | ta@1: 0.4860 | ta@3: 0.6704 | ta@5: 0.7389 | va@1: 0.4840 | va@3: 0.6798 | va@5: 0.7549 | LR: [6.890576474687264e-05] 
Epoch: 4 --> 	train_loss: 2.1519 | val_loss: 2.0913 | ta@1: 0.5165 | ta@3: 0.6979 | ta@5: 0.7609 | va@1: 0.4901 | va@3: 0.6904 | va@5: 0.7668 | LR: [5.500000000000001e-05] 
Epoch: 5 --> 	train_loss: 2.0313 | val_loss: 2.0656 | ta@1: 0.5431 | ta@3: 0.7172 | ta@5: 0.7802 | va@1: 0.5014 | va@3: 0.6954 | va@5: 0

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 4))

plt.subplot(1,2,1)
plt.plot(history.get("train_loss"), label='Train')
plt.plot(history.get("val_loss"), label='Val')
plt.legend()
plt.title("Loss")

plt.subplot(1,2,2)
plt.plot(history.get("train_acc1"), label='Train Acc@1', ls="-", color="gray")
plt.plot(history.get("train_acc3"), label='Train Acc@3', ls="-", color="gray")
plt.plot(history.get("train_acc5"), label='Train Acc@5', ls="-", color="gray")
plt.plot(history.get("val_acc1"), label='Val Acc@1', color="blue")
plt.plot(history.get("val_acc3"), label='Val Acc@3', color="blue")
plt.plot(history.get("val_acc5"), label='Val Acc@5', color="blue")
plt.legend()
plt.title("Accuracy")

plt.show()