# Hotdog or not - Image recognition Model
The main goal of this project is to build a deep-learning program for image recognition. In this case, the objects to be recognized are hot dogs.

In [None]:
#check if compatible GPU is available 
from torch.cuda import is_available, get_device_name

# Report whether PyTorch can see a CUDA-capable GPU in this environment.
if not is_available():
    print("The environment does NOT have a compatible GPU model available.")
else:
    print(f"The environment has a compatible GPU ({get_device_name()}) available.")

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import pandas

def imshow(inp: torch.Tensor,
           mean=(0.485, 0.456, 0.406),
           std=(0.229, 0.224, 0.225)) -> None:
    """
    Display a normalised, channel-first image tensor with matplotlib.

    The tensor is detached, moved to the CPU, rearranged from
    (channels, height, width) to (height, width, channels), de-normalised as
    ``std * inp + mean`` and clipped to the range [0, 1] before being shown.

    :param inp: Image tensor laid out as (channels, height, width).
    :param mean: Per-channel mean added back after scaling. The default is the
        value that used to be hard-coded in this function.
    :param std: Per-channel standard deviation the image is multiplied by. The
        default is the value that used to be hard-coded in this function.
    """
    # detach() lets tensors that still track gradients be converted to NumPy.
    image = inp.detach().cpu().numpy().transpose((1, 2, 0))
    # Undo the normalisation; broadcasting applies the values per channel.
    image = np.asarray(std) * image + np.asarray(mean)
    # Keep the values inside the range matplotlib expects for float images.
    plt.imshow(np.clip(image, 0, 1))
    plt.show()

In [None]:
#import hot dog data
import os
from typing import Tuple
from zipfile import ZipFile
from PIL import Image

from numpy import array
from pandas import read_csv
from requests import get
from torch.utils.data import Dataset




class HotDogDataset(Dataset):
    """
    Child class of torch.utils.data.Dataset.
    This is a wrapper mapping hotdog / not-hotdog images to their target label.
    """

    def __init__(self, dir_name, transform=None) -> None:
        """
        Initialize a HotDogDataset.

        If ``<cwd>/<dir_name>`` is not an existing directory, the archive
        ``<dir_name>.zip`` is downloaded into the current working directory
        and extracted into ``./``. The label table
        ``<dir_name>/<dir_name>_labels.csv`` is then loaded with pandas.

        :param dir_name: The name of the folder holding the data.
        :param transform: Optional callable applied to each opened image in
            ``__getitem__``. When None, the image is converted with
            ``numpy.array`` instead.
        :raises requests.HTTPError: If the download answers with an error status.
        """
        self.transform = transform
        # Download, save and extract data if needed.
        if not os.path.isdir(os.path.join(os.getcwd(), f"{dir_name}")):
            zip_path = os.path.join(os.getcwd(), f"{dir_name}.zip")
            response = get(f"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/hotdog-not-hotdog/data/{dir_name}.zip")
            # Fail loudly on an HTTP error instead of saving an error page as
            # the archive and hitting a confusing zip error afterwards.
            response.raise_for_status()
            # The context manager closes the file even if the write fails.
            with open(zip_path, mode="wb") as zip_file:
                zip_file.write(response.content)
            with ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall("./")
        # Load metadata.
        self.data = read_csv(os.path.join(os.getcwd(), dir_name, f"{dir_name}_labels.csv"))
        # Number of classes: count of distinct values in the 'y' column.
        self.n_classes = len(self.data['y'].unique())

    def __len__(self) -> int:
        """
        :return: The number of rows in the label table.
        """
        return len(self.data)

    def __getitem__(self, idx) -> Tuple[array, str]:
        """
        Return the input and target at a specific index of the dataset.

        :param idx: Row label of the sample in the label table.
        :return: ``(image, y)`` where ``image`` is the transformed image (or a
            NumPy array when no transform is set) and ``y`` is the value of the
            'y' column for that row.
        """
        # NOTE(review): the annotation says the label is a str, but the rest of
        # this file treats labels as integer class indices - confirm and adjust.

        # 'file_name' is resolved relative to the current working directory.
        image_path = os.path.join(os.getcwd(), self.data.loc[idx, 'file_name'])

        # Retrieve the label.
        y = self.data.loc[idx, 'y']

        # The context manager closes the image file even if the transform raises.
        with Image.open(image_path) as image:
            if self.transform is not None:
                image_ = self.transform(image)
            else:
                image_ = array(image)
        return image_, y


