In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torchmetrics
import torch
from torch import nn
from torchvision import datasets
from torchvision.transforms import ToTensor
from pathlib import Path
from timeit import default_timer as timer
from tqdm.auto import tqdm

# Show the installed PyTorch version as this cell's output.
torch.__version__

In [None]:
# --- Training hyperparameters ---
EPOCHS, LR, BATCH_SIZE = 5, 1e-1, 32

# --- Checkpoint location (the directory is created up front if missing) ---
MODEL_NAME = "cifar_image_classification_model.pth"
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, exist_ok=True)
MODEL_SAVE_PATH = MODEL_PATH.joinpath(MODEL_NAME)

# --- Compute device: use CUDA when available, otherwise the CPU ---
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"

# Echo the effective configuration as the cell output.
EPOCHS, LR, BATCH_SIZE, device

In [None]:
def display_single_image(image: torch.Tensor, label: str) -> None:
  """Draw a single image with pyplot, titled with `label` and with axes hidden.

  Args:
    image: Image data passed straight to `plt.imshow` (no reordering of
      dimensions happens here; the caller is responsible for the layout).
    label: Text used as the plot title.
  """
  plt.imshow(image)
  plt.title(label)
  plt.axis("off")

In [None]:
def display_random_images(dataset: datasets, class_names: list) -> None:
  """Plot a 4x4 grid of randomly drawn samples from `dataset`.

  Args:
    dataset: Indexable collection where `dataset[i]` yields an
      `(image, label)` pair and `len(dataset)` is defined.
    class_names: Sequence indexed by the label to obtain each subplot title.
  """
  n_rows, n_cols = 4, 4
  figure = plt.figure(figsize=(16, 16))

  for position in range(1, n_rows * n_cols + 1):
    # One random index per subplot (sampling is with replacement).
    sample_index = torch.randint(0, len(dataset), size=[1]).item()
    picture, target = dataset[sample_index]

    figure.add_subplot(n_rows, n_cols, position)
    # Move the first dimension last before plotting
    # (presumably CHW -> HWC for imshow; confirm against the dataset transform).
    plt.imshow(picture.permute(1, 2, 0))
    plt.title(class_names[target])
    plt.axis("off")

In [None]:
def train_step(
  model: nn.Module,
  dataloader: torch.utils.data.DataLoader,
  loss_fn: nn.Module,
  optimizer: torch.optim.Optimizer,
  accuracy_metric: torchmetrics.Accuracy,
  f1_metric: torchmetrics.F1Score,
  precision_metric: torchmetrics.Precision,
  recall_metric: torchmetrics.Recall
) -> None:
  """Train `model` for one full pass over `dataloader` and print loss and metrics.

  Every batch is moved to the module-level `device`, run through the model,
  and used for one optimizer update. After the pass, the mean per-batch loss
  and the computed value of each metric are printed, and all four metrics are
  reset so they can be reused for the next pass.

  Args:
    model: Network to optimise; it is switched to training mode.
    dataloader: Yields `(X, y)` batches.
    loss_fn: Callable invoked as `loss_fn(predictions, targets)`.
    optimizer: Optimizer that is stepped once per batch.
    accuracy_metric: Metric updated with `(predictions, targets)` per batch.
    f1_metric: Metric updated with `(predictions, targets)` per batch.
    precision_metric: Metric updated with `(predictions, targets)` per batch.
    recall_metric: Metric updated with `(predictions, targets)` per batch.
  """
  train_loss = 0

  model.train()
  for batch_number, (X, y) in enumerate(dataloader):
    X, y = X.to(device), y.to(device)

    y_pred = model(X)

    loss = loss_fn(y_pred, y)
    # Accumulate a detached value: adding `loss` itself would keep every
    # batch's autograd history alive until the end of the epoch.
    train_loss += loss.detach()

    accuracy_metric(y_pred, y)
    f1_metric(y_pred, y)
    precision_metric(y_pred, y)
    recall_metric(y_pred, y)

    optimizer.zero_grad()

    loss.backward()

    optimizer.step()

    if batch_number % 100 == 0:
      print(f"Looked at {batch_number * len(X)}/{len(dataloader.dataset)} samples")

  # Mean of the per-batch losses (not weighted by batch size).
  train_loss /= len(dataloader)

  print(f"Train loss: {train_loss}")
  print(f"Train accuracy: {accuracy_metric.compute()} ")
  print(f"Train F1: {f1_metric.compute()}")
  print(f"Train precision: {precision_metric.compute()}")
  print(f"Train recall: {recall_metric.compute()}")

  accuracy_metric.reset()
  f1_metric.reset()
  precision_metric.reset()
  recall_metric.reset()

