## Loading the Data

In [None]:
import pandas as pd
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import ImageFolder
from torchvision.transforms import transforms
from sklearn.model_selection import train_test_split
import zipfile
import os

# Read the metadata table and map every augmented_image_name to its
# is_training_image flag.
metadata = pd.read_csv('meta_data.csv')
is_training_image = dict(zip(metadata.augmented_image_name, metadata.is_training_image))

# Resize every image to 224x224 and convert it to a tensor.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

directory = "../augmented_images"

# Only unpack the archive when the image directory is not there yet.
if os.path.isdir(directory):
    print("Directory exists skipping zip extraction")
else:
    print("Directory does not exist, extracting zip file...")

    zip_file = "../augmented_images.zip"
    extract_dir = "../"

    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)

print("Loading the dataset...")
dataset = ImageFolder(directory, transform=transform)


def _metadata_key(image_path):
    """Return ``image_path`` rewritten into the form used to query the metadata.

    The path is normalised, backslashes become forward slashes and every
    "../" is replaced by "./".
    """
    return os.path.normpath(image_path).replace('\\', '/').replace("../", "./")


print("Splitting the dataset...")
# Walk the image list once and route each index to the train or test list
# according to its metadata flag.
train_indices = []
test_indices = []
for _index, (_image_path, _) in enumerate(dataset.imgs):
    _key = _metadata_key(_image_path)
    try:
        _in_training = is_training_image[_key]
    except KeyError:
        # Still a KeyError, but one that says which file is the problem.
        raise KeyError(
            f"{_key!r} (from {_image_path!r}) has no entry in meta_data.csv"
        ) from None
    if _in_training:
        train_indices.append(_index)
    else:
        test_indices.append(_index)

train_dataset = Subset(dataset, train_indices)
test_dataset = Subset(dataset, test_indices)

print("Creating the data loaders...")
# Training batches are shuffled; test batches keep the dataset order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

def show_example(img, label):
    """Print the class name and index for ``label`` and display ``img``.

    The class name is looked up in the module-level ``dataset``. ``img`` is
    permuted with ``(1, 2, 0)`` (its first axis moves to the end) before it
    is handed to ``plt.imshow``.
    """
    class_name = dataset.classes[label]
    print('Label: ', class_name, "("+str(label)+")")
    rearranged = img.permute(1, 2, 0)
    plt.imshow(rearranged)

# Display the first (image, label) pair of the full dataset.
img, label = dataset[0]
show_example(img, label)

## Split the Training Dataset into Training and Validation Sets

In [None]:
from torch.utils.data import random_split

# Reserve 20% of the current training subset for validation; whatever is
# left stays in the training set, so the two sizes add up to the original.
val_size = int(len(train_dataset) * 0.2)
train_size = len(train_dataset) - val_size

# random_split returns the parts in the same order as the requested lengths.
train_dataset, val_dataset = random_split(
    train_dataset,
    lengths=[train_size, val_size],
)

# Rebuild the loaders on the new subsets: shuffle for training only.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=32)

print("Training set size: ", len(train_dataset))
print("Validation set size: ", len(val_dataset))

In [None]:
import torchvision.utils as vutils

def show_batch(dl):
    """Draw the images of the first batch yielded by ``dl`` as a single grid.

    Each batch is expected to unpack into ``(images, labels)``; the labels
    are not used. Nothing is drawn when ``dl`` yields no batch at all.
    """
    first_batch = next(iter(dl), None)
    if first_batch is None:
        return

    images, _ = first_batch
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.set_xticks([])
    ax.set_yticks([])
    # Eight images per grid row; the grid's first axis is moved to the end
    # before plotting.
    grid = vutils.make_grid(images, nrow=8)
    ax.imshow(grid.permute(1, 2, 0))
# Preview one batch from the training loader.
show_batch(train_loader)

## MobileNet Transfer Learning Model Creation

In [None]:
import torch
from torch import nn
from torchvision.models import mobilenet_v2
import torch.nn.functional as F

