# Task 2: Coral Classification using ResNet50
This notebook trains a ResNet50 model to classify whether an image contains coral. ResNet50 is a convolutional neural network, and a pre-trained version is available through torchvision, the computer vision library of PyTorch.

### Download Data
The data is available for download through a public link. After downloading, unzip the folder to get access to the data.

In [None]:
import gdown

# Download training and validation set
url = 'https://drive.google.com/uc?id=1Gdxb0R8ohGqI4yB4KufWYESl0wIc8r8o'
output = 'Data_2022_assignment_COMP3007.zip'
gdown.download(url, output)

In [2]:
!unzip {output} >/dev/null

In [None]:
# Download testing set
url = 'https://drive.google.com/uc?id=1vc5avjn2lRfnIDC2i7XOq22R70m6UTrH'
output = 'Testing_Data_2022.zip'
gdown.download(url, output)

In [4]:
!unzip {output} >/dev/null

### Define Directories
To access the data, various directories need to be defined. The data directory contains two subdirectories that correspond to the training and validation set. The test data directory contains the testing set.

In [5]:
import os

# Train and valid directories
DATA_PATH = os.path.join('Data', 'coral image classification')
TRAIN_PATH = os.path.join(DATA_PATH, 'train')
VALID_PATH = os.path.join(DATA_PATH, 'val')

# Test directory
TEST_PATH = os.path.join('TestData', 'CoralImageClassification')

### Epoch
An epoch is one complete pass of the model over the training set. During an epoch, the model attempts to learn a feature representation that classifies the data correctly. Two components drive this learning: the loss function and the optimizer. The loss function measures how far the predictions are from the ground truth. Backpropagation then computes the gradient of the loss with respect to the model's parameters, and the optimizer uses those gradients to update the parameters so that the loss decreases.

At the end of each epoch, the model is evaluated on a validation set. No optimization takes place during this phase; only the loss and the accuracy on the validation set are calculated. These metrics indicate how well the model performs on data it was not trained on.

https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html
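
The interplay between loss, gradient, and parameter update described above can be sketched without any deep learning library. The following is a minimal illustration, assuming a hypothetical one-parameter model `y = w * x`, a squared-error loss, and plain gradient descent. The names `data`, `w`, and `lr` are made up for this example and are not part of the notebook.

```python
# Toy data generated by the rule y = 3 * x (hypothetical example values)
data = [(1.0, 3.0), (2.0, 6.0), (3.0, 9.0)]

w = 0.0    # the single model parameter, deliberately initialised far from 3
lr = 0.05  # learning rate

for epoch in range(20):              # one epoch = one full pass over the data
    total_loss = 0.0
    for x, y in data:
        pred = w * x                 # forward pass
        loss = (pred - y) ** 2       # loss: squared distance to the ground truth
        grad = 2 * (pred - y) * x    # d(loss)/dw, what backpropagation would compute
        w -= lr * grad               # optimizer step: move against the gradient
        total_loss += loss
    epoch_loss = total_loss / len(data)

print(f'w = {w:.4f}, final epoch loss = {epoch_loss:.6f}')
```

The epoch runner below follows the same pattern, with `loss.backward()` computing the gradients and `optimizer.step()` applying the update.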

In [13]:
import torch
from tqdm.notebook import tqdm
from sklearn.metrics import accuracy_score, classification_report

# An epoch runner
def run_epoch(model, criterion, optimizer, data_loader, mode='train'):
  # Set the model to training or evaluation mode
  if mode == 'train':
    model.train()
  else:
    model.eval()
  
  # Empty tensors to store the ground truth and predictions
  y_true = torch.zeros(0, dtype=torch.long, device='cpu')
  y_pred = torch.zeros(0, dtype=torch.long, device='cpu')

  # Variables to keep track of the epoch loss
  total_loss = 0
  total_data = 0

  # Check if GPU is available
  device = 'cuda' if torch.cuda.is_available() else 'cpu'

  # Iterate through each batch
  with tqdm(data_loader, desc=mode) as iterator:
    for inputs, labels in iterator:
      # Pass the data and ground truth to the GPU if available
      inputs, labels = inputs.to(device), labels.to(device)

      # Zero out old gradients
      optimizer.zero_grad()

      # Calculate gradient only for training
      with torch.set_grad_enabled(mode == 'train'):
        # Get the values from the fully-connected layer of the model
        outputs = model(inputs)

        # Calculate the loss function
        loss = criterion(outputs, labels)

        # Get the predictions
        _, preds = torch.max(outputs, 1)

        # Keep track of the ground truth and predictions
        y_true = torch.cat([y_true, labels.view(-1).cpu()])
        y_pred = torch.cat([y_pred, preds.view(-1).cpu()])
      
        # Update the model's parameters during training
        if mode == 'train':
          loss.backward()
          optimizer.step()

      # Calculate the epoch loss  
      total_loss += loss.item() * inputs.size(0)
      total_data += inputs.size(0)

      # Keep track of the loss and accuracy
      metrics = {
          'loss': total_loss / total_data,
          'accuracy': accuracy_score(y_true, y_pred)
      }

      # Display the metrics
      iterator.set_postfix(metrics)
  
  # Display the classification report during evaluation
  if mode != 'train':
    print('\n', classification_report(y_true, y_pred, target_names=['No Coral', 'Coral']))
  
  return metrics['loss'], metrics['accuracy']

### Dataset and Data Loader
This cell creates the training, validation, and testing sets from the dataset. Preprocessing follows the settings used for the pre-trained ResNet50: each image is resized to 224x224 pixels and normalized with the ImageNet channel means and standard deviations.

To load the dataset for training and evaluation, three data loaders have been created. All data loaders have a batch size of 8.
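
As a rough illustration of the preprocessing arithmetic: `ToTensor` scales 8-bit pixel values to the range [0, 1], and `Normalize` then subtracts a per-channel mean and divides by a per-channel standard deviation. The sketch below reproduces that calculation for a single pixel in plain Python, using the same statistics as this notebook. The helper `normalize_pixel` and the sample pixel are hypothetical.

```python
# Per-channel statistics used in this notebook (ImageNet mean and std)
MEAN = (0.485, 0.456, 0.406)
STD = (0.229, 0.224, 0.225)

def normalize_pixel(rgb):
    """Scale an 8-bit RGB pixel to [0, 1], then standardise each channel."""
    scaled = [value / 255 for value in rgb]                     # what ToTensor does
    return [(s - m) / d for s, m, d in zip(scaled, MEAN, STD)]  # what Normalize does

# A hypothetical bluish pixel
print(normalize_pixel((30, 120, 200)))
```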

In [14]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

# Create the training, validation, and testing sets
train_dataset = datasets.ImageFolder(TRAIN_PATH, transform=transform)
valid_dataset = datasets.ImageFolder(VALID_PATH, transform=transform)
test_dataset = datasets.ImageFolder(TEST_PATH, transform=transform)

# Create the training, validation, and testing loaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=os.cpu_count(), pin_memory=True)
valid_loader = DataLoader(valid_dataset, batch_size=8, shuffle=False, num_workers=os.cpu_count(), pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=os.cpu_count(), pin_memory=True)

### Initialization
This cell downloads a pre-trained ResNet50 model from torchvision. The weights were obtained by training on the ImageNet dataset. The last layer is replaced so that it outputs the two labels of this task: coral vs no coral.

Cross-entropy loss will be used as the loss function while stochastic gradient descent is used as the optimizer.
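
For intuition, the cross-entropy loss for one sample can be computed by hand from the two raw outputs (logits) of the final layer. This is a minimal sketch in plain Python, not the library implementation. The function name `cross_entropy` and the example logits are hypothetical.

```python
import math

def cross_entropy(logits, target):
    """Negative log-probability of the target class under a softmax over logits."""
    # Subtract the maximum logit for numerical stability
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]

# Hypothetical logits in the order [no coral, coral]
logits = [0.2, 2.3]
print(cross_entropy(logits, target=1))  # smaller loss: the model favours the true class
print(cross_entropy(logits, target=0))  # larger loss: the model favours the wrong class
```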

In [15]:
from torch import nn, optim
from torchvision import models

# Check if GPU is available for training
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Download the pre-trained model
model = models.resnet50(weights='ResNet50_Weights.DEFAULT')
model.fc = nn.Linear(model.fc.in_features, 2)
model = model.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

### Train the Model
To train the model, a number of hyperparameters have to be defined. As the dataset is relatively easy, little hyperparameter tuning is needed. The following hyperparameters have been chosen:
* Epochs: 10
* Batch Size: 8
* Learning Rate: 0.001
* Momentum: 0.9
* Image Size: 224

After each epoch, the accuracy of the model is evaluated on the validation set. Model selection is done by choosing the epoch where the model has the highest accuracy as long as there is no overfitting or underfitting.
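
The selection rule can be sketched in a few lines. The validation accuracies below are invented purely for illustration and are not results from this notebook, and `pick_best_epoch` is a hypothetical helper.

```python
def pick_best_epoch(accuracies):
    """Return (epoch index, accuracy) of the first epoch with the highest accuracy."""
    best_epoch, best_accuracy = None, 0.0
    for epoch, accuracy in enumerate(accuracies):
        # Strict comparison keeps the earliest epoch when there is a tie
        if accuracy > best_accuracy:
            best_epoch, best_accuracy = epoch, accuracy
    return best_epoch, best_accuracy

# Made-up validation accuracies for five epochs
example_accuracies = [0.91, 0.95, 0.97, 0.96, 0.97]
print(pick_best_epoch(example_accuracies))
```

Checking for overfitting or underfitting remains a manual step, for example by comparing the training and validation curves plotted later.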

In [None]:
# Define the number of epochs
epochs = list(range(10))

# Keep track of the training and validation metrics
train_losses, train_accuracies = [], []
valid_losses, valid_accuracies = [], []

# Keep track of the best model
best_model = None
best_accuracy = 0

# Start training
for i in epochs:
  print('Epoch', i)

  # Train and evaluate the model
  train_loss, train_accuracy = run_epoch(model, criterion, optimizer, train_loader, mode='train')
  valid_loss, valid_accuracy = run_epoch(model, criterion, optimizer, valid_loader, mode='valid')

  # Keep track of the training metrics
  train_losses.append(train_loss)
  train_accuracies.append(train_accuracy)

  # Keep track of the validation metrics
  valid_losses.append(valid_loss)
  valid_accuracies.append(valid_accuracy)

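  # Update the best model if the accuracy improved
  if valid_accuracy > best_accuracy:
    # Snapshot the weights on the CPU; a plain reference to `model` would keep changing in later epochs
    best_model = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
    best_accuracy = valid_accuracy
    print('Improved model updated!\n')

# Save the best weights (fall back to the final weights if no epoch improved)
if best_model is None:
  best_model = model.cpu().state_dict()
torch.save(best_model, 'resnet50.pth')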

### Plot Losses
This cell plots the losses of the model on the training and validation set.

In [None]:
import plotly.graph_objects as go
from IPython.display import Image

fig = go.Figure()
fig.add_trace(go.Scatter(x=epochs, y=train_losses, name='train_loss'))
fig.add_trace(go.Scatter(x=epochs, y=valid_losses, name='valid_loss'))
fig.update_layout(
    title="Training vs Validation Loss of ResNet50",
    title_x=0.5,
    xaxis_title="Epoch",
    yaxis_title="Loss"
)
fig.show()

### Plot Accuracies
This cell plots the accuracies of the model on the training and validation set.

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=epochs, y=train_accuracies, name='train_accuracy'))
fig.add_trace(go.Scatter(x=epochs, y=valid_accuracies, name='valid_accuracy'))
fig.update_layout(
    title="Training vs Validation Accuracy of ResNet50",
    title_x=0.5,
    xaxis_title="Epoch",
    yaxis_title="Accuracy"
)
fig.show()

### Evaluate a Trained ResNet50 on Test Set
The following cells load the weights of an already trained model and evaluate it on the test set.

In [None]:
import gdown

url = 'https://drive.google.com/uc?id=1IHNz6za0lRmaVXjG-O50r0eBU9oeCs7u'
output = 'resnet50.pth'
gdown.download(url, output)

In [16]:
# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

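# Load the best model
# weights=None skips the ImageNet download; every weight is overwritten by the checkpoint below
model = models.resnet50(weights=None)
model.fc = nn.Linear(model.fc.in_features, 2)
# map_location lets a checkpoint saved on one device load on another
model.load_state_dict(torch.load('resnet50.pth', map_location=device))
model = model.to(device).eval()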

In [None]:
# Evaluate the model on the testing set
test_loss, test_accuracy = run_epoch(model, criterion, optimizer, test_loader, mode='test')

### Classify Single Image
This cell classifies a single image. The true label and the prediction are both shown.
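
The confidence percentage reported next to the prediction comes from a softmax over the model's two outputs. Below is a minimal plain-Python sketch of that step, assuming hypothetical logits and the label order `['No Coral', 'Coral']`. The names `softmax`, `class_names`, and `probs` are illustrative only.

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    m = max(logits)                            # shift for numerical stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

class_names = ['No Coral', 'Coral']
probs = softmax([1.5, -0.5])                   # hypothetical model output
best = max(range(len(probs)), key=probs.__getitem__)
print('Pred: {} ({:.2f}%)'.format(class_names[best], probs[best] * 100))
```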

In [None]:
import cv2
import numpy as np
from google.colab.patches import cv2_imshow
from torch import nn
from PIL import Image

# Preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

labels = sorted(os.listdir(TEST_PATH))
modified_labels = ['No Coral', 'Coral']
true_label = modified_labels[0]

# Get the image
img_path = os.path.join(TEST_PATH, labels[0], '13-11-41-27_1.1421167342.57-top_right.png')
img = Image.open(img_path).convert('RGB')

# Preprocess the image
data = transform(img).unsqueeze(0)

# Get the model's output
with torch.no_grad():
  output = model(data.to(device))

# Process the output to get the prediction
img_preds = nn.Softmax(dim=-1)(output)
img_preds = img_preds.cpu().numpy().squeeze()
img_pred = modified_labels[np.argmax(img_preds)]
img_pred_score = np.max(img_preds) * 100

print('True: {}'.format(true_label))
print('Pred: {} ({:.2f}%)'.format(img_pred, img_pred_score))

# PIL loads images as RGB, while OpenCV display functions expect BGR
img = np.array(img)
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
cv2_imshow(cv2.resize(img, (256, 256)))