In [None]:
def test_step(
  model: nn.Module,
  dataloader: torch.utils.data.DataLoader,
  loss_fn: nn.Module,
  accuracy_metric: torchmetrics.Accuracy,
  f1_metric: torchmetrics.F1Score,
  precision_metric: torchmetrics.Precision,
  recall_metric: torchmetrics.Recall
) -> None:
  """Evaluate `model` on `dataloader` and print the loss and metric values.

  The model is put in eval mode and all work runs under
  `torch.inference_mode()`. Batches are moved to the module-level `device`.
  The mean per-batch loss and the four computed metrics are printed, after
  which every metric is reset.

  Args:
    model: Network to evaluate.
    dataloader: Yields `(X, y)` batches.
    loss_fn: Callable invoked as `loss_fn(predictions, targets)`.
    accuracy_metric: Metric updated with `(predictions, targets)` per batch.
    f1_metric: Metric updated with `(predictions, targets)` per batch.
    precision_metric: Metric updated with `(predictions, targets)` per batch.
    recall_metric: Metric updated with `(predictions, targets)` per batch.
  """
  # Fixed update/reset order: accuracy, F1, precision, recall.
  tracked_metrics = (accuracy_metric, f1_metric, precision_metric, recall_metric)
  running_loss = 0

  model.eval()
  with torch.inference_mode():
    for features, targets in dataloader:
      features = features.to(device)
      targets = targets.to(device)

      predictions = model(features)

      running_loss += loss_fn(predictions, targets)
      for metric in tracked_metrics:
        metric(predictions, targets)

    # Mean of the per-batch losses.
    running_loss /= len(dataloader)

    print(f"Test loss: {running_loss}")
    print(f"Test accuracy: {accuracy_metric.compute()} ")
    print(f"Test F1: {f1_metric.compute()}")
    print(f"Test precision: {precision_metric.compute()}")
    print(f"Test recall: {recall_metric.compute()}")

    for metric in tracked_metrics:
      metric.reset()

In [None]:
def eval_model(model: nn.Module, dataloader: torch.utils.data.DataLoader, loss_fn: nn.Module, class_amount: int, train_time: float = None, device: torch.device = device):
  """Evaluate `model` on `dataloader` and return a summary dictionary.

  Fresh multiclass accuracy, F1, precision (macro) and recall (macro) metrics
  are created on `device`, updated batch by batch under
  `torch.inference_mode()`, and reported together with the mean per-batch loss.

  Note: the fifth positional parameter is `train_time`, so `device` should be
  passed by keyword (`device=...`) unless `train_time` is also supplied.

  Args:
    model: Network to evaluate; it is switched to eval mode.
    dataloader: Yields `(X, y)` batches.
    loss_fn: Callable invoked as `loss_fn(predictions, targets)`.
    class_amount: Number of classes handed to each metric.
    train_time: Optional training duration to echo back in the result;
      `None` is reported as the string "no data".
    device: Device the batches and metrics are moved to. Defaults to the
      module-level `device` as it was when this function was defined.

  Returns:
    dict with keys "model_name", "model_loss", "model_accuracy", "model_f1",
    "model_precision", "model_recall" and "model_train_time".
  """
  total_loss = 0

  accuracy_metric = torchmetrics.Accuracy(task="multiclass", num_classes=class_amount).to(device)
  f1_metric = torchmetrics.F1Score(task="multiclass", num_classes=class_amount).to(device)
  precision_metric = torchmetrics.Precision(task="multiclass", average="macro", num_classes=class_amount).to(device)
  recall_metric = torchmetrics.Recall(task="multiclass", average="macro", num_classes=class_amount).to(device)

  model.eval()
  with torch.inference_mode():
    for (X, y) in dataloader:
      X, y = X.to(device), y.to(device)
      y_pred = model(X)

      total_loss += loss_fn(y_pred, y)

      accuracy_metric(y_pred, y)
      f1_metric(y_pred, y)
      precision_metric(y_pred, y)
      recall_metric(y_pred, y)

    # Mean of the per-batch losses.
    total_loss /= len(dataloader)

  return {
    "model_name": model.__class__.__name__,
    "model_loss": total_loss.item(),
    "model_accuracy": accuracy_metric.compute().item(),
    "model_f1": f1_metric.compute().item(),
    "model_precision": precision_metric.compute().item(),
    "model_recall": recall_metric.compute().item(),
    # Identity check: `!= None` would defer to the operand's own __ne__.
    "model_train_time": train_time if train_time is not None else "no data"
  }

