<a href="https://colab.research.google.com/github/rafio-iut/Vegetable-Classification-and-Quality-Assessment/blob/grad-cam/Notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initialize Environment

## Install Packages

In [None]:
!pip install python-dotenv

## Import Libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import Subset
from torch.utils.data import ConcatDataset
from torch.utils.data import random_split
import torch.optim as optim
import torchvision
from torchvision import models, transforms
from torchsummary import summary

import math
import time
import random
import os
import shutil
import copy
import dotenv

## Random Seed

In [None]:
# Single seed shared by every random number generator used in this notebook.
random_state = 112

# Seed the module-level generator instead of rebinding the name `random` to a
# `random.Random` instance: rebinding made this cell fail on re-run because the
# instance has no `Random` attribute. `random.shuffle`, `random.choice` and
# `random.randint` are used unchanged by the other cells.
random.seed(random_state)
np.random.seed(random_state)
torch.manual_seed(random_state)

## Setup GPU

In [None]:
# Use the CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

## Load Environment Variables

In [None]:
# Load the variables defined in the local `.env` file.
dotenv.load_dotenv('.env')
# `os.getenv` returns None when GITHUB_TOKEN is not defined; the value is interpolated
# into the clone URL in the "Clone Repository" cell.
github_token = os.getenv('GITHUB_TOKEN')

## Clone Repository

