In [1]:
%%writefile retrain_functions.py
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision import datasets

from tqdm.auto import tqdm
from typing import Dict, List, Tuple

import numpy as np

def train_test_transforms(image_size=224):
  """
  Build the torchvision preprocessing pipelines for the train and test datasets.

  Both pipelines convert the input to a PIL image, apply Resize(image_size)
  and CenterCrop(image_size), convert back to a tensor and normalize the three
  channels. The train pipeline additionally applies exactly one randomly
  chosen augmentation (flip, colour jitter, or a flip + jitter combination)
  between the crop and the tensor conversion.

  Args: image_size: int | Size passed to Resize and CenterCrop | Default: 224

  Return: Tuple(train_transformer, test_transformer)
  """

  def to_cropped_pil():
    # Steps shared by both pipelines before any augmentation
    return [
        transforms.ToPILImage(),
        transforms.Resize(image_size),
        transforms.CenterCrop(image_size),
    ]

  def to_normalized_tensor():
    # Steps shared by both pipelines after any augmentation
    return [
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]

  def flip_then(jitter):
    # With probability 0.5: always flip horizontally, then apply the jitter
    return transforms.RandomApply([transforms.RandomHorizontalFlip(p=1), jitter], p=0.5)

  # RandomChoice picks a single entry of this list for every sample
  augmentation = transforms.RandomChoice([
      transforms.RandomHorizontalFlip(p=0.5),
      transforms.ColorJitter(contrast=0.9),
      transforms.ColorJitter(brightness=0.1),
      flip_then(transforms.ColorJitter(contrast=0.9)),
      flip_then(transforms.ColorJitter(brightness=0.1)),
  ])

  # Train pipeline: shared steps with the augmentation in the middle
  train_transformer = transforms.Compose(
      [*to_cropped_pil(), augmentation, *to_normalized_tensor()]
  )

  # Test pipeline: shared steps only, no augmentation
  test_transformer = transforms.Compose(
      [*to_cropped_pil(), *to_normalized_tensor()]
  )

  return train_transformer, test_transformer