In [None]:
def train_model(model: nn.Module, train_dataloader: torch.utils.data.DataLoader, test_dataloader: torch.utils.data.DataLoader, loss_fn: nn.Module, optimizer: torch.optim.Optimizer, num_classes: int, device: torch.device = device):
  """Run the full train/test loop for `EPOCHS` epochs and return a final evaluation.

  Separate metric sets are created for training and testing. Each epoch calls
  `train_step` followed by `test_step`. The wall-clock duration of the loop is
  printed and forwarded to `eval_model`, whose result dict is returned.

  Note: `device` is used here for the metrics and for the final `eval_model`
  call; `train_step` and `test_step` move batches to the module-level `device`.

  Args:
    model: Network to train.
    train_dataloader: Batches used by `train_step`.
    test_dataloader: Batches used by `test_step` and the final `eval_model`.
    loss_fn: Loss callable shared by all steps.
    optimizer: Optimizer stepped inside `train_step`.
    num_classes: Number of classes handed to each metric.
    device: Device the metrics are moved to.

  Returns:
    The dictionary produced by `eval_model` on `test_dataloader`.
  """
  def build_metrics():
    # Order matches the positional metric parameters of train_step/test_step:
    # accuracy, F1, precision, recall. Only precision/recall pass `average`.
    return (
      torchmetrics.Accuracy(task="multiclass", num_classes=num_classes).to(device),
      torchmetrics.F1Score(task="multiclass", num_classes=num_classes).to(device),
      torchmetrics.Precision(task="multiclass", average="macro", num_classes=num_classes).to(device),
      torchmetrics.Recall(task="multiclass", average="macro", num_classes=num_classes).to(device),
    )

  train_metrics = build_metrics()
  test_metrics = build_metrics()

  torch.manual_seed(42)

  start_time = timer()

  for epoch in tqdm(range(EPOCHS)):
    print(f"Epoch: {epoch}\n")

    train_step(model, train_dataloader, loss_fn, optimizer, *train_metrics)
    test_step(model, test_dataloader, loss_fn, *test_metrics)

    print("----------------------------")

  elapsed = timer() - start_time

  print(f"\nTotal training time: {elapsed}s")

  return eval_model(model, test_dataloader, loss_fn, num_classes, elapsed, device)

In [None]:
# CIFAR-10 train/test splits, downloaded into ./data on first use.
# These must be CIFAR10 rather than a single-channel dataset: the TinyVGG model
# in this notebook is instantiated with 3 input channels.
cifar_10_train_dataset = datasets.CIFAR10(
  root="data",
  train=True,
  download=True,
  transform=ToTensor(),
)

cifar_10_test_dataset = datasets.CIFAR10(
  root="data",
  train=False,
  download=True,
  transform=ToTensor(),
)

# CIFAR-100 train/test splits, stored under the same root directory.
cifar_100_train_dataset = datasets.CIFAR100(
  root="data",
  train=True,
  download=True,
  transform=ToTensor(),
)

cifar_100_test_dataset = datasets.CIFAR100(
  root="data",
  train=False,
  download=True,
  transform=ToTensor(),
)

In [None]:
# Take the first training sample to inspect its label and tensor shape;
# `temp_image` is reused later in the notebook to size the baseline model's input.
temp_image, temp_label = cifar_10_train_dataset[0]

temp_label, temp_image.shape

In [None]:
# Number of samples in each of the four dataset splits.
len(cifar_10_train_dataset), len(cifar_10_test_dataset), len(cifar_100_train_dataset), len(cifar_100_test_dataset)

