# **Model engineering - Part: Transfer Learning**

## **Research question**
Can transfer learning improve the performance of CNN models on Rock, Paper, Scissors image classification?

**Reference:** <br>
Code adapted from: [learnpytorch.io](https://www.learnpytorch.io/05_pytorch_going_modular/)

### Imports

In [1]:
import torch
import torchvision
from torch import nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
from typing import Dict, List, Tuple
from timeit import default_timer as timer 

### Function to create the DataLoader

In [62]:
def create_dataloaders(train_dir: str, 
                            test_dir: str, 
                            transform: transforms.Compose, 
                            batch_size: int, 
                            num_workers: int=1
                        ) -> Tuple[DataLoader, DataLoader, List[str]]:
    """Create the training and testing DataLoaders from two image folders.

    Args:
        train_dir: Path to the training images, laid out as expected by
            ``torchvision.datasets.ImageFolder``.
        test_dir: Path to the testing images, same layout as ``train_dir``.
        transform: Transform applied to every image of both datasets.
        batch_size: Number of samples per batch in both DataLoaders.
        num_workers: Number of worker processes used by each DataLoader.

    Returns:
        A tuple ``(train_dataloader, test_dataloader, class_names)`` where
        ``class_names`` is taken from the training dataset.
    """
    # Use ImageFolder to create dataset(s)
    train_data = datasets.ImageFolder(train_dir, transform=transform)
    test_data = datasets.ImageFolder(test_dir, transform=transform)

    # Get class names
    class_names = train_data.classes

    # Turn images into data loaders
    train_dataloader = DataLoader(train_data,
                                    batch_size=batch_size,
                                    shuffle=True,
                                    num_workers=num_workers,
                                    pin_memory=True
                                )
    # Evaluation does not benefit from shuffling; a fixed order keeps the
    # batch composition (and therefore the per-batch averaged metrics)
    # identical from one evaluation run to the next.
    test_dataloader = DataLoader(test_data,
                                    batch_size=batch_size,
                                    shuffle=False,
                                    num_workers=num_workers,
                                    pin_memory=True
                                )

    return train_dataloader, test_dataloader, class_names


### Function to load the pretrained model

In [63]:
def load_pretrained_efficientNet_B0():
    """Build an EfficientNet-B0 initialised with torchvision's default pretrained weights.

    Returns:
        A tuple ``(model, weights)``: the network and the weights enum member
        it was created with (needed later to obtain the matching transforms).
    """
    tv_models = torchvision.models

    # DEFAULT selects the best available ImageNet-pretrained weights
    weights = tv_models.EfficientNet_B0_Weights.DEFAULT

    # Instantiate the architecture with those weights and hand both back
    return tv_models.efficientnet_b0(weights=weights), weights

### Function to load the dataset

In [64]:
def load_data(train_dir: str, test_dir: str, weights, num_workers: int, batch_size: int):
    """Create the DataLoaders using the preprocessing that belongs to ``weights``.

    Args:
        train_dir: Directory containing the training images.
        test_dir: Directory containing the testing images.
        weights: Pretrained-weights object exposing a ``transforms()`` method.
        num_workers: Number of DataLoader worker processes.
        batch_size: Mini-batch size used for both DataLoaders.

    Returns:
        A tuple ``(train_dataloader, test_dataloader, class_names)``.
    """
    # Apply to our own data the same transforms the pretrained model was built with
    preprocessing = weights.transforms()

    loaders_and_classes = create_dataloaders(
        train_dir=train_dir,
        test_dir=test_dir,
        transform=preprocessing,
        batch_size=batch_size,
        num_workers=num_workers,
    )

    # Unpack explicitly so the function always returns exactly three items
    train_dataloader, test_dataloader, class_names = loaders_and_classes
    return train_dataloader, test_dataloader, class_names

### Function to recreate the classifier layer of the model

In [65]:
def recreate_classifier_layer(model: torch.nn.Module, dropout: float, class_names: list):
    """Freeze the feature extractor and replace the classifier head.

    The new head is ``Dropout -> Linear`` with one output unit per entry of
    ``class_names``. Its input width is read from the first ``Linear`` layer
    of the model's current classifier, so the function is not tied to one
    backbone size; if no such layer exists, 1280 is used.

    Args:
        model: Model exposing a ``features`` sub-module and (optionally) a
            ``classifier`` sub-module. It is modified in place.
        dropout: Dropout probability of the new head.
        class_names: Class labels; only its length is used.

    Returns:
        The same ``model`` object with frozen ``features`` and a new ``classifier``.
    """
    # Freeze all base layers in the "features" section of the model (the feature extractor)
    for param in model.features.parameters():
        param.requires_grad = False

    # Width of the feature vector entering the head: taken from the existing
    # head when possible, otherwise the previously hard-coded default.
    in_features = 1280
    current_head = getattr(model, "classifier", None)
    if isinstance(current_head, torch.nn.Module):
        for layer in current_head.modules():
            if isinstance(layer, torch.nn.Linear):
                in_features = layer.in_features
                break

    # Seed the global RNG so the new layer is initialised reproducibly
    torch.manual_seed(42)

    # Recreate the classifier layer
    model.classifier = torch.nn.Sequential(
        torch.nn.Dropout(p=dropout, inplace=True),
        torch.nn.Linear(in_features=in_features,
                        out_features=len(class_names),  # one output unit for each class
                        bias=True),
    )

    return model

### Functions to train the pretrained model

In [66]:
def train_step(model: torch.nn.Module, 
                    dataloader: torch.utils.data.DataLoader, 
                    loss_fn: torch.nn.Module, 
                    optimizer: torch.optim.Optimizer
                ) -> Tuple[float, float]:
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Model to train; it is switched to train mode.
        dataloader: Iterable of ``(X, y)`` batches that also supports ``len()``.
        loss_fn: Callable computing the loss from ``(predictions, targets)``.
        optimizer: Optimizer updating the model's parameters.

    Returns:
        A tuple ``(train_loss, train_acc)``: loss and accuracy, each averaged
        over the batches (not over the individual samples).
    """
    # Put model in train mode
    model.train()

    # Batches are moved to the device holding the model's parameters so the
    # step also works when the model lives on an accelerator. On CPU the
    # move is a no-op. Models without parameters are left untouched.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    # Setup train loss and train accuracy values
    train_loss, train_acc = 0.0, 0.0

    # Loop through data loader data batches
    for X, y in dataloader:
        if device is not None:
            X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Accumulate the batch accuracy. argmax over the raw logits picks the
        # same class as argmax over their softmax, so no softmax is needed.
        y_pred_class = y_pred.argmax(dim=1)
        train_acc += (y_pred_class == y).sum().item() / len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

In [67]:
def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module
              ) -> Tuple[float, float]:
    """Evaluate ``model`` for one pass over ``dataloader``.

    Args:
        model: Model to evaluate; it is switched to eval mode.
        dataloader: Iterable of ``(X, y)`` batches that also supports ``len()``.
        loss_fn: Callable computing the loss from ``(predictions, targets)``.

    Returns:
        A tuple ``(test_loss, test_acc)``: loss and accuracy, each averaged
        over the batches (not over the individual samples).
    """
    # Put model in eval mode
    model.eval()

    # Batches are moved to the device holding the model's parameters so the
    # step also works when the model lives on an accelerator. On CPU the
    # move is a no-op. Models without parameters are left untouched.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0.0, 0.0

    # Turn on inference context manager (no gradient tracking during evaluation)
    with torch.inference_mode():
        # Loop through DataLoader batches
        for X, y in dataloader:
            if device is not None:
                X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += (test_pred_labels == y).sum().item() / len(test_pred_labels)

    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc


# Despite its "test_" prefix this is an evaluation helper, not a unit test;
# the marker keeps pytest from collecting it when it is imported into a test module.
test_step.__test__ = False

In [68]:
def train(model: torch.nn.Module, 
            train_dataloader: torch.utils.data.DataLoader, 
            test_dataloader: torch.utils.data.DataLoader, 
            optimizer: torch.optim.Optimizer,
            loss_fn: torch.nn.Module,
            epochs: int
            ) -> Dict[str, List]:
    """Run the train/evaluate cycle for ``epochs`` epochs and collect the metrics.

    Args:
        model: Model to train and evaluate.
        train_dataloader: Batches passed to ``train_step`` each epoch.
        test_dataloader: Batches passed to ``test_step`` each epoch.
        optimizer: Optimizer forwarded to ``train_step``.
        loss_fn: Loss function forwarded to both steps.
        epochs: Number of epochs to run.

    Returns:
        Dictionary with the keys ``train_loss``, ``train_acc``, ``test_loss``
        and ``test_acc``; each maps to a list holding one value per epoch.
    """
    # One (initially empty) history list per tracked metric
    metric_names = ("train_loss", "train_acc", "test_loss", "test_acc")
    results = {name: [] for name in metric_names}

    for epoch in tqdm(range(epochs)):
        # One optimisation pass over the training data ...
        train_loss, train_acc = train_step(
            model=model,
            dataloader=train_dataloader,
            loss_fn=loss_fn,
            optimizer=optimizer,
        )
        # ... followed by one evaluation pass over the test data
        test_loss, test_acc = test_step(
            model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn,
        )

        # Report the metrics of this epoch
        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )

        # Append this epoch's values to the matching history lists
        epoch_values = (train_loss, train_acc, test_loss, test_acc)
        for name, value in zip(metric_names, epoch_values):
            results[name].append(value)

    return results

## Main method

In [None]:
# Location of the dataset and of its train/test splits
data_path = '../data_original/dataset_1'
train_dir = data_path + "/train"
test_dir = data_path + "/test"

# Run configuration: reproducibility, optimisation and data-loading settings
seed = 42
learning_rate, epochs, dropout = 0.001, 5, 0.2
num_workers, batch_size = 1, 32

# Pretrained network together with the weights object it was built from
model, weights = load_pretrained_efficientNet_B0()

# DataLoaders using the transforms that belong to the pretrained weights
train_dataloader, test_dataloader, class_names = load_data(
    train_dir=train_dir,
    test_dir=test_dir,
    weights=weights,
    num_workers=num_workers,
    batch_size=batch_size,
)

# Freeze the feature extractor and attach a head sized for our classes
model = recreate_classifier_layer(
    model=model,
    dropout=dropout,
    class_names=class_names,
)

# Loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Seed the RNG right before training starts
torch.manual_seed(seed)

# Time the complete training run
start_time = timer()

# Train the model and keep the per-epoch metrics
results = train(
    model=model,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    optimizer=optimizer,
    loss_fn=loss_fn,
    epochs=epochs,
)

# Stop the timer and report the elapsed time
end_time = timer()
print(f"[INFO] Total training time: {end_time-start_time:.3f} seconds")