def create_dataloaders(train_arr, test_arr, train_classes, test_classes, train_transformer, test_transformer, batch_size=4):
  """
  The function to create DataLoaders.

  Args: train_arr: indexable collection of train images (e.g. tensor / numpy.array)
        test_arr: indexable collection of test images
        train_classes: indexable collection of train labels, aligned with train_arr
        test_classes: indexable collection of test labels, aligned with test_arr
        train_transformer: callable applied to every train image (or None)
        test_transformer: callable applied to every test image (or None)
        batch_size: int | Batch size | Default: 4

  Return: Tuple(train_dataloader, test_dataloader, train_data, test_data, class_names)

  train_dataloader: Train DataLoader (shuffled)
  test_dataloader: Test DataLoader (not shuffled)
  train_data: Train Dataset
  test_data: Test Dataset
  class_names: List of the distinct labels found in train and test data,
               sorted when the labels are mutually comparable
  """

  class CustomDataset(Dataset):
    """Pairs images[idx] with class_names[idx], applying an optional transform to the image."""

    def __init__(self, images, class_names, transform=None):
        self.images = images
        self.class_names = class_names
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.class_names[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

  def unique_labels(labels):
    # Elements of a torch tensor hash by identity, so set(tensor) never
    # de-duplicates; .tolist() yields plain Python scalars that do.
    values = labels.tolist() if hasattr(labels, "tolist") else list(labels)
    return set(values)

  # Create Datasets
  train_data = CustomDataset(train_arr, train_classes, transform = train_transformer)
  test_data = CustomDataset(test_arr, test_classes, transform = test_transformer)

  # Get class names (distinct labels across both splits, in a stable order)
  distinct = unique_labels(train_data.class_names) | unique_labels(test_data.class_names)
  try:
    class_names = sorted(distinct)
  except TypeError:
    # Labels of mixed, non-comparable types: keep them unsorted
    class_names = list(distinct)

  # Create DataLoader for train data
  train_dataloader = DataLoader(dataset=train_data,
                                batch_size=batch_size,
                                shuffle=True)

  # Create DataLoader for test data
  test_dataloader = DataLoader(dataset=test_data,
                                batch_size=batch_size,
                                shuffle=False)

  return train_dataloader, test_dataloader, train_data, test_data, class_names

def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> Tuple[float, float]:
    """Run one training epoch over `dataloader`.

    Switches the model to training mode, then for every batch performs the
    forward pass, computes the loss, back-propagates and applies an
    optimizer step.

    Args:
    model: A PyTorch model to be trained.
    dataloader: Iterable of (inputs, targets) batches that supports len().
    loss_fn: Loss function called as loss_fn(predictions, targets).
    optimizer: Optimizer updating the model parameters.
    device: Device the batches are moved to (e.g. "cpu").

    Returns:
    (train_loss, train_accuracy), each the mean of the per-batch values.
    For example:

    (0.1112, 0.8743)
    """
    model.train()

    # Running sums of the per-batch loss and per-batch accuracy
    loss_total = 0
    acc_total = 0

    for inputs, targets in dataloader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss for this batch
        logits = model(inputs)
        batch_loss = loss_fn(logits, targets)
        loss_total += batch_loss.item()

        # Reset gradients, back-propagate, update parameters
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Fraction of samples in this batch whose predicted class is correct
        predicted = logits.softmax(dim=1).argmax(dim=1)
        n_correct = (predicted == targets).sum().item()
        acc_total += n_correct / len(logits)

    # Average over the number of batches
    n_batches = len(dataloader)
    return loss_total / n_batches, acc_total / n_batches

def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.device) -> Tuple[float, float]:
    """Tests a PyTorch model for a single epoch.

    Turns a target PyTorch model to "eval" mode and then performs
    a forward pass on a testing dataset.

    Args:
    model: A PyTorch model to be tested.
    dataloader: A DataLoader instance for the model to be tested on.
    loss_fn: A PyTorch loss function to calculate loss on the test data.
    device: A target device to compute on (e.g. "cpu").

    Returns:
    A tuple of testing loss and testing accuracy metrics.
    In the form (test_loss, test_accuracy). For example:

    (0.0223, 0.8985)
    """
    # Put model in eval mode
    model.eval()

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0

    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          scheduler,
          early_stopper,
          epochs: int,
          device: torch.device) -> Dict[str, List]:
    """Train and evaluate a PyTorch model for up to `epochs` epochs.

    Each epoch runs train_step() and then test_step(), advances the
    scheduler, prints the four metrics and records them. The loop ends
    early as soon as early_stopper.early_stop(test_loss) returns a truthy
    value.

    Args:
    model: A PyTorch model to be trained and tested.
    train_dataloader: A DataLoader instance for the model to be trained on.
    test_dataloader: A DataLoader instance for the model to be tested on.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    loss_fn: A PyTorch loss function to calculate loss on both datasets.
    scheduler: Object whose step() is called once per epoch.
    early_stopper: Object whose early_stop(test_loss) decides whether to stop.
    epochs: Maximum number of epochs to train for.
    device: A target device to compute on (e.g. "cpu").

    Returns:
    A dictionary with one list per metric and one entry per completed epoch.
    In the form: {train_loss: [...],
              train_acc: [...],
              test_loss: [...],
              test_acc: [...]}
    For example if training for epochs=2:
             {train_loss: [2.0616, 1.0537],
              train_acc: [0.3945, 0.3945],
              test_loss: [1.2641, 1.5706],
              test_acc: [0.3400, 0.2973]}
    """
    history = {"train_loss": [], "train_acc": [], "test_loss": [], "test_acc": []}

    model.to(device)

    for epoch in tqdm(range(epochs)):
        # One pass over the training data, then one over the test data
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer,
                                           device=device)
        test_loss, test_acc = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn,
                                        device=device)

        scheduler.step()

        # Report this epoch's metrics
        summary = (
          f"Epoch: {epoch+1} | "
          f"train_loss: {train_loss:.4f} | "
          f"train_acc: {train_acc:.4f} | "
          f"test_loss: {test_loss:.4f} | "
          f"test_acc: {test_acc:.4f}"
        )
        print(summary)

        # Record this epoch's metrics
        epoch_metrics = (("train_loss", train_loss),
                         ("train_acc", train_acc),
                         ("test_loss", test_loss),
                         ("test_acc", test_acc))
        for metric_name, metric_value in epoch_metrics:
            history[metric_name].append(metric_value)

        # Leave the loop once the early stopper asks for it
        if early_stopper.early_stop(test_loss):
            break

    return history

Writing retrain_functions.py


In [2]:
%%writefile utils.py
import torch
import random