In [None]:
"""
A training framework for classification tasks.
"""

from copy import deepcopy
from typing import List, Tuple
from os.path import join
from os import getcwd

from matplotlib.pyplot import subplots, show
from numpy import sum 
from torch import argmax, device, cuda, save, load
from torch.nn import Module
from torch.nn.functional import softmax
from torch.nn.modules.loss import _Loss
from torch.optim import Optimizer
from torch.utils.data import DataLoader
from tqdm import tqdm




class ClassificationModelTrainer:
    """
    Training, validation, testing and checkpointing helper for a
    classification model.
    """

    def __init__(self, model: Module, training_set: Dataset, validation_set: Dataset,
                 batch_size: int, minimising_criterion: _Loss, optymiser: Optimizer) -> None:
        """
        Initialise a classification model training module.

        :param model: The model to train; it is moved to the selected device.
        :param training_set: The set of training data.
        :param validation_set: The set of validation data.
        :param batch_size: The batch size used by both data loaders.
        :param minimising_criterion: The loss function.
        :param optymiser: The algorithm that performs the minimisation task
            (the spelling of the parameter name is kept for existing callers).
        """

        # Use the first CUDA device when one is available, the CPU otherwise.
        self._device = device("cuda:0" if cuda.is_available() else "cpu")
        self._model = model.to(self._device)
        self._train_loader = DataLoader(dataset=training_set, batch_size=batch_size, shuffle=True)
        self._validation_loader = DataLoader(dataset=validation_set, batch_size=batch_size, shuffle=True)
        self._minimising_criterion = minimising_criterion
        self._optimiser = optymiser
        # One entry per completed epoch: summed batch losses / validation accuracy.
        self.training_loss: List[float] = []
        self.validation_acc: List[float] = []

    def det_model(self) -> Module:
        """
        Getter function for the model.

        The method name is a historical misspelling that is kept so existing
        callers keep working; ``get_model`` is the preferred alias.

        :return: The model held by this trainer.
        """
        return self._model

    # Correctly spelled alias of ``det_model``.
    get_model = det_model

    def train_model(self, n_epochs) -> None:
        """
        Perform the model training.

        Every epoch runs one pass over the training loader followed by one pass
        over the validation loader. The summed training loss and the validation
        accuracy of the epoch are appended to ``training_loss`` and
        ``validation_acc``.

        :param n_epochs: The number of training epochs to run.
        :raises ZeroDivisionError: If the validation loader yields no samples.
        """
        n_batches_per_epoch = len(self._train_loader) + len(self._validation_loader)
        # The context manager closes the progress bar even if training fails.
        with tqdm(total=n_epochs * n_batches_per_epoch) as pbar:
            pbar.set_postfix({
                "Training Loss": self.training_loss[-1] if self.training_loss else "Not yet available",
                "Validation Accuracy": self.validation_acc[-1] if self.validation_acc else "Not yet available",
                "Epoch": 1})

            # Training through the epochs.
            for epoch in range(n_epochs):
                loss_sublist = []

                # Training process.
                self._model.train()
                for x, y in self._train_loader:
                    x, y = x.to(self._device), y.to(self._device)
                    z = self._model(x)
                    loss = self._minimising_criterion(z, y)
                    loss_sublist.append(loss.item())
                    loss.backward()
                    self._optimiser.step()
                    self._optimiser.zero_grad()
                    pbar.update()
                # Store a plain float so the history stays a list of builtin
                # numbers when it is written to a checkpoint.
                self.training_loss.append(float(sum(loss_sublist)))

                # Validation process: no gradients are needed here.
                correct = 0
                n_test = 0
                self._model.eval()
                with torch.no_grad():
                    for x_test, y_test in self._validation_loader:
                        x_test, y_test = x_test.to(self._device), y_test.to(self._device)
                        z = softmax(self._model(x_test), dim=1)
                        y_hat = argmax(z, dim=1)
                        correct += (y_hat == y_test).sum().item()
                        n_test += y_hat.shape[0]
                        pbar.update()
                self.validation_acc.append(correct / n_test)
                pbar.set_postfix({
                    "Training Loss": self.training_loss[-1],
                    "Validation Accuracy": self.validation_acc[-1],
                    # Report the epoch just finished, not the total requested.
                    "Epoch": epoch + 1
                    })

    def plot_training_stat(self):
        """
        Plot the training statistics the model trainer collected throughout
        the training process. Namely, they are

        - Total training loss, and
        - Validation accuracy,

        with one point per recorded epoch. The two statistics share one figure
        on twin y-axes, drawn in red and blue respectively.
        """

        # Plot total training loss on the left axis.
        fig, ax1 = subplots()
        color = 'tab:red'
        ax1.plot(self.training_loss, color=color)
        ax1.set_xlabel('Iterations', color='black')
        ax1.set_ylabel('Total Training Loss', color=color)
        ax1.set_ylim(bottom=0)
        ax1.tick_params(axis='y', color=color)

        # Plot validation accuracy on a second y-axis sharing the same x-axis.
        ax2 = ax1.twinx()
        color = 'tab:blue'
        ax2.plot(self.validation_acc, color=color)
        ax2.set_ylabel('Validation Accuracy', color=color)
        ax2.tick_params(axis='y', color=color)
        # Accuracy is a fraction, so pin the axis to [0, 1].
        ax2.set_ylim(0, 1)
        fig.tight_layout()
        show()

    def test(self, testing_data: Dataset) -> float:
        """
        Test the model's performance on a given dataset.

        Samples are evaluated one at a time; every misclassified sample is
        printed and displayed with ``imshow``.

        :param testing_data: The dataset to perform testing upon.
        :return: Model's accuracy on the given testing data, as a percentage.
        :raises ZeroDivisionError: If ``testing_data`` is empty.
        """

        # NOTE(review): assumes label 0 is "Hot dog" and label 1 is
        # "Not hot dog" - confirm against the label file.
        _class = ["Hot dog", "Not hot dog"]
        j = 0
        total = 0
        print("Here are a list of inaccurately classified results:")
        # Evaluate in inference mode regardless of the mode the model was left
        # in (for example right after load_from without any training).
        self._model.eval()
        with torch.no_grad():
            for x, y in DataLoader(dataset=testing_data, batch_size=1, shuffle=True):
                x, y = x.to(self._device), y.to(self._device)
                predicted = argmax(softmax(self._model(x), dim=1), dim=1)
                if predicted != y:
                    j += 1
                    print(f"Actual: {_class[y.item()]}\t\tPredicted: {_class[predicted.item()]}")
                    imshow(x[0])
                total += 1
        return 100 - 100*j/total

    def dump_to(self, file_name: str) -> None:
        """
        Dump the model, the optimiser state and the training history.

        :param file_name: File to write, resolved against the current working
            directory.
        """
        save_path = join(getcwd(), file_name)
        # The key must be "model_params": that is what load_from reads.
        save({"model_params": self._model.state_dict(),
              "optimiser_stats": self._optimiser.state_dict(),
              "acc": self.validation_acc,
              "loss": self.training_loss
              }, save_path)

    def load_from(self, path: str) -> None:
        """
        Load a dumped file back into the training framework.

        :param path: The path to the dumped file.
        :raises KeyError: If the file lacks one of the expected entries.
        """

        # SECURITY: torch.load unpickles the file; only load checkpoints that
        # come from a trusted source.
        state_dict = load(path, map_location=self._device)
        try:
            model_params = state_dict["model_params"]
        except KeyError:
            # Files written by earlier versions of dump_to used a misspelled key.
            if "mpdel_params" not in state_dict:
                raise
            model_params = state_dict["mpdel_params"]
        self._model.load_state_dict(model_params)
        self._optimiser.load_state_dict(state_dict["optimiser_stats"])
        self.validation_acc = state_dict["acc"]
        self.training_loss = state_dict["loss"]