def accuracy(outputs, labels):
    """Return the share of rows in ``outputs`` whose top-scoring index equals ``labels``.

    The prediction for each row is the index of its maximum along dim 1.
    The ratio is computed as a Python float and wrapped in a new tensor.
    """
    preds = torch.max(outputs, dim=1).indices
    num_correct = (preds == labels).sum().item()
    return torch.tensor(num_correct / len(preds))

class BaseModel(nn.Module):
    """Base class that fixes the step/epoch interface for the models below.

    ``training_step`` and ``validation_step`` must be provided by subclasses;
    ``epoch_end`` prints a one-line summary of the losses for an epoch.
    """

    def __init__(self):
        super().__init__()

    def training_step(self, batch):
        """Placeholder: always raises ``NotImplementedError``."""
        raise NotImplementedError

    def validation_step(self, batch):
        """Placeholder: always raises ``NotImplementedError``."""
        raise NotImplementedError

    def epoch_end(self, epoch, result):
        """Print ``epoch`` with ``result['train_loss']`` and ``result['val_loss']``.

        Both losses are formatted with four decimal places.
        """
        template = "Epoch [{}], train_loss: {:.4f}, val_loss: {:.4f}"
        train_loss = result['train_loss']
        val_loss = result['val_loss']
        print(template.format(epoch, train_loss, val_loss))

class MobileNetModel(BaseModel):
    """Pretrained MobileNetV2 whose final classifier layer is replaced.

    Args:
        num_classes: Number of outputs of the new final linear layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.network = mobilenet_v2(pretrained=True)

        # Turn gradients off everywhere, then back on for the last five
        # entries of parameters(). These are parameter tensors, not layers.
        # NOTE(review): that slice probably contains the weight and bias of
        # the original classifier[1], which is replaced right below - confirm
        # how many backbone tensors are meant to stay trainable.
        backbone_params = list(self.network.parameters())
        for tensor in backbone_params:
            tensor.requires_grad = False
        for tensor in backbone_params[-5:]:
            tensor.requires_grad = True

        # New linear head with the same input width as the one it replaces.
        old_head = self.network.classifier[1]
        self.network.classifier[1] = nn.Linear(old_head.in_features, num_classes)

    def forward(self, xb):
        """Run ``xb`` through the wrapped network and return its output."""
        return self.network(xb)

    def training_step(self, batch):
        """Return the cross-entropy loss for one ``(images, labels)`` batch."""
        images, labels = batch
        return F.cross_entropy(self(images), labels)

    def validation_step(self, batch):
        """Return the detached loss and the accuracy for one batch.

        The result is a dict with the keys ``'val_loss'`` and ``'val_acc'``.
        """
        images, labels = batch
        logits = self(images)
        return {
            'val_loss': F.cross_entropy(logits, labels).detach(),
            'val_acc': accuracy(logits, labels),
        }

## Training the Model

In [None]:
import numpy as np

# Use CUDA when it is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# One network output per entry in dataset.classes.
num_classes = len(dataset.classes)
model = MobileNetModel(num_classes)
model = model.to(device)

def fit(epochs, lr, train_loader, val_loader, optimizer):
    """Train the module-level ``model`` and return its per-epoch metrics.

    Every batch is moved to the module-level ``device`` and that moved batch
    is what the model's ``training_step`` / ``validation_step`` receive, so
    the model and its inputs are always on the same device.

    Args:
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate passed to the optimizer constructor.
        train_loader: Iterable yielding ``(inputs, labels)`` training batches.
        val_loader: Iterable yielding ``(inputs, labels)`` validation batches.
        optimizer: Optimizer class (or factory) called as
            ``optimizer(model.parameters(), lr)``.

    Returns:
        A dict with the keys ``'train_loss'``, ``'train_acc'``, ``'val_loss'``
        and ``'val_acc'``; each value is a list holding one mean over the
        batches per epoch.
    """
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    optimizer = optimizer(model.parameters(), lr)
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_losses = []
        train_accs = []
        print("Starting epoch ", epoch+1, " of ", epochs)
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # training_step only returns the loss, so the predictions used
            # for the accuracy come from a separate forward pass. It runs
            # without autograd because nothing is back-propagated through it.
            with torch.no_grad():
                preds = torch.max(model(inputs), 1).indices
            train_accs.append((preds == labels).float().mean().item())

            # Hand the device-resident tensors to the model, not the raw batch.
            loss = model.training_step((inputs, labels))
            train_losses.append(loss.item())

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation phase
        model.eval()
        val_losses = []
        val_accs = []
        with torch.no_grad():
            for inputs, labels in val_loader:
                device_batch = (inputs.to(device), labels.to(device))
                # One call gives both metrics; calling it twice would run
                # the network twice for the same batch.
                result = model.validation_step(device_batch)
                val_losses.append(result['val_loss'].item())
                val_accs.append(result['val_acc'].item())

        # Record the epoch means once and reuse them for the summary line.
        epoch_train_loss = np.mean(train_losses)
        epoch_train_acc = np.mean(train_accs)
        epoch_val_loss = np.mean(val_losses)
        epoch_val_acc = np.mean(val_accs)
        history['train_loss'].append(epoch_train_loss)
        history['train_acc'].append(epoch_train_acc)
        history['val_loss'].append(epoch_val_loss)
        history['val_acc'].append(epoch_val_acc)
        print(f'Epoch {epoch+1}/{epochs}, train loss: {epoch_train_loss:.4f}, val loss: {epoch_val_loss:.4f}, train acc: {epoch_train_acc:.4f}, val acc: {epoch_val_acc:.4f}')
    return history

In [None]:
# Hyperparameters for this training run.
num_epochs = 10
lr = 0.001
opt_func = torch.optim.Adam

# fit() instantiates the optimizer itself from the class and learning rate.
history = fit(
    epochs=num_epochs,
    lr=lr,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=opt_func,
)

In [None]:
import matplotlib.pyplot as plt

def plot_history(history):
    """Plot the loss curves and the accuracy curves stored in ``history``.

    ``history`` is indexed with the keys ``'train_loss'``, ``'val_loss'``,
    ``'train_acc'`` and ``'val_acc'``. Two stacked panels are drawn: losses
    on top, accuracies below.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(2, 1, figsize=(12, 8))

    # (axis, train key, train label, validation key, validation label, y label)
    panels = (
        (loss_ax, 'train_loss', 'train loss', 'val_loss', 'validation loss', 'Loss'),
        (acc_ax, 'train_acc', 'train accuracy', 'val_acc', 'validation accuracy', 'Accuracy'),
    )
    for ax, train_key, train_label, val_key, val_label, y_label in panels:
        ax.plot(history[train_key], label=train_label)
        ax.plot(history[val_key], label=val_label)
        ax.set_xlabel('Epochs')
        ax.set_ylabel(y_label)
        ax.legend()

    plt.tight_layout()
    plt.show()