def set_seeds(seed=42):
  """
    Set random seeds (random.seed and torch.manual_seed)

    Args: seed: int | Seed passed to both generators | Default: 42
  """
  random.seed(seed)
  # Use the caller's seed here as well (a hard-coded 42 ignored the argument)
  torch.manual_seed(seed)

Writing utils.py


In [3]:
%%writefile early_stopper.py
"""
 Class EarlyStopper to stop training if the validation loss increases.
"""

class EarlyStopper:
    """Signal when training should stop because the validation loss got worse.

    Tracks the lowest validation loss seen so far. Every call whose loss is
    more than `min_delta` above that minimum increments a counter; a new
    minimum resets it. Losses between the minimum and minimum + min_delta
    leave the counter unchanged.

    Args:
        patience: Counter value at which early_stop() starts returning True.
        min_delta: Margin above the best loss that is still tolerated.
    """

    def __init__(self, patience=1, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        # Built-in infinity: this module does not import numpy
        self.min_validation_loss = float("inf")

    def early_stop(self, validation_loss):
        """Record `validation_loss` and return True if training should stop."""
        if validation_loss < self.min_validation_loss:
            # New best loss: remember it and start counting afresh
            self.min_validation_loss = validation_loss
            self.counter = 0
        elif validation_loss > (self.min_validation_loss + self.min_delta):
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False

Writing early_stopper.py


In [4]:
%%writefile retrain.py
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from sklearn.model_selection import train_test_split

from model import create_mobilenet
from utils import set_seeds
from retrain_functions import *
from early_stopper import EarlyStopper

import numpy as np
import os

# Working directory captured when this module is imported; build_and_retrain_model
# joins the saved-weights path ("models/predict/simpsons_model.pth") onto it.
cwd = os.getcwd()

def numpy_to_tensor(array):
  """
    Convert numpy array to tensor.

    Args:
      array: 2-D array whose first column (array[:, 0]) holds the images;
             each image is indexed as (height, width, channels) and all
             images must share one shape.

    Return:
      Tensor of shape (N, channels, height, width) with pixel values divided
      by 255.0. For an input without rows, an empty tensor is returned.
  """
  # Scale pixel values and move the channel axis first: (H, W, C) -> (C, H, W)
  preprocessed_images = [
      np.transpose(np.array(image) / 255.0, (2, 0, 1))
      for image in array[:, 0]
  ]

  # np.stack cannot stack zero arrays; keep the empty-tensor result for no rows
  if not preprocessed_images:
    return torch.tensor(preprocessed_images)

  # Stack in numpy first: torch.tensor() on a list of ndarrays copies element
  # by element and warns that it is extremely slow.
  image_tensor = torch.from_numpy(np.stack(preprocessed_images))

  return image_tensor

def build_and_retrain_model(images, class_idx, new_test=None):
  """Retrain the saved model on a new set of labelled images.

  The images are split into stratified train / test parts, `new_test` (if
  given) is appended to the test part, the saved weights are loaded from
  "models/predict/simpsons_model.pth" under the working directory, and the
  model is trained on CPU for up to 10 epochs with early stopping.

  Args:
    images: numpy.array of shape (N, 2); column 0 holds the images,
            column 1 the class names.
    class_idx: Dict mapping every class name to its integer index.
    new_test: optional numpy.array with the same two-column layout; its rows
              are added to the test split. Default: None (nothing added).

  Return:
    model.state_dict(): state dict of the retrained model
    model_results: Dict(train_loss, test_loss, train_acc, test_acc)

  Raises:
    KeyError: if a class name in the data is missing from class_idx.
  """
  # Constants
  RANDOM_SEED = 42
  IMAGE_SIZE = 224
  BATCH_SIZE = 4
  TEST_SIZE = 0.2
  NUM_OF_CLASSES = 4

  # Set random seeds
  set_seeds(RANDOM_SEED)

  train_arr, test_arr = train_test_split(images, test_size=TEST_SIZE, random_state=RANDOM_SEED, stratify=images[:,1])

  # Add previous test files to test_arr. The rows are concatenated;
  # np.array([test_arr, new_test]) would nest the two arrays instead and
  # break the column indexing below.
  if new_test is not None and len(new_test) > 0:
    test_arr = np.concatenate([test_arr, new_test], axis=0)

  train_classes = train_arr[:,1]
  test_classes = test_arr[:,1]

  def class_name_to_digit(names, class_idx):
    # Fail early with a clear message instead of emitting None labels
    unknown = sorted({str(name) for name in names if name not in class_idx})
    if unknown:
      raise KeyError(f"Class names missing from class_idx: {unknown}")
    # Explicit int64 so the labels have the same dtype on every platform
    return np.array([class_idx[name] for name in names], dtype=np.int64)

  train_classes_idx = torch.from_numpy(class_name_to_digit(train_classes, class_idx))
  test_classes_idx = torch.from_numpy(class_name_to_digit(test_classes, class_idx))

  # Create transforms
  train_transforms, test_transforms = train_test_transforms(image_size=IMAGE_SIZE)

  # Numpy to tensor
  train_tensor, test_tensor = numpy_to_tensor(train_arr), numpy_to_tensor(test_arr)

  # Create DataLoaders (only the two loaders are needed here)
  train_dataloader, test_dataloader, _, _, _ = create_dataloaders(train_arr=train_tensor,
                                                                  test_arr=test_tensor,
                                                                  train_classes=train_classes_idx,
                                                                  test_classes=test_classes_idx,
                                                                  train_transformer=train_transforms,
                                                                  test_transformer=test_transforms,
                                                                  batch_size=BATCH_SIZE)
  # Create the model and load the previously saved weights
  model, _ = create_mobilenet(NUM_OF_CLASSES)
  model.load_state_dict(
    torch.load(f=os.path.join(cwd,"models/predict/simpsons_model.pth"),
               map_location=torch.device("cpu"))
  )

  optimizer = Adam(params=model.parameters(), lr=0.001, weight_decay=1e-5)

  # Use learning rate scheduler
  # NOTE(review): `verbose` is reported as deprecated in recent PyTorch
  # releases — confirm against the installed version.
  scheduler = StepLR(optimizer=optimizer,
                    step_size=3,
                    gamma=0.1,
                    last_epoch=-1,
                    verbose=True)

  # Use label smoothing in the loss function
  loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1)

  # Set early stopper to prevent overfitting
  early_stopper = EarlyStopper(patience=3, min_delta=0.1)

  # Train the model
  model_results = train(model=model,
                        train_dataloader=train_dataloader,
                        test_dataloader=test_dataloader,
                        optimizer=optimizer,
                        loss_fn=loss_fn,
                        scheduler=scheduler,
                        early_stopper=early_stopper,
                        epochs=10,
                        device='cpu')

  return model.state_dict(), model_results