In [None]:
from torchvision import transforms

# Per-channel statistics handed to the Normalize step below.
# NOTE(review): presumably the ImageNet statistics the pretrained network
# expects - confirm.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Pre-processing pipeline: resize to 224x224, random flip and small rotation,
# conversion to a tensor, then normalisation.
# NOTE(review): the random steps also run on the validation and test splits,
# because one dataset object carries this pipeline before it is split.
composed = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(5),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)

In [None]:
# Build the full dataset; the data is downloaded on first use if the folder is missing.
dataset_full = HotDogDataset(dir_name='hotdognothotdogfull', transform=composed)

In [None]:
from torch.utils.data import random_split
from torch import manual_seed

# Seed the global RNG so the random split below is reproducible.
manual_seed(0)

# 70 % training, 15 % validation; the remainder becomes the test split so the
# three sizes always add up to the dataset length.
training_size, validation_size = (
    int(len(dataset_full) * fraction) for fraction in (0.7, 0.15)
)
test_size = len(dataset_full) - (training_size + validation_size)
training_set, validation_set, test_set = random_split(
    dataset_full,
    lengths=(training_size, validation_size, test_size),
)

In [None]:
# Mini-batch size handed to the trainer's data loaders.
batch_size = 50

# Learning rate handed to the optimiser.
lr = 0.005