In [None]:
# WARNING: removes everything matched by `*` in the current working directory before cloning.
!rm -r *
# Clone the repository using the token read from the environment.
!git clone https://{github_token}@github.com/rafio-iut/Vegetable-Classification-and-Quality-Assessment.git
# Move the repository contents into the working directory and drop the now-empty clone folder.
!mv Vegetable-Classification-and-Quality-Assessment/* .
!rm -r Vegetable-Classification-and-Quality-Assessment

## Explode Directories

In [None]:
root_dir = "/content/New VegNet"

# Flatten the two-level layout `<folder>/<subfolder>/<files>` into a single level:
# every sub-folder becomes `<folder> - <subfolder>` directly under `root_dir`.
for folder_name in os.listdir(root_dir):
    folder_path = os.path.join(root_dir, folder_name)
    if not os.path.isdir(folder_path):
        continue

    for subfolder_name in os.listdir(folder_path):
        subfolder_path = os.path.join(folder_path, subfolder_name)
        if not os.path.isdir(subfolder_path):
            continue

        new_folder_name = folder_name + " - " + subfolder_name
        # Strip a leading "<prefix>. " if present. Splitting once and taking the last
        # piece keeps names that have no prefix (previously an IndexError) and names
        # that contain a further ". " (previously truncated).
        new_folder_name = new_folder_name.split(". ", 1)[-1]
        new_folder_path = os.path.join(root_dir, new_folder_name)

        os.makedirs(new_folder_path, exist_ok=True)

        for file_name in os.listdir(subfolder_path):
            file_path = os.path.join(subfolder_path, file_name)
            new_file_path = os.path.join(new_folder_path, file_name)
            shutil.move(file_path, new_file_path)

        # The sub-folder is empty once all of its entries have been moved.
        os.rmdir(subfolder_path)

## Delete Empty Folders

In [None]:
root_dir = '/content/New VegNet'

# Walk bottom-up (topdown=False) so nested directories are visited before their parents.
for parent, subdirs, _ in os.walk(root_dir, topdown=False):
    for name in subdirs:
        candidate = os.path.join(parent, name)
        # Only directories without any entries are removed.
        if len(os.listdir(candidate)) == 0:
            os.rmdir(candidate)

## Default Options

In [None]:
# When True, `display_counts` returns immediately without counting labels.
skip_count = False

## Modified Options

In [None]:
# Overrides the default above: label counting in `display_counts` is skipped.
skip_count = True

# Load Data

## Load Raw Dataset

In [None]:
root_dir = '/content/New VegNet'

# Each sub-folder of `root_dir` is one class; images are converted to tensors on load.
raw_dataset = torchvision.datasets.ImageFolder(root=root_dir, transform=transforms.ToTensor())
# Class names as discovered by ImageFolder; targets index into this list.
all_labels = raw_dataset.classes

print(all_labels)
print(len(raw_dataset))

In [None]:
def parse_label(label):
    """Split a "<vegetable> - <quality>" label into its two parts.

    Returns:
        A `(vegetable, quality)` tuple of stripped strings, or `(None, None)`
        when the label does not contain the " - " separator. Pieces after the
        second one are ignored.
    """
    parts = label.split(" - ")
    if len(parts) >= 2:
        return parts[0].strip(), parts[1].strip()
    return None, None

In [None]:
# Collect the distinct vegetable and quality names that occur in the class labels.
vegetable_names = set()
quality_names = set()

for label in all_labels:
    vegetable, quality = parse_label(label)
    vegetable_names.add(vegetable)
    quality_names.add(quality)

# Sorted lists are used as row/column order by the count tables.
all_vegetables = sorted(vegetable_names)
all_qualities = sorted(quality_names)

print(all_vegetables)
print(all_qualities)

## Shuffle Dataset

In [None]:
# Build a random permutation of all sample indices using the seeded generator.
num_samples = len(raw_dataset)
shuffle_indices = list(range(num_samples))
random.shuffle(shuffle_indices)
print(shuffle_indices[:10])

# NOTE(review): this rebinds `raw_dataset`, so re-running the cell wraps the Subset in
# another Subset; re-run the "Load Raw Dataset" cell first when repeating this step.
raw_dataset = Subset(raw_dataset, shuffle_indices)

# Display Dataset

## Utility Functions

In [None]:
def get_label_counts(dataset):
    """Count the samples per label name in `dataset`.

    Iterates over every `(image, target)` pair of the dataset and returns a
    dict mapping label name to count, ordered by label name.
    """
    counts = {}
    for _, target in dataset:
        label = all_labels[target]
        counts[label] = counts.get(label, 0) + 1

    return dict(sorted(counts.items()))

def get_label_images(dataset):
    """Return the first image found for each label, as a dict ordered by label name.

    Iteration stops as soon as one image has been collected for every entry
    of `all_labels`.
    """
    first_seen = {}
    wanted = len(all_labels)

    for image, target in dataset:
        label = all_labels[target]
        if label in first_seen:
            continue
        first_seen[label] = image
        if len(first_seen) == wanted:
            break

    return dict(sorted(first_seen.items()))

In [None]:
def display_counts(dataset, dataset_name):
    """Display a vegetable x quality table of sample counts for `dataset`.

    Does nothing when the global `skip_count` flag is set. The table has one
    row per vegetable, one column per quality, and a 'Total' row and column.

    Args:
        dataset: Iterable of `(image, target)` pairs.
        dataset_name: Title printed above the table.
    """
    if skip_count: return
    label_counts = get_label_counts(dataset)

    # Start from an integer table of zeros instead of filling an all-NaN object
    # frame in place (that relied on deprecated downcasting in `fillna`).
    df_table = pd.DataFrame(0, index=all_vegetables, columns=all_qualities)

    for label, count in label_counts.items():
        vegetable, quality = parse_label(label)
        df_table.loc[vegetable, quality] = count

    # Column totals first, so the row totals below include the 'Total' row.
    df_table.loc['Total'] = df_table.sum()
    df_table['Total'] = df_table.sum(axis=1)

    print(dataset_name)
    display(df_table)
    print()

In [None]:
def display_images(dataset, dataset_name):
    """Plot one example image per label in a grid with five columns.

    Each panel is titled with the vegetable name and captioned with the
    quality; unused panels are left blank.

    Args:
        dataset: Iterable of `(image, target)` pairs; images are tensors that
            are transposed with (1, 2, 0) before plotting.
        dataset_name: Title printed above the figure.
    """
    label_images = get_label_images(dataset)

    num_cols = 5
    num_classes = len(label_images)
    num_rows = math.ceil(num_classes / num_cols)
    # squeeze=False always yields a 2-D axes array, so one flat row-major view
    # covers both the single-row and the multi-row case.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 4*num_rows), squeeze=False)
    panels = axes.ravel()

    for ax, (label, image) in zip(panels, label_images.items()):
        vegetable, quality = parse_label(label)

        ax.imshow(image.numpy().transpose(1, 2, 0))
        ax.set_title(vegetable, fontsize=12, fontweight='bold', pad=10)
        ax.axis('off')
        ax.text(0.5, -0.075, quality, transform=ax.transAxes, ha='center', fontsize=12)

    # Hide the panels that did not receive an image.
    for ax in panels[num_classes:]:
        ax.axis('off')

    plt.tight_layout(h_pad=2)
    print(dataset_name)
    plt.show()

## Display Counts

In [None]:
# Per-label sample counts of the shuffled raw dataset (no output while `skip_count` is True).
display_counts(raw_dataset, 'Raw Dataset')

## Display Images

In [None]:
# One example image per label from the shuffled raw dataset.
display_images(raw_dataset, 'Raw Dataset')

# Data Transformation

## Define Transformations

In [None]:
# Augmentation steps; composed in this order when building the transformed dataset.
transformations = [
    transforms.RandomRotation(30),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.4, contrast=0.4),
    transforms.RandomAffine(degrees=0, shear=10),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.RandomAffine(degrees=0, scale=(0.9, 1.1)),
    transforms.ToTensor()
]

# Individual transforms used only for the preview figure. The flips are created with
# probability 1.0 so the effect is always visible; the others reuse the objects above.
display_transformations = {
    'Random Rotation': transformations[0],
    'Random Horizontal Flip': transforms.RandomHorizontalFlip(1.0),
    'Random Vertical Flip': transforms.RandomVerticalFlip(1.0),
    'Random Jitter': transformations[3],
    'Random Shear': transformations[4],
    'Random Shift': transformations[5],
    'Random Scale': transformations[6]
}

## Display Transformations

In [None]:
# Pick one random sample and show it next to each individual transformation.
random_index = random.randint(0, len(raw_dataset)-1)
sample_image, _ = raw_dataset[random_index]

num_transformations = len(display_transformations)
num_cols = 4
# The grid holds the original image plus one panel per transformation. Sizing it from
# the transformation count alone overflowed whenever that count was a multiple of num_cols.
num_panels = num_transformations + 1
num_rows = math.ceil(num_panels / num_cols)

# squeeze=False keeps `axes` two-dimensional even when the grid has a single row.
fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 5*num_rows), squeeze=False)

image = sample_image.numpy().transpose(1, 2, 0)
axes[0, 0].imshow(image)
axes[0, 0].set_title('Original')

for i, (title, transform) in enumerate(display_transformations.items()):
    transformed_image = transform(sample_image)
    image = transformed_image.numpy().transpose(1, 2, 0)
    # Panel 0 is the original, so transformation i goes to panel i + 1.
    row = (i + 1) // num_cols
    col = (i + 1) % num_cols
    axes[row, col].imshow(image)
    axes[row, col].set_title(title)

# Hide any panels left over at the end of the grid.
for i in range(num_panels, num_rows * num_cols):
    row = i // num_cols
    col = i % num_cols
    axes[row, col].axis('off')

plt.tight_layout()
plt.show()

## Create Transformed Dataset

In [None]:
class TransformedDataset(Dataset):
    """Dataset wrapper that applies `transform` to every image of another dataset.

    When given a `Subset`, the wrapper stores the subset's underlying dataset and
    its index list (one level of nesting only); otherwise it wraps the dataset
    directly with an identity index range. Attributes and callables of the
    underlying dataset that this object does not already have are copied onto it.
    """

    def __init__(self, dataset, transform):
        """Wrap `dataset` so that `transform` is applied on item access.

        Args:
            dataset: A dataset or a `Subset` yielding `(image, label)` pairs.
            transform: Callable applied to the PIL version of each image.
        """
        if isinstance(dataset, Subset):
            self.dataset = dataset.dataset
            self.indices = dataset.indices
        else:
            self.dataset = dataset
            self.indices = range(len(dataset))

        # Set before the copy loops below so `hasattr` keeps these from being overwritten.
        self.transform = transform
        self.to_pil = transforms.ToPILImage()

        # Copy instance attributes of the underlying dataset that are not defined here.
        # NOTE(review): copied per-sample attributes follow the underlying dataset's
        # order, not `self.indices` - confirm before relying on them.
        original_attrs = vars(self.dataset)
        for attr_name, attr_value in original_attrs.items():
            if not hasattr(self, attr_name):
                setattr(self, attr_name, attr_value)

        # Expose callables of the underlying dataset that this object lacks; they stay
        # bound to the underlying dataset.
        original_methods = [method_name for method_name in dir(self.dataset) if callable(getattr(self.dataset, method_name))]
        for method_name in original_methods:
            if not hasattr(self, method_name):
                method = getattr(self.dataset, method_name)
                setattr(self, method_name, method)

    def __getitem__(self, index):
        """Return `(transformed_image, label)` for position `index` of this wrapper."""
        image, label = self.dataset[self.indices[index]]
        # The stored image is converted to PIL before the transform is applied.
        image = self.to_pil(image)
        transformed_image = self.transform(image)
        return transformed_image, label

    def __len__(self):
        """Number of samples exposed by the wrapper."""
        return len(self.indices)

# Apply the full augmentation pipeline to the shuffled raw dataset.
transformed_dataset = TransformedDataset(raw_dataset, transforms.Compose(transformations))

## Display Dataset

In [None]:
# Counts (skipped while `skip_count` is True) and example images of the transformed dataset.
display_counts(transformed_dataset, 'Transformed Dataset')
display_images(transformed_dataset, 'Transformed Dataset')

# K-Fold Cross Validation

## Utility Functions

In [None]:
def balance_classes(dataset):
    """Oversample `dataset` so every class present has as many samples as the largest.

    Indices of under-represented classes are duplicated at random (using the
    global `random` generator) until each class that occurs in the dataset
    reaches the size of the largest class. Classes with no sample at all are
    left empty instead of raising, since there is nothing to duplicate.

    Args:
        dataset: A dataset exposing `.classes`, or a `Subset` of one.

    Returns:
        A `Subset` of the underlying dataset containing the original indices
        followed by the duplicated ones.
    """
    if isinstance(dataset, Subset):
        original_dataset = dataset.dataset
        dataset_indices = dataset.indices
    else:
        original_dataset = dataset
        dataset_indices = range(len(dataset))

    num_classes = len(original_dataset.classes)
    class_indices = [[] for _ in range(num_classes)]

    # Group the sample indices by class; each item is read to obtain its target.
    for index in dataset_indices:
        _, target = original_dataset[index]
        class_indices[target].append(index)

    max_count = max((len(members) for members in class_indices), default=0)

    duplicates = []
    for members in class_indices:
        if not members:
            # Absent class: `random.choice` would fail on an empty list.
            continue
        for _ in range(max_count - len(members)):
            duplicates.append(random.choice(members))

    balanced_indices = list(dataset_indices) + duplicates
    balanced_dataset = Subset(original_dataset, balanced_indices)

    return balanced_dataset

def generate_subsets(dataset, k):
    """Split `dataset` into `k` subsets with per-class counts kept as even as possible.

    Each sample goes to one of the subsets that currently holds the fewest
    samples of its class; ties are broken with the global `random` generator.
    The indices of every subset are shuffled before it is wrapped.

    Args:
        dataset: A dataset exposing `.classes`, or a `Subset` of one.
        k: Number of subsets to create.

    Returns:
        A list of `k` `Subset` objects over the underlying dataset.
    """
    if isinstance(dataset, Subset):
        source, positions = dataset.dataset, dataset.indices
    else:
        source, positions = dataset, range(len(dataset))

    num_classes = len(source.classes)
    # per_class_fold_counts[c][f] = number of samples of class c placed in fold f.
    per_class_fold_counts = [[0] * k for _ in range(num_classes)]
    fold_members = [[] for _ in range(k)]

    for position in positions:
        _, target = source[position]
        counts = per_class_fold_counts[target]

        # All folds tied for the fewest samples of this class, in ascending order.
        fewest = min(counts, default=0)
        candidates = [fold for fold, count in enumerate(counts) if count == fewest]

        chosen = random.choice(candidates)
        fold_members[chosen].append(position)
        counts[chosen] += 1

    folds = []
    for members in fold_members:
        random.shuffle(members)
        folds.append(Subset(source, members))

    return folds

## Prepare Subsets

In [None]:
# Number of folds for cross validation.
k = 5
plain_subset = generate_subsets(transformed_dataset, k)

# One class-balanced (oversampled) counterpart per fold, in fold order.
balanced_subset = [balance_classes(plain_subset[fold]) for fold in range(k)]

In [None]:
# Per-label counts of every fold, before and after balancing (skipped while `skip_count` is True).
for i, subset in enumerate(plain_subset): display_counts(subset, f'Subset {i+1}')
for i, subset in enumerate(balanced_subset): display_counts(subset, f'Balanced Subset {i+1}')

In [None]:
def get_data_splits(plain_subset, balanced_subset, test_index):
    """Build the train/validation/test split for one cross-validation round.

    The fold at `test_index` is held out as the test set, read from the
    un-augmented dataset underneath the transforming wrapper. The balanced
    versions of all other folds are concatenated and split 80/20 into train
    and validation sets.

    Args:
        plain_subset: List of fold `Subset` objects.
        balanced_subset: List of class-balanced folds, parallel to `plain_subset`.
        test_index: Position of the fold to hold out.

    Returns:
        Dict with the keys 'Train', 'Val' and 'Test'.
    """
    data_split = {}

    held_out = plain_subset[test_index]
    wrapper = held_out.dataset
    if hasattr(wrapper, 'dataset') and hasattr(wrapper, 'indices'):
        # `held_out.dataset.dataset` alone is the complete underlying dataset, which
        # also contains every training image. Translate the fold's positions through
        # the wrapper's index table so only the held-out samples are selected.
        source_indices = [wrapper.indices[position] for position in held_out.indices]
        testset = Subset(wrapper.dataset, source_indices)
    else:
        # No wrapper underneath: the fold itself is the test set.
        testset = held_out

    trainvalset = ConcatDataset(balanced_subset[:test_index] + balanced_subset[test_index+1:])

    train_size = int(0.8 * len(trainvalset))
    val_size = len(trainvalset) - train_size
    trainset, valset = random_split(trainvalset, [train_size, val_size])

    data_split['Train'] = trainset
    data_split['Val'] = valset
    data_split['Test'] = testset
    for phase in ['Train', 'Val', 'Test']: display_counts(data_split[phase], f'{phase}-set {test_index + 1}')

    return data_split

## Prepare Data Loaders

In [None]:
k = 5
# One entry per cross-validation round; both lists are indexed by the test fold number.
data_splits = []
dataloaders = []

for i in range(k):
    print(f'Preparing Test {i+1}')
    print('-' * 20)
    print()

    split = get_data_splits(plain_subset, balanced_subset, i)
    # The same loader settings are used for all three phases, including shuffling.
    loader = {x: torch.utils.data.DataLoader(split[x], batch_size=32, shuffle=True, num_workers=2)
                      for x in ['Train', 'Val', 'Test']}
    data_splits.append(split)
    dataloaders.append(loader)

## Display a Data Split

In [None]:
# Example images from each phase of the first cross-validation round.
display_images(data_splits[0]['Train'], 'Train-set 1')
display_images(data_splits[0]['Val'], 'Val-set 1')
display_images(data_splits[0]['Test'], 'Test-set 1')

# Train Models

## Utility Functions

In [None]:
def run_step(data_split, dataloader, model, preprocess, criterion, optimizer, phase):
    """Run one pass over `dataloader[phase]` and return `(loss, accuracy)`.

    Gradients are enabled and the optimizer is stepped only when `phase` is
    'Train'. Each image of a batch is passed through `preprocess` individually
    before the batch is stacked and moved to `device`.

    Returns:
        The sample-weighted mean loss (float) and the accuracy (a tensor), both
        divided by `len(data_split[phase])`.
    """
    is_training = phase == 'Train'
    total_loss = 0.0
    num_correct = 0

    for images, targets in dataloader[phase]:
        images = torch.stack([preprocess(sample) for sample in images]).to(device)
        targets = targets.to(device)

        with torch.set_grad_enabled(is_training):
            outputs = model(images)
            predictions = torch.max(outputs, 1)[1]
            loss = criterion(outputs, targets)

            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Weight the batch loss by the batch size so the final division gives a per-sample mean.
        total_loss += loss.item() * images.size(0)
        num_correct += torch.sum(predictions == targets)

    num_samples = len(data_split[phase])
    return total_loss / num_samples, num_correct.double() / num_samples

def train_model(data_split, dataloader, model, preprocess, hyperparameters):
    """Train `model` with SGD and a StepLR schedule; return it with the best weights loaded.

    Every epoch runs a 'Train' pass followed by a 'Val' pass. The weights with
    the highest validation accuracy are kept. Training stops early when the
    validation accuracy reaches 1 or has not improved for `patience` epochs.

    Args:
        data_split: Dict of datasets keyed by phase (used for sample counts).
        dataloader: Dict of data loaders keyed by phase.
        model: Model to train (modified in place).
        preprocess: Per-image preprocessing callable.
        hyperparameters: Dict with 'loss_function', 'learning_rate', 'momentum',
            'scheduler_step_size', 'gamma', 'patience' and optionally
            'num_epochs' (defaults to 1).

    Returns:
        The same `model` object with the best validation weights loaded.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    criterion, learning_rate, momentum, scheduler_step_size, gamma, patience = (
        hyperparameters['loss_function'],
        hyperparameters['learning_rate'],
        hyperparameters['momentum'],
        hyperparameters['scheduler_step_size'],
        hyperparameters['gamma'],
        hyperparameters['patience']
    )
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=scheduler_step_size, gamma=gamma)
    num_epochs = hyperparameters.get('num_epochs', 1)

    # Number of consecutive epochs without an improvement in validation accuracy.
    counter = 0

    for epoch in range(1, num_epochs + 1):
        print(f'Running Epoch {epoch}/{num_epochs}')
        print('-' * 10)

        for phase in ['Train', 'Val']:
            if phase == 'Train':
                model.train()
            else:
                model.eval()

            step_loss, step_acc = run_step(data_split, dataloader, model, preprocess, criterion, optimizer, phase)
            print(f'{phase} Loss: {step_loss:.4f}, Acc: {step_acc:.2%}')

            if phase == 'Train':
                # Advance the schedule once per epoch, right after the training pass.
                # The old check ran after this loop, where `phase` is always 'Val',
                # so the learning rate was never decayed.
                scheduler.step()
            elif step_acc > best_acc:
                best_acc = step_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                counter = 0
            else:
                counter += 1

        if best_acc==1 or counter >= patience:
            print(f'Convergence reached after {epoch} epochs.')
            print('Stopping Training')
            print('-' * 10)
            break

    time_elapsed = time.time() - since
    print(f'\nTraining complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc * 100:.2f}%')
    print()

    model.load_state_dict(best_model_wts)
    return model

def train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=None):
    """Train `model` once per cross-validation round, each time from the same initial weights.

    Round `i` uses the global `data_splits[i]` and `dataloaders[i]`. The model
    is reset to its starting weights before every round and a deep copy of the
    trained model is stored afterwards.

    Returns:
        List of `k` independently trained model copies.
    """
    initial_weights = copy.deepcopy(model.state_dict())
    fold_models = []

    for fold in range(k):
        model.load_state_dict(initial_weights)
        print(f'Training {model_name} for Test {fold + 1}')
        print('-' * 20)
        print()

        model = train_model(data_splits[fold], dataloaders[fold], model, preprocess, hyperparameters)
        # Copy the model because the same object is reset and reused in the next round.
        fold_models.append(copy.deepcopy(model))

    return fold_models

## Run Training

In [None]:
def get_output(model, preprocess, image_tensor):
    """Run `model` on a single image and return `(prediction, probabilities)`.

    Returns:
        The index of the largest model output (int) and the squeezed model
        output as a NumPy array. The array holds the values exactly as the
        model returns them; no softmax is applied here.
    """
    model.eval()

    with torch.no_grad():
        # Add a batch dimension of size one before moving the image to the device.
        batch = preprocess(image_tensor).unsqueeze(0).to(device)
        outputs = model(batch)

        prediction = torch.max(outputs, 1)[1].item()
        probabilities = outputs.squeeze().detach().cpu().numpy()

    return prediction, probabilities

In [None]:
# Collects one dict per trained architecture with the keys 'name', 'preprocess' and 'models'.
completed_model_sets = []

### AlexNet

In [None]:
# Load AlexNet with the IMAGENET1K_V1 weights and the preprocessing bundled with them.
model_name = 'AlexNet'
weights = models.AlexNet_Weights.IMAGENET1K_V1
model = models.alexnet(weights=weights)
preprocess = weights.transforms(antialias=None)

# Replace the last classifier layer so the output size equals the number of labels.
model.classifier[-1] = nn.Linear(model.classifier[-1].in_features, len(all_labels))
model = model.to(device)

# Print a layer summary for a 3-channel square input of the preprocessing crop size.
crop_size = preprocess.crop_size[0]
input_size = (3, crop_size, crop_size)
summary(model, input_size)

In [None]:
# hyperparameters = {
#     'loss_function': nn.CrossEntropyLoss(),
#     'learning_rate': 0.001,
#     'momentum': 0.9,
#     'scheduler_step_size': 7,
#     'gamma': 0.1,
#     'num_epochs': 2,
#     'patience': 10
# }

# trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# completed_model_sets.append(
#     {
#         'name': model_name,
#         'preprocess': preprocess,
#         'models': trained_models
#     }
# )

### ResNet-50

In [None]:
# Load ResNet-50 with the IMAGENET1K_V2 weights and the preprocessing bundled with them.
model_name = 'ResNet-50'
weights = models.ResNet50_Weights.IMAGENET1K_V2
model = models.resnet50(weights=weights)
preprocess = weights.transforms(antialias=None)

# Replace the fully connected head so the output size equals the number of labels.
model.fc = nn.Linear(model.fc.in_features, len(all_labels))
model = model.to(device)

# Print a layer summary for a 3-channel square input of the preprocessing crop size.
crop_size = preprocess.crop_size[0]
input_size = (3, crop_size, crop_size)
summary(model, input_size)

In [None]:
# hyperparameters = {
#     'loss_function': nn.CrossEntropyLoss(),
#     'learning_rate': 0.001,
#     'momentum': 0.9,
#     'scheduler_step_size': 7,
#     'gamma': 0.1,
#     'num_epochs': 1,
#     'patience': 10
# }

# trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# completed_model_sets.append(
#     {
#         'name': model_name,
#         'preprocess': preprocess,
#         'models': trained_models
#     }
# )

### DenseNet-169

In [None]:
# Load DenseNet-169 with the IMAGENET1K_V1 weights and the preprocessing bundled with them.
model_name = 'DenseNet-169'
weights = models.DenseNet169_Weights.IMAGENET1K_V1
model = models.densenet169(weights=weights)
preprocess = weights.transforms(antialias=None)

# Replace the classifier so the output size equals the number of labels.
model.classifier = nn.Linear(model.classifier.in_features, len(all_labels))
model = model.to(device)

# The layer summary is disabled for this model.
# crop_size = preprocess.crop_size[0]
# input_size = (3, crop_size, crop_size)
# summary(model, input_size)

In [None]:
# Settings consumed by `train_model`: loss, SGD learning rate and momentum, StepLR
# step size and decay factor, epoch limit and early-stopping patience.
hyperparameters = {
    'loss_function': nn.CrossEntropyLoss(),
    'learning_rate': 0.001,
    'momentum': 0.9,
    'scheduler_step_size': 7,
    'gamma': 0.1,
    'num_epochs': 1,
    'patience': 10
}

# Trains the `model` defined in the preceding cell, once per cross-validation round.
trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# Register the trained models together with their name and preprocessing.
completed_model_sets.append(
    {
        'name': model_name,
        'preprocess': preprocess,
        'models': trained_models
    }
)

### VGG-16

In [None]:
# Load VGG-16 with the IMAGENET1K_V1 weights and the preprocessing bundled with them.
model_name = 'VGG-16'
weights = models.VGG16_Weights.IMAGENET1K_V1
model = models.vgg16(weights=weights)
preprocess = weights.transforms(antialias=None)

# Replace the last classifier layer so the output size equals the number of labels.
model.classifier[-1] = nn.Linear(model.classifier[-1].in_features, len(all_labels))
model = model.to(device)

# Print a layer summary for a 3-channel square input of the preprocessing crop size.
crop_size = preprocess.crop_size[0]
input_size = (3, crop_size, crop_size)
summary(model, input_size)

In [None]:
# hyperparameters = {
#     'loss_function': nn.CrossEntropyLoss(),
#     'learning_rate': 0.001,
#     'momentum': 0.9,
#     'scheduler_step_size': 7,
#     'gamma': 0.1,
#     'num_epochs': 1,
#     'patience': 10
# }

# trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# completed_model_sets.append(
#     {
#         'name': model_name,
#         'preprocess': preprocess,
#         'models': trained_models
#     }
# )

### EfficientNetV2-S

In [None]:
# Load EfficientNetV2-S with the IMAGENET1K_V1 weights and the preprocessing bundled with them.
model_name = 'EfficientNetV2-S'
weights = models.EfficientNet_V2_S_Weights.IMAGENET1K_V1
model = models.efficientnet_v2_s(weights=weights)
preprocess = weights.transforms(antialias=None)

# Replace the last classifier layer so the output size equals the number of labels.
model.classifier[-1] = nn.Linear(model.classifier[-1].in_features, len(all_labels))
model = model.to(device)

# Print a layer summary for a 3-channel square input of the preprocessing crop size.
crop_size = preprocess.crop_size[0]
input_size = (3, crop_size, crop_size)
summary(model, input_size)

In [None]:
# hyperparameters = {
#     'loss_function': nn.CrossEntropyLoss(),
#     'learning_rate': 0.001,
#     'momentum': 0.9,
#     'scheduler_step_size': 7,
#     'gamma': 0.1,
#     'num_epochs': 1,
#     'patience': 10
# }

# trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# completed_model_sets.append(
#     {
#         'name': model_name,
#         'preprocess': preprocess,
#         'models': trained_models
#     }
# )

### MobileNetV3

In [None]:
# Load MobileNetV3 (large) with the IMAGENET1K_V2 weights and the preprocessing bundled with them.
model_name = 'MobileNetV3'
weights = models.MobileNet_V3_Large_Weights.IMAGENET1K_V2
model = models.mobilenet_v3_large(weights=weights)
preprocess = weights.transforms(antialias=None)

# Replace the last classifier layer so the output size equals the number of labels.
model.classifier[-1] = nn.Linear(model.classifier[-1].in_features, len(all_labels))
model = model.to(device)

# Print a layer summary for a 3-channel square input of the preprocessing crop size.
crop_size = preprocess.crop_size[0]
input_size = (3, crop_size, crop_size)
summary(model, input_size)

In [None]:
# hyperparameters = {
#     'loss_function': nn.CrossEntropyLoss(),
#     'learning_rate': 0.001,
#     'momentum': 0.9,
#     'scheduler_step_size': 7,
#     'gamma': 0.1,
#     'num_epochs': 1,
#     'patience': 10
# }

# trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# completed_model_sets.append(
#     {
#         'name': model_name,
#         'preprocess': preprocess,
#         'models': trained_models
#     }
# )

### SqueezeNet1.0

In [None]:
# model_name = 'SqueezeNet1.0'
# weights = models.SqueezeNet1_0_Weights.IMAGENET1K_V1
# model = models.squeezenet1_0(weights=weights)
# preprocess = weights.transforms(antialias=None)

# model.classifier[-1] = nn.Linear(model.classifier[-1].in_features, len(all_labels))
# model = model.to(device)

# crop_size = preprocess.crop_size[0]
# input_size = (3, crop_size, crop_size)
# summary(model, input_size)

In [None]:
# hyperparameters = {
#     'loss_function': nn.CrossEntropyLoss(),
#     'learning_rate': 0.001,
#     'momentum': 0.9,
#     'scheduler_step_size': 7,
#     'gamma': 0.1,
#     'num_epochs': 1,
#     'patience': 10
# }

# trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# completed_model_sets.append(
#     {
#         'name': model_name,
#         'preprocess': preprocess,
#         'models': trained_models
#     }
# )

### ShuffleNetV2

In [None]:
# Load ShuffleNetV2 (x1.0) with the IMAGENET1K_V1 weights and the preprocessing bundled with them.
model_name = 'ShuffleNetV2'
weights = models.ShuffleNet_V2_X1_0_Weights.IMAGENET1K_V1
model = models.shufflenet_v2_x1_0(weights=weights)
preprocess = weights.transforms(antialias=None)

# Replace the fully connected head so the output size equals the number of labels.
model.fc = nn.Linear(model.fc.in_features, len(all_labels))
model = model.to(device)

# Print a layer summary for a 3-channel square input of the preprocessing crop size.
crop_size = preprocess.crop_size[0]
input_size = (3, crop_size, crop_size)
summary(model, input_size)

In [None]:
# hyperparameters = {
#     'loss_function': nn.CrossEntropyLoss(),
#     'learning_rate': 0.001,
#     'momentum': 0.9,
#     'scheduler_step_size': 7,
#     'gamma': 0.1,
#     'num_epochs': 1,
#     'patience': 10
# }

# trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# completed_model_sets.append(
#     {
#         'name': model_name,
#         'preprocess': preprocess,
#         'models': trained_models
#     }
# )

### ResNet-152

In [None]:
# Load ResNet-152 with the IMAGENET1K_V2 weights and the preprocessing bundled with them.
model_name = 'ResNet-152'
weights = models.ResNet152_Weights.IMAGENET1K_V2
model = models.resnet152(weights=weights)
preprocess = weights.transforms(antialias=None)

# Replace the fully connected head so the output size equals the number of labels.
model.fc = nn.Linear(model.fc.in_features, len(all_labels))
model = model.to(device)

# Print a layer summary for a 3-channel square input of the preprocessing crop size.
crop_size = preprocess.crop_size[0]
input_size = (3, crop_size, crop_size)
summary(model, input_size)

In [None]:
# hyperparameters = {
#     'loss_function': nn.CrossEntropyLoss(),
#     'learning_rate': 0.001,
#     'momentum': 0.9,
#     'scheduler_step_size': 7,
#     'gamma': 0.1,
#     'num_epochs': 1,
#     'patience': 10
# }

# trained_models = train_model_kfold(model_name, model, preprocess, k=5, hyperparameters=hyperparameters)

# completed_model_sets.append(
#     {
#         'name': model_name,
#         'preprocess': preprocess,
#         'models': trained_models
#     }
# )

# Evaluate Models

## Setup DataFrame

In [None]:
def to_percentage(x):
    """Format a fraction as a percentage string with three decimals, e.g. 0.5 -> '50.000%'."""
    return '{:.3f}%'.format(x * 100)

def from_percentage(percentage):
    """Convert a percentage string such as '50.000%' back to a fraction (0.5).

    This is the inverse of `to_percentage`, which multiplies by 100; the
    previous divisor of 1000 returned values ten times too small.

    Raises:
        ValueError: If the text before the trailing '%' is not a number.
    """
    percentage = percentage.rstrip('%')
    try:
        return float(percentage) / 100
    except ValueError:
        raise ValueError("Invalid Format")

k = 5
# Empty results table: model name, one accuracy column per test fold, and the average.
results_df = pd.DataFrame(columns=['Model'] + [f'Test-{i+1} Accuracy' for i in range(k)] + ['Average Accuracy'])

## Utility Functions

In [None]:
def get_outputs(model, preprocess, dataloader):
    """Run `model` over `dataloader` and collect targets, predictions and model outputs.

    Returns:
        Three lists with one entry per sample: the target, the index of the
        largest model output, and the model output vector as a NumPy array.
        NOTE(review): the output vectors are stored exactly as the model
        returns them (no softmax is applied here) although they are named
        probabilities - confirm what downstream consumers expect.
    """
    all_targets, all_predictions, all_outputs = [], [], []

    model.eval()
    with torch.no_grad():
        for images, batch_target in dataloader:
            images = torch.stack([preprocess(sample) for sample in images]).to(device)
            batch_target = batch_target.to(device)

            outputs = model(images)
            batch_prediction = torch.max(outputs, 1)[1]

            all_targets.extend(batch_target.cpu().numpy())
            all_predictions.extend(batch_prediction.cpu().numpy())
            all_outputs.extend(outputs.detach().cpu().numpy())

    return all_targets, all_predictions, all_outputs

In [None]:
def parse_targets(targets):
    """Translate class indices into label, vegetable and quality names.

    Returns:
        Dict with the keys 'Label', 'Vegetable' and 'Quality', each holding a
        list parallel to `targets`.
    """
    names = [all_labels[target] for target in targets]
    pairs = [parse_label(name) for name in names]

    return {
        'Label': names,
        'Vegetable': [vegetable for vegetable, _ in pairs],
        'Quality': [quality for _, quality in pairs]
    }

def parse_probabilities(probabilities):
    """Aggregate one per-class score vector by label, vegetable and quality.

    The score at position `i` belongs to `all_labels[i]`; it is added to that
    label's entry and to the entries of the label's vegetable and quality.

    Returns:
        Dict with the keys 'Label', 'Vegetable' and 'Quality', each mapping a
        name to its summed score.
    """
    by_label = dict.fromkeys(all_labels, 0)
    by_vegetable = dict.fromkeys(all_vegetables, 0)
    by_quality = dict.fromkeys(all_qualities, 0)

    for position, score in enumerate(probabilities):
        name = all_labels[position]
        vegetable, quality = parse_label(name)

        by_label[name] += score
        by_vegetable[vegetable] += score
        by_quality[quality] += score

    return {
        'Label': by_label,
        'Vegetable': by_vegetable,
        'Quality': by_quality
    }

def parse_probabilities_list(probabilities_list):
    """Apply `parse_probabilities` to every score vector and regroup the results.

    Returns:
        Dict with the keys 'Label', 'Vegetable' and 'Quality', each holding a
        list of per-sample dicts.
    """
    parsed = [parse_probabilities(scores) for scores in probabilities_list]

    return {
        context: [entry[context] for entry in parsed]
        for context in ('Label', 'Vegetable', 'Quality')
    }

def filter_label(label):
    """Reformat a label for tick labels: drop the " - " separator and wrap every two words.

    Example: "Bell Pepper - Ripe" -> "Bell Pepper\\nRipe".
    """
    words = label.replace(' - ', ' ').split()
    # Consecutive word pairs become one line each; a trailing single word stays on its own line.
    lines = [' '.join(words[start:start + 2]) for start in range(0, len(words), 2)]
    return '\n'.join(lines)

def filter_labels_list(labels_list):
    """Return a new list with `filter_label` applied to every label."""
    return [filter_label(label) for label in labels_list]

def filter_probabilities_list(probabilities_list):
    """Return a copy of each score dict with its keys passed through `filter_label`."""
    return [
        {filter_label(name): score for name, score in scores.items()}
        for scores in probabilities_list
    ]

In [None]:
def calculate_auc_roc(targets, probabilities):
    """Compute the area under the ROC curve for binary targets.

    The value is the fraction of (positive, negative) pairs in which the
    positive sample has the higher score; pairs with equal scores count as
    one half. The previous version resolved ties by sort order, so tied
    scores could be credited as fully right or fully wrong.

    Args:
        targets: Sequence of 0/1 values; 1 marks a positive sample.
        probabilities: Sequence of scores, parallel to `targets`.

    Returns:
        The AUC as a float, or 0.0 when either class is absent.
    """
    scores = np.asarray(probabilities, dtype=float)
    is_positive = np.asarray(targets) == 1

    positive_scores = scores[is_positive]
    negative_scores = np.sort(scores[~is_positive])

    num_positive = positive_scores.size
    num_negative = negative_scores.size

    if num_positive == 0 or num_negative == 0: return 0.0

    # For each positive: negatives scored strictly lower, and negatives scored lower or equal.
    strictly_lower = np.searchsorted(negative_scores, positive_scores, side='left')
    lower_or_equal = np.searchsorted(negative_scores, positive_scores, side='right')

    wins = strictly_lower.sum()
    ties = (lower_or_equal - strictly_lower).sum()

    return float((wins + 0.5 * ties) / (num_positive * num_negative))

In [None]:
def calculate_metrics(confusion_matrix, targets, probabilities, labels):
    """Compute one-vs-rest metrics for every label.

    Args:
        confusion_matrix: Square array with rows as targets and columns as
            predictions, ordered like `labels`.
        targets: Sequence of target label names.
        probabilities: Sequence of dicts mapping label name to score.
        labels: Label names in confusion-matrix order.

    Returns:
        DataFrame with the rows 'Precision', 'Recall', 'F1 Score',
        'Specificity' and 'AUC-ROC' and one column per label. A ratio whose
        denominator is zero is reported as 0.
    """
    rows = {'Precision': [], 'Recall': [], 'F1 Score': [], 'Specificity': [], 'AUC-ROC': []}

    target_names = np.array(targets)
    grand_total = np.sum(confusion_matrix)

    for position, label in enumerate(labels):
        tp = confusion_matrix[position, position]
        fp = np.sum(confusion_matrix[:, position]) - tp
        fn = np.sum(confusion_matrix[position, :]) - tp
        tn = grand_total - tp - fp - fn

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        # One-vs-rest view of this label for the AUC computation.
        class_targets = np.where(target_names == label, 1, 0)
        class_probabilities = [scores[label] for scores in probabilities]
        auc_roc = calculate_auc_roc(class_targets, class_probabilities)

        rows['Precision'].append(precision)
        rows['Recall'].append(recall)
        rows['F1 Score'].append(f1_score)
        rows['Specificity'].append(specificity)
        rows['AUC-ROC'].append(auc_roc)

    metrics_df = pd.DataFrame(index=list(rows), columns=labels)
    for metric_name, values in rows.items():
        metrics_df.loc[metric_name] = values

    return metrics_df

def generate_confusion_matrix(targets, predictions, probabilities, labels):
    """Tabulate predictions against targets and derive the per-class metrics.

    All four inputs are first passed through the notebook's
    `filter_labels_list` / `filter_probabilities_list` helpers.

    Returns:
        (confusion_df, metrics_df, correct_count) where `confusion_df` has
        actual labels as rows and predicted labels as columns, `metrics_df`
        is the output of `calculate_metrics` cast to float and rounded to
        3 decimals, and `correct_count` is the sum of the diagonal.
    """
    targets = filter_labels_list(targets)
    predictions = filter_labels_list(predictions)
    labels = filter_labels_list(labels)
    probabilities = filter_probabilities_list(probabilities)

    num_labels = len(labels)
    position_of = dict(zip(labels, range(num_labels)))

    confusion_matrix = np.zeros((num_labels, num_labels), dtype=np.int32)
    for actual, predicted in zip(targets, predictions):
        # Rows are the actual class, columns the predicted class.
        confusion_matrix[position_of[actual], position_of[predicted]] += 1

    correct_count = np.diag(confusion_matrix).sum()
    confusion_df = pd.DataFrame(confusion_matrix, index=labels, columns=labels)

    raw_metrics = calculate_metrics(confusion_matrix, targets, probabilities, labels)
    metrics_df = raw_metrics.astype(float).round(3)

    return confusion_df, metrics_df, correct_count

def display_matrices(model_name, test_index, confusion_matrices, metrics_tables, correct_counts, total_count):
    """Draw the confusion matrix and metrics heatmaps for every context.

    The figure uses a 5x2 grid: the 'Label' confusion matrix spans the top
    two rows, the 'Label' metrics the third row, and the 'Vegetable' (left)
    and 'Quality' (right) confusion/metrics pairs share the last two rows.

    Args:
        model_name: name shown in the figure title.
        test_index: zero-based test index (displayed as index + 1).
        confusion_matrices, metrics_tables, correct_counts: dicts keyed by
            'Label', 'Vegetable' and 'Quality'.
        total_count: number of evaluated samples (accuracy denominator).
    """
    contexts = ['Label', 'Vegetable', 'Quality']

    # Panels alternate confusion matrix / metrics table for each context.
    matrices = []
    for context in contexts:
        matrices.append(confusion_matrices[context])
        matrices.append(metrics_tables[context])

    # (grid position, colspan, rowspan) for each panel, in the same order as `matrices`.
    layout = [
        ((0, 0), 2, 2),
        ((2, 0), 2, 1),
        ((3, 0), 1, 1),
        ((4, 0), 1, 1),
        ((3, 1), 1, 1),
        ((4, 1), 1, 1),
    ]

    plt.figure(figsize=(25, 50))

    for panel_index, (position, colspan, rowspan) in enumerate(layout):
        context = contexts[panel_index // 2]
        ax = plt.subplot2grid((5, 2), position, colspan=colspan, rowspan=rowspan)
        sns.heatmap(matrices[panel_index], annot=True, fmt='g', cmap='Blues', cbar=False,
                    annot_kws={"fontsize": 16}, ax=ax)
        ax.set_xticklabels(ax.get_xticklabels(), fontsize=16, rotation=45)
        ax.set_yticklabels(ax.get_yticklabels(), fontsize=16, rotation=0)

        if panel_index % 2 != 0:
            ax.set_title(f'Metrics ({context})', fontsize=18)
            continue

        correct = correct_counts[context]
        ax.set_xlabel('Predicted', fontsize=16)
        ax.set_ylabel('Actual', fontsize=16)
        ax.set_title(f'Confusion Matrix ({context}), Accuracy: {correct}/{total_count} = {correct / total_count:.2%}', fontsize=18)

    plt.suptitle(f'Test-{test_index + 1} Results: {model_name}', fontsize=24)
    plt.tight_layout(pad=4.0)
    plt.show()

In [None]:
def evaluate(test_index, model_name, model, preprocess, dataloader):
    """Evaluate one model on one test split and plot its result matrices.

    Runs `get_outputs` on the dataloader, splits targets, predictions and
    probabilities into the 'Label', 'Vegetable' and 'Quality' contexts,
    builds a confusion matrix and metrics table for each, and displays them.

    Args:
        test_index: zero-based index of the test split (printed as index + 1).
        model_name: name used in log lines and figure titles.
        model, preprocess, dataloader: forwarded unchanged to `get_outputs`.

    Returns:
        Label-level accuracy: correct 'Label' predictions / number of targets.
    """
    started_at = time.time()

    print(f'Evaluating {model_name} on Test {test_index+1}')
    print('-' * 20)
    print()

    targets, predictions, probabilities_list = get_outputs(model, preprocess, dataloader)
    total_count = len(targets)

    target_attributes = parse_targets(targets)
    predicted_attributes = parse_targets(predictions)
    probability_attributes = parse_probabilities_list(probabilities_list)

    # Every context is scored against its own full set of possible values.
    values_by_context = {
        'Label': all_labels,
        'Vegetable': all_vegetables,
        'Quality': all_qualities,
    }

    confusion_matrices, metrics_tables, correct_counts = {}, {}, {}

    for context, context_values in values_by_context.items():
        print(f'Matching {context}')
        print('-' * 10)

        confusion_df, metrics_df, correct_count = generate_confusion_matrix(
            target_attributes[context],
            predicted_attributes[context],
            probability_attributes[context],
            context_values,
        )
        confusion_matrices[context] = confusion_df
        metrics_tables[context] = metrics_df
        correct_counts[context] = correct_count

    print('Generating Confusion Matrices')
    print('-' * 10)
    display_matrices(model_name, test_index, confusion_matrices, metrics_tables, correct_counts, total_count)

    minutes, seconds = divmod(time.time() - started_at, 60)
    print(f'Evaluation complete in {minutes:.0f}m {seconds:.0f}s')
    print()

    return correct_counts['Label']/total_count

## Run Evaluation

In [None]:
# Evaluate every completed model on its matching test split.
# One record per model is collected and appended to `results_df` with a single
# concat, instead of growing the frame row by row (quadratic copying).
evaluation_rows = []

for model_set in completed_model_sets:
    model_name = model_set['name']

    row = {'Model': model_name}
    for test_index, model in enumerate(model_set['models']):
        # The i-th model of a set is scored on the i-th test dataloader.
        acc = evaluate(test_index, model_name, model, model_set['preprocess'], dataloaders[test_index]['Test'])
        row[f'Test-{test_index + 1} Accuracy'] = acc

    evaluation_rows.append(row)

# Skip the concat when nothing was evaluated so `results_df` is left untouched.
if evaluation_rows:
    results_df = pd.concat([results_df, pd.DataFrame(evaluation_rows)], ignore_index=True)

# View Results

## Calculate Average Accuracy

In [None]:
# Weighted mean of the per-test accuracies, weighted by the size of each test split.
# The accuracy columns are selected by name so the result does not depend on
# where (or whether) an 'Average Accuracy' column already sits in the frame.
test_accuracy_columns = [f'Test-{i + 1} Accuracy' for i in range(len(dataloaders))]

# len(DataLoader) is the number of batches, which only approximates the number
# of samples; prefer the underlying dataset size when the loader exposes it.
weights = [
    len(dataloader['Test'].dataset) if hasattr(dataloader['Test'], 'dataset') else len(dataloader['Test'])
    for dataloader in dataloaders
]

weighted_average_accuracies = (
    results_df[test_accuracy_columns].astype(float).dot(np.asarray(weights, dtype=float)) / sum(weights)
).tolist()

results_df['Average Accuracy'] = weighted_average_accuracies

## Display Results

In [None]:
# Format the accuracies as percentages on a copy used only for display.
# `results_df` keeps its full-precision numeric values: there is no lossy
# to_percentage -> from_percentage round trip, and the frame cannot be left
# holding strings if `display` fails part-way through the cell.
accuracy_columns = ['Average Accuracy'] + [f'Test-{i+1} Accuracy' for i in range(k)]

results_display_df = results_df.copy()
for accuracy_column in accuracy_columns:
    results_display_df[accuracy_column] = results_display_df[accuracy_column].map(to_percentage)

display(results_display_df)

# Best Model Analysis (Simple)

## Select Best Model

In [None]:
# Pick the model set with the highest average accuracy (first row wins on ties).
best_model_row = results_df.loc[results_df['Average Accuracy'] == results_df['Average Accuracy'].max()]
best_model_name = best_model_row['Model'].iloc[0]

# Position of the first completed model set carrying that name.
best_model_index = next(
    position
    for position, candidate in enumerate(completed_model_sets)
    if candidate['name'] == best_model_name
)
best_model_set = completed_model_sets[best_model_index]

## Utility Functions

In [None]:
def get_misclassifications(model, preprocess, dataloader):
    """Collect up to 10 misclassified samples with distinct (target, prediction) labels.

    The model is put in eval mode and run without gradients. Each
    (target label, predicted label) pair is kept at most once. Samples where
    only the vegetable differs, or only the quality differs, are each capped
    by a counter limit of 5; samples where both differ are not capped but
    still advance both counters.

    Args:
        model: classifier called on the stacked, preprocessed batch.
        preprocess: callable applied to every image of a batch.
        dataloader: iterable of (batch_image, batch_target) pairs.

    Returns:
        List of dicts with keys 'image' (preprocessed tensor moved to CPU),
        'target' and 'prediction' (both outputs of `parse_targets`).
    """
    vegetable_mismatch_count = 0
    quality_mismatch_count = 0

    collected = []
    seen_pairs = []

    model.eval()
    with torch.no_grad():
        for batch_image, batch_target in dataloader:
            batch_image = torch.stack([preprocess(image) for image in batch_image]).to(device)
            batch_target = batch_target.to(device)

            batch_prediction = torch.max(model(batch_image), 1)[1]
            wrong_positions = (batch_prediction != batch_target).nonzero().view(-1)

            for position in wrong_positions:
                target_attribute = parse_targets([batch_target[position].item()])
                predicted_attribute = parse_targets([batch_prediction[position].item()])

                # Keep a single example per (target, predicted) label pair.
                label_pair = target_attribute['Label'], predicted_attribute['Label']
                if label_pair in seen_pairs:
                    continue
                seen_pairs.append(label_pair)

                vegetable_differs = target_attribute['Vegetable'] != predicted_attribute['Vegetable']
                quality_differs = target_attribute['Quality'] != predicted_attribute['Quality']

                # The caps only apply when exactly one of the two attributes is wrong.
                if vegetable_differs and not quality_differs and vegetable_mismatch_count >= 5:
                    continue
                if quality_differs and not vegetable_differs and quality_mismatch_count >= 5:
                    continue

                if vegetable_differs:
                    vegetable_mismatch_count += 1
                if quality_differs:
                    quality_mismatch_count += 1

                collected.append({
                    'image': batch_image[position].cpu(),
                    'target': target_attribute,
                    'prediction': predicted_attribute
                })

                if len(collected) == 10:
                    return collected

    return collected

In [None]:
def display_misclassifications(model_name, test_index, misclassifications):
    """Show misclassified images in a grid with 5 columns.

    Args:
        model_name: name shown in the figure title.
        test_index: zero-based test index (displayed as index + 1).
        misclassifications: list of dicts with an 'image' tensor in
            channel-first layout and 'target' / 'prediction' dicts holding
            a 'Label' entry.

    Returns:
        None. When the list is empty a message is printed and no figure is
        created (`plt.subplots` rejects a grid with zero rows).
    """
    num_misclassifications = len(misclassifications)
    if num_misclassifications == 0:
        print(f'Test-{test_index + 1}: {model_name} - no misclassifications to display')
        return

    num_cols = 5
    num_rows = math.ceil(num_misclassifications / num_cols)
    # squeeze=False keeps `axes` two-dimensional even for a single row.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 4*num_rows), squeeze=False)

    # Hide every axis up front so unused cells of the last row stay blank.
    for ax in axes.flat:
        ax.axis('off')

    for ax, misclassification in zip(axes.flat, misclassifications):
        input_image = misclassification['image'].numpy().transpose(1, 2, 0)

        # Min-max rescale to [0, 1] for imshow; a constant image has no range to stretch.
        value_range = input_image.max() - input_image.min()
        if value_range > 0:
            input_image = (input_image - input_image.min()) / value_range
        else:
            input_image = np.zeros_like(input_image)

        target_label = misclassification['target']['Label']
        predicted_label = misclassification['prediction']['Label']

        ax.imshow(input_image)
        ax.set_title(f'Target: {target_label}\nPredicted: {predicted_label}',
                     fontsize=9, pad=9)

    plt.suptitle(f'Test-{test_index + 1}: {model_name}', fontsize=24)
    plt.tight_layout(h_pad=2)
    plt.show()

## Run Analysis

In [None]:
# Show sample misclassifications of the best model on each of its test splits.
model_set = best_model_set
model_name = model_set['name']

for test_index, model in enumerate(model_set['models']):
    misclassifications = get_misclassifications(
        model,
        model_set['preprocess'],
        dataloaders[test_index]['Test'],
    )
    display_misclassifications(model_name, test_index, misclassifications)

# Best Model Analysis (Complex)

## Select Best Model

In [None]:
# Re-select the model set with the highest average accuracy (first row wins on ties),
# so this section can be run without the "Simple" analysis section.
best_model_row = results_df.loc[results_df['Average Accuracy'] == results_df['Average Accuracy'].max()]
best_model_name = best_model_row['Model'].iloc[0]

# Position of the first completed model set carrying that name.
best_model_index = next(
    position
    for position, candidate in enumerate(completed_model_sets)
    if candidate['name'] == best_model_name
)
best_model_set = completed_model_sets[best_model_index]

## Define GradCAM Class

In [None]:
from skimage import io
from torchvision import transforms
from torch.autograd import Function
import cv2

class GradCam:
    """Grad-CAM heatmaps for one layer of a classification model.

    A forward hook on the target layer stores that layer's output and
    attaches a gradient hook to it, so the activations and the gradient
    always come from the same tensor.

    Args:
        model: the network to explain.
        target_layer: the layer to inspect, given either as the module
            object itself or as its dotted name in `model.named_modules()`.

    Raises:
        ValueError: if `target_layer` is not found in `model`.
    """

    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradient = None
        self.activations = None

        self.hook_handles = []
        for name, module in self.model.named_modules():
            matches_by_name = isinstance(target_layer, str) and name == target_layer
            if module is target_layer or matches_by_name:
                self.hook_handles.append(module.register_forward_hook(self.save_activation))
                break

        if not self.hook_handles:
            raise ValueError(f'GradCam: target layer {target_layer!r} was not found in the model')

    def save_gradient(self, module, grad_input, grad_output):
        """Store the gradient of the score w.r.t. the target layer output."""
        self.gradient = grad_output[0]

    def save_activation(self, module, input, output):
        """Forward hook: store the layer output and capture its gradient on backward."""
        self.activations = output
        # A tensor hook needs a tensor that takes part in autograd; under
        # torch.no_grad() the output does not, so nothing is registered.
        if isinstance(output, torch.Tensor) and output.requires_grad:
            output.register_hook(lambda grad: self.save_gradient(module, None, (grad,)))

    def remove_hooks(self):
        """Detach every hook this instance registered on the model."""
        for handle in self.hook_handles:
            handle.remove()
        self.hook_handles = []

    def __call__(self, x, class_index=None):
        """Compute the Grad-CAM map for `x`.

        Args:
            x: float input batch of shape (batch, channels, height, width).
            class_index: class whose score is explained; an int or one index
                per sample. Defaults to the class predicted for each sample.

        Returns:
            numpy array scaled to [0, 1] with the spatial size of `x`, with
            singleton dimensions squeezed out ((height, width) for a single
            image). An all-constant map is returned as zeros.

        Raises:
            RuntimeError: if the target layer produced no activation or gradient.
        """
        self.gradient = None
        self.activations = None

        x.requires_grad_()
        self.model.zero_grad()
        with torch.enable_grad():
            # assumes the model returns (batch, num_classes) scores — TODO confirm for non-classifier heads
            output = self.model(x)
            if class_index is None:
                class_index = output.argmax(dim=1)
            class_index = torch.as_tensor(class_index, dtype=torch.long, device=output.device)
            class_index = class_index.reshape(-1, 1).expand(output.size(0), 1)
            # Backpropagate only the selected class score so the map is class-specific.
            output.gather(1, class_index).sum().backward()

        if self.activations is None or self.gradient is None:
            raise RuntimeError('GradCam: the target layer did not produce activations and gradients')

        # Channel weights are the spatially averaged gradients.
        grad_values = self.gradient.mean(dim=(2, 3), keepdim=True)
        cam = torch.sum(self.activations * grad_values, dim=1, keepdim=True)
        cam = torch.nn.functional.relu(cam)
        cam = torch.nn.functional.interpolate(cam, size=(x.size(2), x.size(3)), mode="bilinear", align_corners=False)
        cam = cam.squeeze()
        cam = cam.detach().cpu().numpy()

        # Min-max scale to [0, 1]; a constant map has no range to stretch.
        value_range = np.max(cam) - np.min(cam)
        if value_range > 0:
            cam = (cam - np.min(cam)) / value_range
        else:
            cam = np.zeros_like(cam)
        return cam

## Utility Functions

In [None]:
def get_misclassifications(model, preprocess, dataloader):
    """Gather at most 10 misclassified samples, one per (target, prediction) label pair.

    Same selection rules as the definition in the "Simple" analysis section,
    which this one shadows: the model runs in eval mode without gradients, a
    label pair is recorded only once, and samples that get only the vegetable
    (or only the quality) wrong are capped by a counter limit of 5.

    Args:
        model: classifier called on the stacked, preprocessed batch.
        preprocess: callable applied to every image of a batch.
        dataloader: iterable of (batch_image, batch_target) pairs.

    Returns:
        List of dicts with keys 'image' (preprocessed tensor moved to CPU),
        'target' and 'prediction' (both outputs of `parse_targets`).
    """
    vegetable_mismatch_count = 0
    quality_mismatch_count = 0

    found = []
    seen_pairs = []

    model.eval()
    with torch.no_grad():
        for batch_image, batch_target in dataloader:
            preprocessed = [preprocess(image) for image in batch_image]
            batch_image = torch.stack(preprocessed).to(device)
            batch_target = batch_target.to(device)

            batch_scores = model(batch_image)
            batch_prediction = torch.max(batch_scores, 1)[1]

            is_wrong = batch_prediction != batch_target
            for sample_index in is_wrong.nonzero().view(-1):
                target_attribute = parse_targets([batch_target[sample_index].item()])
                predicted_attribute = parse_targets([batch_prediction[sample_index].item()])

                label_pair = target_attribute['Label'], predicted_attribute['Label']
                if label_pair in seen_pairs:
                    continue
                seen_pairs.append(label_pair)

                wrong_vegetable = target_attribute['Vegetable'] != predicted_attribute['Vegetable']
                wrong_quality = target_attribute['Quality'] != predicted_attribute['Quality']

                # A sample with both attributes wrong is always eligible;
                # otherwise the matching single-attribute cap decides.
                if not (wrong_vegetable and wrong_quality):
                    if wrong_vegetable:
                        if vegetable_mismatch_count >= 5:
                            continue
                    elif wrong_quality:
                        if quality_mismatch_count >= 5:
                            continue

                if wrong_vegetable:
                    vegetable_mismatch_count += 1
                if wrong_quality:
                    quality_mismatch_count += 1

                found.append({
                    'image': batch_image[sample_index].cpu(),
                    'target': target_attribute,
                    'prediction': predicted_attribute
                })

                if len(found) == 10:
                    return found

    return found

In [None]:
def display_misclassifications(model, model_name, test_index, misclassifications, target_layer=None):
    """Show each misclassified image next to its Grad-CAM heatmap and an overlay.

    Args:
        model: the network the misclassifications came from.
        model_name: name shown in the figure title.
        test_index: zero-based test index (displayed as index + 1).
        misclassifications: list of dicts with an 'image' tensor (the
            preprocessed model input, channel-first) and 'target' /
            'prediction' dicts holding a 'Label' entry.
        target_layer: layer handed to `GradCam`. Defaults to the last
            `Conv2d` module of `model`, so the function is not tied to one
            architecture.

    Returns:
        None. When the list is empty a message is printed and no figure is
        created.

    Raises:
        ValueError: if `target_layer` is omitted and the model has no Conv2d layer.
    """
    num_misclassifications = len(misclassifications)
    if num_misclassifications == 0:
        print(f'Test-{test_index + 1}: {model_name} - no misclassifications to display')
        return

    if target_layer is None:
        conv_layers = [module for module in model.modules() if isinstance(module, torch.nn.Conv2d)]
        if not conv_layers:
            raise ValueError('display_misclassifications: model has no Conv2d layer; pass target_layer explicitly')
        target_layer = conv_layers[-1]

    # squeeze=False keeps `axes` two-dimensional when there is a single row.
    fig, axes = plt.subplots(num_misclassifications, 3, figsize=(15, 4*num_misclassifications), squeeze=False)

    grad_cam = GradCam(model, target_layer)
    try:
        for i, misclassification in enumerate(misclassifications):
            image_tensor = misclassification['image']

            # Min-max rescale a copy to [0, 1] for display only.
            input_image = image_tensor.detach().cpu().numpy().transpose(1, 2, 0)
            value_range = input_image.max() - input_image.min()
            if value_range > 0:
                input_image = (input_image - input_image.min()) / value_range
            else:
                input_image = np.zeros_like(input_image)

            target_label = misclassification['target']['Label']
            predicted_label = misclassification['prediction']['Label']

            ax1 = axes[i, 0]
            ax1.imshow(input_image)
            ax1.set_title(f'Target: {target_label}\nPredicted: {predicted_label}',
                          fontsize=9, pad=9)
            ax1.axis('off')

            # Explain the exact tensor the model classified, not the rescaled display copy.
            input_tensor = image_tensor.detach().clone().unsqueeze(0).to(device)
            cam = np.nan_to_num(grad_cam(input_tensor))
            cam = cv2.resize(cam, (input_image.shape[1], input_image.shape[0]))

            # applyColorMap returns BGR uint8; matplotlib expects RGB.
            heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
            heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)

            ax2 = axes[i, 1]
            ax2.imshow(heatmap)
            ax2.set_title('Grad-CAM Heatmap', fontsize=9, pad=9)
            ax2.axis('off')

            # Blend in float space: the image is float in [0, 1], the heatmap uint8 in [0, 255].
            heatmap_float = heatmap.astype(np.float32) / 255.0
            blended_image = np.clip(0.7 * input_image + 0.3 * heatmap_float, 0.0, 1.0)

            ax3 = axes[i, 2]
            ax3.imshow(blended_image)
            ax3.set_title('Original + Grad-CAM', fontsize=9, pad=9)
            ax3.axis('off')
    finally:
        # Do not leave hooks behind on the model once the figure is built.
        for handle in grad_cam.hook_handles:
            handle.remove()

    plt.suptitle(f'Test-{test_index + 1}: {model_name}', fontsize=24)
    plt.tight_layout(h_pad=2)
    plt.show()

## Run Analysis

In [None]:
# Show Grad-CAM views of the best model's misclassifications on each test split.
model_set = best_model_set
model_name = model_set['name']

for test_index, model in enumerate(model_set['models']):
    misclassifications = get_misclassifications(
        model,
        model_set['preprocess'],
        dataloaders[test_index]['Test'],
    )
    display_misclassifications(model, model_name, test_index, misclassifications)