In [None]:
# Class-name lists exposed by the training datasets (indexed by label elsewhere).
cifar_10_class_names = cifar_10_train_dataset.classes
cifar_100_class_names = cifar_100_train_dataset.classes

# Only the first 10 entries of the second list are displayed.
cifar_10_class_names, cifar_100_class_names[:10]

In [None]:
# Move the first dimension last before plotting
# (presumably CHW -> HWC for imshow; confirm against the dataset transform).
display_single_image(temp_image.permute(1, 2, 0), cifar_10_class_names[temp_label])

In [None]:
# Preview a 4x4 grid of random samples from `cifar_10_train_dataset`.
display_random_images(cifar_10_train_dataset, cifar_10_class_names)

In [None]:
# Preview a 4x4 grid of random samples from `cifar_10_test_dataset`.
display_random_images(cifar_10_test_dataset, cifar_10_class_names)

In [None]:
# Preview a 4x4 grid of random samples from `cifar_100_train_dataset`.
display_random_images(cifar_100_train_dataset, cifar_100_class_names)

In [None]:
# Preview a 4x4 grid of random samples from `cifar_100_test_dataset`.
display_random_images(cifar_100_test_dataset, cifar_100_class_names)

In [None]:
# Training loaders reshuffle the data each epoch.
cifar_10_train_dataloader = torch.utils.data.DataLoader(
  cifar_10_train_dataset,
  batch_size=BATCH_SIZE,
  shuffle=True
)

# Evaluation loaders keep a fixed order: the reported loss is a mean of
# per-batch means, so a shuffled (possibly smaller) last batch would make the
# test loss vary between runs for the same model.
cifar_10_test_dataloader = torch.utils.data.DataLoader(
  cifar_10_test_dataset,
  batch_size=BATCH_SIZE,
  shuffle=False
)

cifar_100_train_dataloader = torch.utils.data.DataLoader(
  cifar_100_train_dataset,
  batch_size=BATCH_SIZE,
  shuffle=True
)

cifar_100_test_dataloader = torch.utils.data.DataLoader(
  cifar_100_test_dataset,
  batch_size=BATCH_SIZE,
  shuffle=False
)

print(f"CIFAR10 train dataloader: {len(cifar_10_train_dataloader)} batches of {BATCH_SIZE}")
print(f"CIFAR10 test dataloader: {len(cifar_10_test_dataloader)} batches of {BATCH_SIZE}")
print(f"CIFAR100 train dataloader: {len(cifar_100_train_dataloader)} batches of {BATCH_SIZE}")
print(f"CIFAR100 test dataloader: {len(cifar_100_test_dataloader)} batches of {BATCH_SIZE}")