# Nominal number of training epochs.
# NOTE(review): the later train_model call passes n_epochs=2 explicitly.
n_epochs = 25

In [None]:
from torchvision import models

# Load a ResNet-18 with pretrained weights as the starting point.
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour of
# `weights=` - check the installed version before changing this call.
model = models.resnet18(pretrained = True)

In [None]:
from torch.nn import Linear

# Freeze every existing weight so that only the replacement head is trained.
for param in model.parameters():
    param.requires_grad = False

n_classes = dataset_full.n_classes
# Size the new head from the layer it replaces instead of hard-coding 512, so
# the cell keeps working if the backbone is swapped. The fresh layer's
# parameters are trainable by default.
model.fc = Linear(model.fc.in_features, n_classes)

In [None]:
from torch.nn import CrossEntropyLoss
# Loss function handed to the trainer as its minimising criterion.
criterion = CrossEntropyLoss()

In [None]:
from torch.optim import Adam
# All model parameters are registered with the optimiser, frozen ones included.
# NOTE(review): presumably this must match how the downloaded checkpoint's
# optimiser state was produced - verify before filtering parameters.
optimizer = Adam(params=model.parameters(), lr=lr)

In [None]:
# Wire model, data splits, loss and optimiser into the training helper.
trainer = ClassificationModelTrainer(
    model=model,
    training_set=training_set,
    validation_set=validation_set,
    batch_size=batch_size,
    minimising_criterion=criterion,
    optymiser=optimizer,
)

In [None]:
# Download a previously saved trainer checkpoint into the working directory.
r = get("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/build-a-hotdog-not-hotdog-classifier-guided-project/twenty-five-iters.pt")
# Abort on an HTTP error instead of saving an error page as the checkpoint.
r.raise_for_status()
# The context manager closes the file even if the write fails.
with open(os.path.join(os.getcwd(), "./twenty-five-iters.pt"), mode="wb") as f:
    f.write(r.content)

# Restore model, optimiser and history from the checkpoint, then train for
# two more epochs on top of it.
trainer.load_from("./twenty-five-iters.pt")
trainer.train_model(n_epochs=2)

In [None]:
# Show the recorded training loss and validation accuracy curves.
trainer.plot_training_stat()

In [None]:
# Evaluate on the held-out test split; misclassified samples are printed and shown.
accuracy = trainer.test(test_set)

In [None]:
# `accuracy` is already a percentage, as returned by trainer.test.
print(f"The model reached an accuracy rate of {accuracy:.2f}% on images it has never seen before.")