# Plot the metrics returned by fit().
plot_history(history)

## Test the Model

In [None]:
def test_model(model, test_loader, device):
    """Evaluate ``model`` on ``test_loader`` and report its accuracy.

    The model is switched to eval mode and run without gradient tracking;
    the prediction for each sample is the index of its largest output.

    Args:
        model: Module to evaluate; called as ``model(images)``.
        test_loader: Iterable yielding ``(images, labels)`` batches.
        device: Device every batch is moved to before the forward pass.

    Returns:
        The accuracy as a percentage, or ``nan`` when ``test_loader`` yields
        no samples (in which case no accuracy can be computed).
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            predicted = torch.max(model(images), 1).indices
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader used to end in a ZeroDivisionError.
    if total == 0:
        print('Test set is empty; accuracy is undefined.')
        return float('nan')

    test_accuracy = 100 * correct / total
    print('Test Accuracy of the model on the test images: {} %'.format(test_accuracy))
    return test_accuracy


# Despite its "test_" prefix this is an evaluation helper, not a pytest test;
# opt out of collection in case the module is ever imported by a test file.
test_model.__test__ = False

# Evaluate the trained model on the held-out test loader.
test_model(model, test_loader, device)

## Save the Model

In [None]:
# Save the trained model to 'mobilenet.pth'.
# NOTE(review): this passes the whole module object to torch.save rather than
# model.state_dict(); loading it back presumably requires the MobileNetModel
# class to be importable - confirm that this is the intended format.
torch.save(model, 'mobilenet.pth')