In [None]:
class ClassificationBaselineModel(nn.Module):
  """Fully connected baseline classifier: flatten, three hidden ReLU layers, linear output.

  The output layer returns raw, unbounded scores (logits). No activation is
  applied after it, because `nn.CrossEntropyLoss` expects unnormalised logits
  and clamping them to be non-negative discards information.

  Args:
    input_shape: Number of features per sample after flattening.
    hidden_units: Width of each hidden layer.
    output_shape: Number of output scores (one per class).
  """
  def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
    super().__init__()
    # Accept integer-like values (e.g. a NumPy integer from np.prod) as the size.
    input_shape = int(input_shape)
    self.layer_stack = nn.Sequential(
      nn.Flatten(),
      nn.Linear(in_features=input_shape, out_features=hidden_units),
      nn.ReLU(),
      nn.Linear(in_features=hidden_units, out_features=hidden_units),
      nn.ReLU(),
      nn.Linear(in_features=hidden_units, out_features=hidden_units),
      nn.ReLU(),
      # Final layer emits logits directly; parameter indices (1, 3, 5, 7) are
      # unchanged, so existing state_dict keys still match.
      nn.Linear(in_features=hidden_units, out_features=output_shape),
    )

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Return the logits produced by the layer stack for the batch `x`."""
    return self.layer_stack(x)

In [None]:
# Sanity check: compare the shape nn.Flatten produces for the sample image with
# the product of the image's dimensions (the value used as the model input size).
layer = nn.Flatten()
layer(temp_image).shape, np.prod(np.array(temp_image.shape))

In [None]:
# Build the baseline classifier and move it to the selected device in one step.
# The input size is the product of the sample image's dimensions.
baseline_model = ClassificationBaselineModel(
  input_shape=np.prod(np.array(temp_image.shape)),
  hidden_units=128,
  output_shape=len(cifar_10_class_names),
).to(device)

# Show the freshly initialised parameters as the cell output.
baseline_model.state_dict()

In [None]:
# Loss and optimizer for the baseline model; SGD is bound to
# `baseline_model`'s parameters and uses the notebook-level learning rate LR.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=baseline_model.parameters(), lr=LR)

In [None]:
# Train the baseline model for EPOCHS epochs; the returned evaluation dict is
# shown as the cell output.
train_model(baseline_model, cifar_10_train_dataloader, cifar_10_test_dataloader, loss_fn, optimizer, len(cifar_10_class_names), device)

In [None]:
# `device` must be passed by keyword: eval_model's fifth positional parameter
# is `train_time`, so a positional `device` would be reported as the training
# time in the result dict.
baseline_model_results = eval_model(baseline_model, cifar_10_test_dataloader, loss_fn, len(cifar_10_class_names), device=device)
baseline_model_results

In [None]:
class TinyVGGModel(nn.Module):
  """Small VGG-style CNN: two conv blocks followed by a linear classifier.

  Each conv block is Conv(3x3, no padding) -> ReLU -> Conv(3x3, no padding)
  -> ReLU -> MaxPool(2x2). The classifier flattens the resulting feature map
  and maps it to `output_shape` scores.

  Args:
    input_shape: Number of input channels.
    hidden_units: Number of channels produced by every convolution.
    output_shape: Number of output scores (one per class).
    image_size: Input height and width, as an `(H, W)` pair or a single int
      for square images. Defaults to 32x32, for which the flattened feature
      map has `hidden_units * 5 * 5` values.
  """
  def __init__(self, input_shape: int, hidden_units: int, output_shape: int, image_size=(32, 32)):
    super().__init__()
    self.conv_block_1 = nn.Sequential(
      nn.Conv2d(in_channels=input_shape, out_channels=hidden_units, kernel_size=(3, 3), stride=(1, 1), padding=(0, 0)),
      nn.ReLU(),
      nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=(3, 3), stride=(1, 1), padding=(0, 0)),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=(2, 2))
    )
    self.conv_block_2 = nn.Sequential(
      nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=(3, 3), stride=(1, 1), padding=(0, 0)),
      nn.ReLU(),
      nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=(3, 3), stride=(1, 1), padding=(0, 0)),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=(2, 2))
    )

    # Derive the classifier's input width from the conv stack itself instead of
    # hard-coding the 5x5 feature map that only holds for 32x32 inputs.
    if isinstance(image_size, int):
      image_size = (image_size, image_size)
    height, width = image_size
    with torch.no_grad():
      probe = torch.zeros(1, input_shape, height, width)
      flattened_features = self.conv_block_2(self.conv_block_1(probe)).numel()

    self.classifier_layer = nn.Sequential(
      nn.Flatten(),
      nn.Linear(in_features=flattened_features, out_features=output_shape)
    )

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Apply both conv blocks and the classifier to `x` and return the scores."""
    x = self.conv_block_1(x)
    x = self.conv_block_2(x)
    return self.classifier_layer(x)

In [None]:
# Build the TinyVGG network (3 input channels, 10 hidden channels) and move it
# to the selected device in one step.
model = TinyVGGModel(
  input_shape=3,
  hidden_units=10,
  output_shape=len(cifar_10_class_names),
).to(device)

In [None]:
# Loss and optimizer for the TinyVGG run. These rebind the `loss_fn` and
# `optimizer` names used for the baseline; Adam is bound to `model`'s
# parameters and uses its own learning rate (1e-2) rather than LR.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-2)

In [None]:
# Train the TinyVGG `model`: the optimizer above was built from
# `model.parameters()`, so passing any other network here would leave both
# networks un-updated.
train_model(model, cifar_10_train_dataloader, cifar_10_test_dataloader, loss_fn, optimizer, len(cifar_10_class_names), device)

In [None]:
# `device` must be passed by keyword: eval_model's fifth positional parameter
# is `train_time`, so a positional `device` would be reported as the training
# time in the result dict.
model_results = eval_model(model, cifar_10_test_dataloader, loss_fn, len(cifar_10_class_names), device=device)
model_results