Writing retrain.py


In [None]:
from PIL import Image
from pathlib import Path
import numpy as np

def get_images(path, label='Bart Simpson'):
  """
    Load every file directly inside `path` as a 224x224 RGB image.

    Args:
      path: directory containing the image files
      label: class name attached to every loaded image | Default: 'Bart Simpson'

    Return:
      numpy.array of dtype object and shape (N, 2): column 0 holds the pixel
      arrays, column 1 the label. Files are visited in sorted path order.
  """
  samples = []
  # Match with '*' and filter files explicitly: a pattern ending in a
  # separator such as '*/' matches only directories on Python 3.11+.
  for one_path in sorted(Path(path).glob('*')):
    if not one_path.is_file():
      continue
    # The context manager closes the underlying file once the pixels are copied
    with Image.open(one_path) as image:
      # Force three channels so later (H, W, C) handling works for any input mode
      pixels = np.asarray(image.convert('RGB').resize((224,224)))
    samples.append((pixels, label))

  # Fill an object array cell by cell: np.array() on (ndarray, str) pairs is
  # ragged and raises on recent NumPy versions.
  full_array = np.empty((len(samples), 2), dtype=object)
  for row, (pixels, sample_label) in enumerate(samples):
    full_array[row, 0] = pixels
    full_array[row, 1] = sample_label

  return full_array

# Load the images found under the 'test' directory via get_images
images = get_images('test')
# Notebook display of the first (pixels, label) entry
images[0]

In [None]:
import retrain
import importlib
# Reload the module so the latest retrain.py on disk is the one used below
importlib.reload(retrain)

# NOTE(review): build_and_retrain_model takes class_idx as a required second
# argument, but only `images` is passed here, so this call raises TypeError
# as written — TODO confirm the intended class_idx (and new_test) values.
new_state_dict, new_model_results = retrain.build_and_retrain_model(images)

# Notebook display of the per-epoch metrics dictionary
new_model_results