# CS 554 - Introduction to Machine Learning and Artificial Neural Networks
### Adaptive Cost-Sensitive Trade-off Analysis for Deep Neural Networks
**Authors:** A. E., Bolluk S., T. E.

## Import Libraries

In [None]:
import os
import json 
import shutil
import datetime
from tqdm import tqdm

import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F 
from torchvision import transforms, datasets, models
from torchvision.transforms import ToTensor
from torchvision.transforms import ToTensor, Normalize, Compose

from prefetch_generator import BackgroundGenerator

import numpy as np
import matplotlib
import matplotlib.pyplot as plt 
from sklearn.metrics import confusion_matrix

import warnings
# Silence every warning category for the rest of the session.
warnings.filterwarnings("ignore")

# The cuDNN version is queried but the returned value is discarded.
torch.backends.cudnn.version()

# Let cuDNN benchmark and pick convolution algorithms.
# NOTE(review): benchmark mode can select different algorithms between runs,
# which may work against the fixed seeds below - confirm this is intended.
torch.backends.cudnn.benchmark = True

# Seed the NumPy and PyTorch (CPU) random number generators.
np.random.seed(42)
torch.manual_seed(42)

# Probe for a GPU; when present, seed its generator as well and report the device.
use_cuda = torch.cuda.is_available()
if use_cuda:
    torch.cuda.manual_seed(42)
    print(f"Using cuda: {torch.cuda.get_device_name()} with capability {torch.cuda.get_device_capability()}")


## Experiment Config

In [None]:
import enum
@enum.unique
class ExperimentModel(enum.Enum):
    """Network architectures an experiment can be run with."""

    MLP = 1
    VGG = 2
    RESNET = 3

@enum.unique
class ExperimentDataset(enum.Enum):
    """Dataset variants an experiment can be run on."""

    # Regular datasets
    CIFAR10 = 1
    FMNIST = 2
    INTELIMAGE = 3

    # Artificially imbalanced variants (the value 6 is not assigned to any member)
    FMNIST_IMB70 = 4
    FMNIST_IMB90 = 5
    CIFAR10_IMB90 = 7
    INTELIMAGE_IMB90 = 8

In [None]:
# Model architecture to train; the experiment name is derived from this choice.
experiment_model = ExperimentModel.RESNET
# Dataset variant to train on; it selects the class-name JSON file and which
# data-loading branch of the notebook runs.
experiment_dataset = ExperimentDataset.INTELIMAGE_IMB90

## Experiment Setup

In [None]:
# Timestamp that makes every experiment name unique
current_datetime = datetime.datetime.now()
timestamp = current_datetime.strftime("%y%m%d-%H%M%S")

# Experiment name = timestamp + a tag for the selected architecture
_MODEL_TAGS = {
    ExperimentModel.MLP: "mlp",
    ExperimentModel.VGG: "vgg16",
    ExperimentModel.RESNET: "resnet18",
}
if experiment_model not in _MODEL_TAGS:
    raise Exception('Unknown experimentation type')
experiment_name = f"{timestamp}_{_MODEL_TAGS[experiment_model]}"

# Root folder holding the results of all experiments (created on first use)
experiment_folder_path = os.path.join(os.getcwd(), "experiment")
if not os.path.exists(experiment_folder_path):
    os.makedirs(experiment_folder_path)

# Path reserved for this experiment's results (only computed here, not created)
experiment_path = os.path.join(experiment_folder_path, experiment_name)

# Every dataset variant shares the class-name file of its base dataset
_CLASS_FILES = {
    ExperimentDataset.CIFAR10: "classes_cifar.json",
    ExperimentDataset.CIFAR10_IMB90: "classes_cifar.json",
    ExperimentDataset.FMNIST: "classes_fmnist.json",
    ExperimentDataset.FMNIST_IMB70: "classes_fmnist.json",
    ExperimentDataset.FMNIST_IMB90: "classes_fmnist.json",
    ExperimentDataset.INTELIMAGE: "classes_intelimage.json",
    ExperimentDataset.INTELIMAGE_IMB90: "classes_intelimage.json",
}
if experiment_dataset not in _CLASS_FILES:
    raise Exception('Unknown dataset type')
class_json = _CLASS_FILES[experiment_dataset]

# Set Data Root
data_root = os.path.join(os.getcwd(), "data")

# Import Classes
classes_path = os.path.join(data_root, class_json)
with open(classes_path, "r") as json_file:
    classes = json.load(json_file)

print(classes)

Fashion-MNIST dataset sources:

- https://github.com/zalandoresearch/fashion-mnist
- https://www.kaggle.com/datasets/zalando-research/fashionmnist

## Utils

In [None]:
def count_parameters(model: nn.Module, only_trainable_parameters: bool = False):
    """ Return how many scalar parameters a model holds
    Args:
        model (nn.Module): model whose parameters are counted
        only_trainable_parameters (bool:False): when True, skip parameters with requires_grad=False
    Returns:
        num_parameters (int): total number of elements over the selected parameters
    """
    return sum(
        parameter.numel()
        for parameter in model.parameters()
        if parameter.requires_grad or not only_trainable_parameters
    )

In [None]:
class MetricTracker:
    """Keeps the latest value of a metric together with its running average."""

    def __init__(self):
        # A fresh tracker starts from the zeroed state
        self.reset()

    def reset(self):
        """ Zero the last value, the running sum, the element count and the average """
        self.value = self.average = self.sum = self.count = 0

    def update(self, value: float, num: int = 1):
        """ Record a new measurement
        Args:
            value (float): measurement to record; it is added to the running sum as is
            num (int: 1): amount by which the element count grows for this measurement
        """
        self.value = value
        # `value` is accumulated unscaled while the count grows by `num`, so the
        # average is (sum of values) / (sum of nums).
        self.sum += value
        self.count += num
        self.average = self.sum / self.count

In [None]:
class ConfusionMatrix:
    """Store, update and plot a confusion matrix.

    Raw counts are accumulated in ``self.cm`` (rows: true class, columns:
    predicted class). Plotting never modifies the stored counts.
    """

    def __init__(self, classes: dict):
        """ Create and initialize a confusion matrix
        Args:
            classes (dict): dictionary containing all the classes (e.g. {"0": "label_0", "1": "label_1",...})
        """
        self.classes = classes
        self.num_classes = len(self.classes)
        self.labels_classes = range(self.num_classes)
        self.list_classes = list(self.classes.values())

        self.cm = np.zeros([self.num_classes, self.num_classes], dtype=int)

    def update_confusion_matrix(self, targets: torch.Tensor, predictions: torch.Tensor):
        """ Update the confusion matrix
        Args:
            targets (torch.Tensor): tensor on the cpu containing the target classes
            predictions(torch.Tensor): tensor on the cpu containing the predicted classes
        """
        # use sklearn to count this batch, then add it to the running counts
        self.cm += confusion_matrix(targets, predictions, labels=list(self.labels_classes))

    def plot_confusion_matrix(
        self,
        normalize: bool = True,
        title: str = None,
        cmap: matplotlib.colors.Colormap = plt.cm.Blues,
    ) -> matplotlib.figure.Figure:
        """
        This function plots the confusion matrix.
        Args:
            normalize (bool: True): when True, every row is divided by its total so it shows
                per-true-class ratios; rows without samples stay at zero.
            title (str: None): title for the figure; a default title is used when empty
            cmap (matplotlib.colors.Colormap: plt.cm.Blues): color map, defaults to 'Blues'
        Returns:
            matplotlib.figure.Figure: the ready-to-show/save figure
        """
        if not title:
            title = "Normalized Confusion Matrix" if normalize else "Confusion Matrix"

        # Normalize into a local copy: the stored integer counts must stay intact so
        # that later updates keep accumulating counts and repeated plots are stable.
        if normalize:
            row_totals = np.maximum(self.cm.sum(axis=1, keepdims=True), 1)
            matrix = self.cm.astype("float") / row_totals
        else:
            matrix = self.cm

        # Create figure with size determined by number of classes.
        fig, ax = plt.subplots(
            figsize=[0.4 * self.num_classes + 4, 0.4 * self.num_classes + 2]
        )
        im = ax.imshow(matrix, interpolation="nearest", cmap=cmap)
        ax.figure.colorbar(im, ax=ax)

        # One tick per class, each paired with exactly one label, so the number of
        # tick positions always equals the number of tick labels.
        tick_positions = np.arange(self.num_classes)
        ax.set_xticks(tick_positions)
        ax.set_yticks(tick_positions)
        ax.set_xticklabels(self.list_classes)
        ax.set_yticklabels(self.list_classes)

        # Pin the limits to the full cell grid so the outer rows/columns are not cut off.
        ax.set_xlim(-0.5, self.num_classes - 0.5)
        ax.set_ylim(self.num_classes - 0.5, -0.5)

        ax.set(title=title, ylabel="True label", xlabel="Predicted label")

        # Rotate the tick labels and set their alignment.
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

        # Annotate every cell; use white text on dark cells for readability.
        fmt = ".2f" if normalize else "d"
        thresh = matrix.max() / 2.0
        for i in range(matrix.shape[0]):
            for j in range(matrix.shape[1]):
                ax.text(
                    j,
                    i,
                    format(matrix[i, j], fmt),
                    ha="center",
                    va="center",
                    color="white" if matrix[i, j] > thresh else "black",
                )
        fig.tight_layout()

        return fig

## Data Preprocessing

In [None]:
def _split_train_val_loaders(dataset, val_percentage, batch_size, seed=42):
    """Split a dataset into train/validation subsets and wrap both in loaders.

    Args:
        dataset: map-style dataset to split.
        val_percentage (int): share of the dataset, in percent, held out for validation.
        batch_size (int): batch size used by both loaders.
        seed (int: 42): seed of the split generator and of each loader's shuffle generator.
    Returns:
        tuple: (train_length, val_length, train_subset, val_subset, train_loader, val_loader)
    """
    train_length = int(len(dataset) * (100 - val_percentage) / 100)
    val_length = int(len(dataset) - train_length)

    # Randomly split into non-overlapping subsets; the fixed generator makes the split reproducible.
    # NOTE(review): both subsets wrap the same dataset object, so validation samples
    # go through the same transform as training samples - confirm this is intended.
    train_subset, val_subset = torch.utils.data.random_split(
        dataset,
        (train_length, val_length),
        generator=torch.Generator().manual_seed(seed),
    )

    # shuffle=True reshuffles the data at every epoch
    train_subset_loader = DataLoader(
        train_subset, batch_size=batch_size, shuffle=True, generator=torch.Generator().manual_seed(seed)
    )
    val_subset_loader = DataLoader(
        val_subset, batch_size=batch_size, shuffle=True, generator=torch.Generator().manual_seed(seed)
    )
    return train_length, val_length, train_subset, val_subset, train_subset_loader, val_subset_loader


if experiment_dataset == ExperimentDataset.FMNIST:
    # Base transformations for the train and test sets
    train_transform_operations = [transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    test_transform_operations = [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]

    # Optional data transformations
    random_crop     = True
    random_erasing  = True
    convert_to_RGB  = True

    if random_crop:
        # Cropping works on the PIL image, so it goes first
        train_transform_operations.insert(0, transforms.RandomCrop(28, padding=4))
    if random_erasing:
        train_transform_operations.append(transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3), value="random", inplace=False))
    if convert_to_RGB:
        # Repeat the single grey channel three times
        to_rgb = transforms.Lambda(lambda x: x.repeat(3, 1, 1))
        train_transform_operations.append(to_rgb)
        test_transform_operations.append(to_rgb)

    # Data Loaders
    percentage_validation_set = 10
    batch_size = 256

    # Train and Val Set
    train_transform = transforms.Compose(train_transform_operations)

    # Un-augmented copy of the training data, used for visualisation only
    vis_train_dataset = datasets.FashionMNIST(
        root=data_root, train=True, transform=ToTensor(), download=True
    )

    train_dataset = datasets.FashionMNIST(
        root=data_root, train=True, transform=train_transform, download=True
    )

    (train_set_length, val_set_length, train_set, val_set,
     train_loader, val_loader) = _split_train_val_loaders(train_dataset, percentage_validation_set, batch_size)

    # Test Set
    test_transform = transforms.Compose(test_transform_operations)

    test_set = datasets.FashionMNIST(
        root=data_root, train=False, transform=test_transform, download=True
    )

    test_loader = DataLoader(
        test_set, batch_size=batch_size, shuffle=False
    )

elif experiment_dataset == ExperimentDataset.CIFAR10:

    transform = Compose([ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    train_dataset = datasets.CIFAR10(root='./data', download=True, train=True, transform=transform)

    # Get all training targets and count the number of class instances
    targets = np.array(train_dataset.targets)
    cl, class_counts = np.unique(targets, return_counts=True)
    nb_classes = len(cl)
    print(class_counts)

    percentage_validation_set = 10
    batch_size = 256

    ## ---------------------------- ##
    ## SPLIT TRAIN / VAL ---------- ##
    ## ---------------------------- ##
    (train_set_length, val_set_length, train_set, val_set,
     train_loader, val_loader) = _split_train_val_loaders(train_dataset, percentage_validation_set, batch_size)

    ## ---------------------------- ##
    ## SPLIT TEST ----------------- ##
    ## ---------------------------- ##
    test_transform_operations = [
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]

    test_transform = Compose(test_transform_operations)
    test_set = datasets.CIFAR10(root='./data', train=False, transform=test_transform, download=True)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

elif experiment_dataset == ExperimentDataset.INTELIMAGE:

    percentage_validation_set = 10
    batch_size = 4

    # Random crop + flip for training; deterministic resize + center crop for testing
    data_transforms = {
        'train': transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
        'test': transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
    }

    data_dir = './data/intelimage/intelimage/IntelImageClassification'

    ## ---------------------------- ##
    ## SPLIT TRAIN / VAL ---------- ##
    ## ---------------------------- ##
    image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                              data_transforms[x])
                      for x in ['train', 'test']}
    train_dataset = image_datasets['train']

    (train_set_length, val_set_length, train_set, val_set,
     train_loader, val_loader) = _split_train_val_loaders(train_dataset, percentage_validation_set, batch_size)

    ## ---------------------------- ##
    ## SPLIT TEST ----------------- ##
    ## ---------------------------- ##
    test_set = image_datasets['test']

    test_loader = DataLoader(
        test_set, batch_size=batch_size, shuffle=False
    )
    


In [None]:
# Define the class for neural network model with He Initialization
# Best with: ReLU
class Net_He(nn.Module):
    """Fully connected network whose linear layers use He (Kaiming) uniform initialization.

    Hidden layers are followed by ReLU. The last layer returns raw class scores
    (logits) with no softmax, which is the input ``nn.CrossEntropyLoss`` expects.
    """

    # Constructor
    def __init__(self, Layers):
        """
        Args:
            Layers (list[int]): layer widths, from the input size to the output size;
                one ``nn.Linear`` is created for every consecutive pair.
        """
        super().__init__()
        self.hidden = nn.ModuleList()

        for input_size, output_size in zip(Layers, Layers[1:]):
            linear = nn.Linear(input_size, output_size)
            torch.nn.init.kaiming_uniform_(linear.weight, nonlinearity='relu')
            self.hidden.append(linear)

    # Prediction
    def forward(self, x):
        """
        Args:
            x (torch.Tensor): batch of inputs; inputs with more than two dimensions
                (e.g. images) are flattened to (batch, features) first.
        Returns:
            torch.Tensor: logits of shape (batch, Layers[-1]).
        """
        # Image batches must be flattened before the first linear layer
        if x.dim() > 2:
            x = x.flatten(start_dim=1)

        last_index = len(self.hidden) - 1
        for index, linear_transform in enumerate(self.hidden):
            x = linear_transform(x)
            # ReLU after every layer except the output layer, which stays linear
            if index < last_index:
                x = F.relu(x)
        return x

# Build the model, loss function and optimizer for the selected architecture.
if experiment_model == ExperimentModel.MLP:
    # Same number of epochs as the other architectures
    num_epochs = 30

    # Set the size of the neural network
    # NOTE(review): the input size is hard-coded for 3x32x32 images and the output
    # size for 10 classes - confirm they match the selected dataset.
    input_size = 3*32*32
    output_size = 10
    layers = [input_size, 1024, output_size]

    # Determine the learning rate
    learning_rate = 0.001

    # Initialize the model
    model = Net_He(layers)

    num_trainable_parameters = count_parameters(model, only_trainable_parameters=True)

    # Load the model on the GPU if available (before the optimizer is created)
    if use_cuda:
        model = model.cuda()

    # Determine the optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Determine the loss function
    criterion = nn.CrossEntropyLoss()

    print(f"Loaded MLP with {num_trainable_parameters} trainable parameters (GPU: {use_cuda})")

elif experiment_model == ExperimentModel.RESNET:
    num_epochs = 30
    learning_rate = 0.001
    momentum = 0.9
    weight_decay = 0
    nesterov = True

    # Randomly initialised ResNet18 whose final layer is resized to the number of classes
    model = models.resnet18(pretrained=False)
    model.fc = nn.Linear(model.fc.in_features, len(classes))

    num_trainable_parameters = count_parameters(model, only_trainable_parameters=True)

    # Load the model on the GPU if available
    if use_cuda:
        model = model.cuda()

    criterion = nn.CrossEntropyLoss()

    optimizer = optim.SGD(
        model.parameters(),
        lr=learning_rate,
        momentum=momentum,
        weight_decay=weight_decay,
        nesterov=nesterov
    )

    print(f"Loaded ResNet18 with {num_trainable_parameters} trainable parameters (GPU: {use_cuda})")

elif experiment_model == ExperimentModel.VGG:
    num_epochs = 30
    # Class names used to size the new output layer
    class_names = ['buildings', 'forest', 'glacier', 'mountain', 'sea', 'street']

    # Load the pretrained weights from a local checkpoint.
    # NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
    model = models.vgg16_bn()
    model.load_state_dict(torch.load("./data/intelimage/vgg16_bn.pth"))
    print(model.classifier[6].out_features) # 1000

    # Freeze the convolutional feature extractor. The attribute is `requires_grad`;
    # assigning to any other name only adds an unused attribute and freezes nothing.
    for param in model.features.parameters():
        param.requires_grad = False

    # Newly created modules have requires_grad=True by default
    num_features = model.classifier[6].in_features
    features = list(model.classifier.children())[:-1] # Remove last layer
    features.extend([nn.Linear(num_features, len(class_names))]) # Add our layer with 6 outputs
    model.classifier = nn.Sequential(*features) # Replace the model classifier

    # Load the model on the GPU if available
    if use_cuda:
        model = model.cuda()

    criterion = nn.CrossEntropyLoss()

    # Only the parameters left trainable (the classifier layers) are optimized.
    optimizer = optim.SGD(
        [param for param in model.parameters() if param.requires_grad],
        lr=0.001,
        momentum=0.9,
    )

    # Decay LR by a factor of 0.1 every 7 epochs
    exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    num_trainable_parameters = count_parameters(model, only_trainable_parameters=True)
    print(f"Loaded VGG with {num_trainable_parameters} trainable parameters (GPU: {use_cuda})")




## Imbalance

#### FMNIST_IMB

In [None]:
# Number of samples kept for every even-indexed class in each imbalanced
# Fashion-MNIST variant; odd-indexed classes keep up to 6000 samples.
_FMNIST_MINORITY_COUNTS = {
    ExperimentDataset.FMNIST_IMB70: 1800,
    ExperimentDataset.FMNIST_IMB90: 600,
}

if experiment_dataset in _FMNIST_MINORITY_COUNTS:
    # Base transformations for the train and test sets
    train_transform_operations = [transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    test_transform_operations = [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]

    # Optional data transformations
    random_crop     = True
    random_erasing  = True
    convert_to_RGB  = True

    if random_crop:
        # Cropping works on the PIL image, so it goes first
        train_transform_operations.insert(0, transforms.RandomCrop(28, padding=4))
    if random_erasing:
        train_transform_operations.append(transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3), value="random", inplace=False))
    if convert_to_RGB:
        # Repeat the single grey channel three times
        to_rgb = transforms.Lambda(lambda x: x.repeat(3, 1, 1))
        train_transform_operations.append(to_rgb)
        test_transform_operations.append(to_rgb)

    # Data Loaders
    percentage_validation_set = 10
    batch_size = 256

    # Train and Val Set
    train_transform = transforms.Compose(train_transform_operations)

    train_dataset = datasets.FashionMNIST(
        root=data_root, train=True, transform=train_transform, download=True
    )

    # Test Set (left untouched: only the training data is made imbalanced)
    test_transform = transforms.Compose(test_transform_operations)

    test_set = datasets.FashionMNIST(
        root=data_root, train=False, transform=test_transform, download=True
    )

    test_loader = DataLoader(
        test_set, batch_size=batch_size, shuffle=False
    )

    # Get all training targets and count the number of class instances
    targets = np.array(train_dataset.targets)
    cl, class_counts = np.unique(targets, return_counts=True)
    nb_classes = len(cl)
    print(class_counts)

    # Create artificial imbalanced class counts (minority, majority, minority, ...)
    imbal_class_counts = [_FMNIST_MINORITY_COUNTS[experiment_dataset], 6000] * 5

    # Keep the first `class_count` samples of every class
    class_indices = [np.where(targets == i)[0] for i in range(nb_classes)]

    imbal_class_indices = [class_idx[:class_count] for class_idx, class_count in zip(class_indices, imbal_class_counts)]
    imbal_class_indices = np.hstack(imbal_class_indices)

    # Labels and images are subsampled together so they stay aligned
    train_dataset.targets = targets[imbal_class_indices]
    train_dataset.data = train_dataset.data[imbal_class_indices]

    # Count the class instances again to show the resulting imbalance
    targets = np.array(train_dataset.targets)
    cl, class_counts = np.unique(targets, return_counts=True)
    nb_classes = len(cl)
    print(class_counts)

    train_set_length = int(len(train_dataset) * (100 - percentage_validation_set) / 100)
    val_set_length = int(len(train_dataset) - train_set_length)

    # Randomly split a dataset into non-overlapping new datasets of given lengths. Fix the generator for reproducible results
    train_set, val_set = torch.utils.data.random_split(train_dataset, (train_set_length, val_set_length), generator=torch.Generator().manual_seed(42))

    # set shuffle=True to have the data reshuffled at every epoch
    train_loader = DataLoader(
        train_set, batch_size=batch_size, shuffle=True, generator=torch.Generator().manual_seed(42)
    )
    val_loader = DataLoader(
        val_set, batch_size=batch_size, shuffle=True, generator=torch.Generator().manual_seed(42)
    )

#### INTEL_IMB

In [None]:
if experiment_dataset == ExperimentDataset.INTELIMAGE_IMB90:

    percentage_validation_set = 10
    batch_size = 4

    # Random crop + flip for training; deterministic resize + center crop for testing
    data_transforms = {
        'train': transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
        'test': transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
    }

    data_dir = './data/intelimage/intelimage/IntelImageClassification'

    ## ---------------------------- ##
    ## SPLIT TRAIN / VAL ---------- ##
    ## ---------------------------- ##
    image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                              data_transforms[x])
                      for x in ['train', 'test']}
    train_dataset = image_datasets['train']

    # Get all training targets and count the number of class instances
    targets = np.array(train_dataset.targets)
    cl, class_counts = np.unique(targets, return_counts=True)
    nb_classes = len(cl)
    print(class_counts)

    # Create artificial imbalanced class counts (minority, majority, minority, ...)
    imbal_class_counts = [200, 2000] * 3

    # Keep the first `class_count` samples of every class
    class_indices = [np.where(targets == i)[0] for i in range(nb_classes)]

    imbal_class_indices = [class_idx[:class_count] for class_idx, class_count in zip(class_indices, imbal_class_counts)]
    imbal_class_indices = np.hstack(imbal_class_indices)

    # ImageFolder reads its items and its length from `samples`; replacing only
    # `targets` would leave the full, balanced dataset in use. Subsample the
    # (path, label) list and keep its `imgs` alias and `targets` in sync.
    train_dataset.samples = [train_dataset.samples[index] for index in imbal_class_indices]
    train_dataset.imgs = train_dataset.samples
    train_dataset.targets = targets[imbal_class_indices]

    # Count the class instances again to show the resulting imbalance
    targets = np.array(train_dataset.targets)
    cl, class_counts = np.unique(targets, return_counts=True)
    nb_classes = len(cl)
    print(class_counts)

    train_set_length = int(len(train_dataset) * (100 - percentage_validation_set) / 100)
    val_set_length = int(len(train_dataset) - train_set_length)

    # Randomly split a dataset into non-overlapping new datasets of given lengths. Fix the generator for reproducible results
    train_set, val_set = torch.utils.data.random_split(train_dataset, (train_set_length, val_set_length), generator=torch.Generator().manual_seed(42))

    # set shuffle=True to have the data reshuffled at every epoch
    train_loader = DataLoader(
        train_set, batch_size=batch_size, shuffle=True, generator=torch.Generator().manual_seed(42)
    )
    val_loader = DataLoader(
        val_set, batch_size=batch_size, shuffle=True, generator=torch.Generator().manual_seed(42)
    )

    ## ---------------------------- ##
    ## SPLIT TEST ----------------- ##
    ## ---------------------------- ##
    test_set = image_datasets['test']

    test_loader = DataLoader(
        test_set, batch_size=batch_size, shuffle=False
    )

In [None]:
# Optional: build an artificially imbalanced CIFAR10 training set.
imbalanced_enabled = False
if imbalanced_enabled:

    transform = Compose([ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    train_dataset = datasets.CIFAR10(root='./data', download=True, train=True, transform=transform)

    # Get all training targets and count the number of class instances
    targets = np.array(train_dataset.targets)
    cl, class_counts = np.unique(targets, return_counts=True)
    nb_classes = len(cl)
    print(class_counts)

    # Create artificial imbalanced class counts (minority, majority, minority, ...)
    imbal_class_counts = [500, 5000] * 5

    # Keep the first `class_count` samples of every class
    class_indices = [np.where(targets == i)[0] for i in range(nb_classes)]

    imbal_class_indices = [class_idx[:class_count] for class_idx, class_count in zip(class_indices, imbal_class_counts)]
    imbal_class_indices = np.hstack(imbal_class_indices)

    # Labels and images are subsampled together so they stay aligned
    train_dataset.targets = targets[imbal_class_indices]
    train_dataset.data = train_dataset.data[imbal_class_indices]

    # Count the class instances again to show the resulting imbalance.
    # The unique labels go into `cl`: the global `classes` holds the class-name
    # dictionary loaded from JSON and must not be overwritten here.
    targets = np.array(train_dataset.targets)
    cl, class_counts = np.unique(targets, return_counts=True)
    nb_classes = len(cl)
    print(class_counts)

### Data Vis

In [None]:
# Optional: preview Fashion-MNIST samples and report the class balance.
datavis_enabled = False
if datavis_enabled:
    from collections import Counter

    labels_map = {
        0: "T-Shirt",
        1: "Trouser",
        2: "Pullover",
        3: "Dress",
        4: "Coat",
        5: "Sandal",
        6: "Shirt",
        7: "Sneaker",
        8: "Bag",
        9: "Ankle Boot",
    }

    # 3x3 grid of randomly drawn samples with their class name as title
    figure = plt.figure(figsize=(8, 8))
    cols, rows = 3, 3
    for i in range(1, cols * rows + 1):
        sample_idx = torch.randint(len(vis_train_dataset), size=(1,)).item()
        img, label = vis_train_dataset[sample_idx]
        figure.add_subplot(rows, cols, i)
        plt.title(labels_map[label])
        plt.axis("off")
        plt.imshow(img.squeeze(), cmap=plt.get_cmap('gray'))
    plt.show()

    # Raw pixel data of the first nine images of the dataset behind the train split
    for i in range(9):
        plt.subplot(330 + 1 + i)
        plt.imshow(train_set.dataset.data[i], cmap=plt.get_cmap('gray'))
    plt.show()

    # Share of every class in the training dataset, in percent
    train_classes = [label for img, label in train_dataset]
    total_count = Counter(train_classes)

    # Divide by the number of labels collected so the ratio does not depend on
    # the storage type of `train_dataset.data`.
    class_ratio = {k: v * 100 / len(train_classes) for k, v in total_count.items()}

    # A bare expression inside an `if` block is not displayed, so print it
    print(class_ratio)

## Train Utils

In [None]:
def train(
    model: nn.Module,
    classes: dict,
    data_loader: torch.utils.data.DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    epoch: int,
    num_iteration: int,
    use_cuda: bool,
    tensorboard_writer: torch.utils.tensorboard.SummaryWriter):
    """Train a model for one full pass over ``data_loader``.

    Besides its arguments, this function reads two notebook-level names:
    ``experiment_model`` (inputs are flattened per sample when it is
    ``ExperimentModel.MLP``) and ``num_epochs`` (progress-bar label only).

    Args:
        model (nn.Module): model to train.
        classes (dict): dictionary containing the classes and their indices.
        data_loader (torch.utils.data.DataLoader): data loader with the data to train the model on.
        criterion (nn.Module): loss function.
        optimizer (torch.optim.Optimizer): optimizer updating the model parameters.
        epoch (int): zero-based epoch of training.
        num_iteration (int): number of iterations since the beginning of the training.
        use_cuda (bool): boolean to decide if cuda should be used.
        tensorboard_writer (torch.utils.tensorboard.SummaryWriter): writer to write the metrics in tensorboard.
    Returns:
        num_iteration (int): number of iterations since the beginning of the training (increased during the training).
        loss (float): running average of the batch losses.
        accuracy_top1 (float): running top-1 accuracy.
        accuracy_top5 (float): running top-5 accuracy (top-k with k = number of
            model outputs when the model has fewer than 5 outputs).
        confidence_mean (float): running mean of the largest raw model output per
            sample (no softmax is applied here).
    """
    # Switch the model to train mode
    model.train()

    # Initialize the trackers for the loss and the accuracy
    loss_tracker = MetricTracker()
    accuracy_top1_tracker = MetricTracker()
    accuracy_top5_tracker = MetricTracker()
    confidence_tracker = MetricTracker()

    # Initialize the confusion matrix
    confusion_matrix_tracker = ConfusionMatrix(classes)

    # Create a BackgroundGenerator and wrap it in a tqdm progress bar
    progress_bar = tqdm(
        BackgroundGenerator(data_loader, max_prefetch=32), total=len(data_loader)
    )
    # The label is constant for the whole epoch, so set it once
    progress_bar.set_description(f"Epoch {epoch + 1}/{num_epochs} Train")

    for i, data in enumerate(progress_bar):
        inputs, targets = data

        if use_cuda:
            inputs = inputs.cuda()
            targets = targets.cuda()

        # Forward pass; the MLP expects one flat feature vector per sample.
        # Flattening everything but the batch dimension keeps the batch size
        # intact for any image shape (identical to 3*32*32 for CIFAR-sized input).
        if experiment_model == ExperimentModel.MLP:
            outputs = model(inputs.view(inputs.size(0), -1))
        else:
            outputs = model(inputs)

        loss = criterion(outputs, targets)

        # topk raises if k exceeds the number of outputs, so cap it
        top_k = min(5, outputs.size(1))
        confidence, prediction = outputs.topk(dim=1, k=top_k)

        # Backward pass and optimizer step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Track loss, accuracy and confidence
        loss_tracker.update(loss.item())
        accuracy_top1_tracker.update(
            (prediction[:, 0] == targets).sum().item(), targets.numel()
        )
        accuracy_top5_tracker.update(
            (prediction[:, :5] == targets[:, None]).sum().item(), targets.numel()
        )
        confidence_tracker.update(confidence[:, 0].sum().item(), targets.numel())

        # Update the confusion matrix
        confusion_matrix_tracker.update_confusion_matrix(targets.cpu(), prediction[:, 0].cpu())

        # Add the new values to the tensorboard summary writer
        # (the mean confidence is tracked and returned but not logged)
        tensorboard_writer.add_scalar("loss", loss_tracker.average, num_iteration)
        tensorboard_writer.add_scalar(
            "accuracy_top1", accuracy_top1_tracker.average, num_iteration
        )
        tensorboard_writer.add_scalar(
            "accuracy_top5", accuracy_top5_tracker.average, num_iteration
        )

        # Update the progress_bar information
        progress_bar.set_postfix(
            loss=f"{loss_tracker.average:05.5f}",
            accuracy_top1=f"{100 * accuracy_top1_tracker.average:05.2f}",
            accuracy_top5=f"{100 * accuracy_top5_tracker.average:05.2f}",
        )

        # Increment num_iteration on all iterations except the last,
        # so that the evaluation is logged to the correct iteration
        if i < len(data_loader) - 1:
            num_iteration += 1

    # Add the normalized confusion matrix to tensorboard and flush it
    tensorboard_writer.add_figure(
        "confusion_matrix", confusion_matrix_tracker.plot_confusion_matrix(normalize=True), num_iteration
    )
    tensorboard_writer.flush()

    return (
        num_iteration,
        loss_tracker.average,
        accuracy_top1_tracker.average,
        accuracy_top5_tracker.average,
        confidence_tracker.average,
    )


## Test Utils

In [None]:
def test(
    model: nn.Module,
    classes: dict,
    data_loader: torch.utils.data.DataLoader,
    criterion: nn.Module,
    epoch: int,
    num_iteration: int,
    use_cuda: bool,
    tensorboard_writer: torch.utils.tensorboard.SummaryWriter,
    name_step: str,
):
    """Evaluate a model on ``data_loader`` without updating its parameters.

    Besides its arguments, this function reads two notebook-level names:
    ``experiment_model`` (inputs are flattened per sample when it is
    ``ExperimentModel.MLP``) and ``num_epochs`` (progress-bar label only).

    Args:
        model (nn.Module): model to test.
        classes (dict): dictionary containing the classes and their indices.
        data_loader (torch.utils.data.DataLoader): data loader with the data to test the model on.
        criterion (nn.Module): loss function.
        epoch (int): zero-based epoch of training corresponding to the model.
        num_iteration (int): number of iterations since the beginning of the training corresponding to the model.
        use_cuda (bool): boolean to decide if cuda should be used.
        tensorboard_writer (torch.utils.tensorboard.SummaryWriter): writer to write the metrics in tensorboard.
        name_step (str): name of the step to write it in the description of the progress_bar
    Returns:
        loss (float): average of the batch losses.
        accuracy_top1 (float): top-1 accuracy.
        accuracy_top5 (float): top-5 accuracy (top-k with k = number of model
            outputs when the model has fewer than 5 outputs).
        confidence_mean (float): mean of the largest raw model output per sample
            (no softmax is applied here).
    """
    # Switch the model to eval mode
    model.eval()

    # Initialize the trackers for the loss and the accuracy
    loss_tracker = MetricTracker()
    accuracy_top1_tracker = MetricTracker()
    accuracy_top5_tracker = MetricTracker()
    confidence_tracker = MetricTracker()

    # Initialize the confusion matrix
    confusion_matrix_tracker = ConfusionMatrix(classes)

    # Create a BackgroundGenerator and wrap it in a tqdm progress bar
    progress_bar = tqdm(
        BackgroundGenerator(data_loader, max_prefetch=32), total=len(data_loader)
    )
    # The label is constant for the whole pass, so set it once
    progress_bar.set_description(f"Epoch {epoch + 1}/{num_epochs} {name_step}")

    # No gradients are needed for evaluation: disabling autograd avoids building
    # a graph for every batch and lowers memory use without changing the outputs.
    with torch.no_grad():
        for data in progress_bar:
            inputs, targets = data

            if use_cuda:
                inputs = inputs.cuda()
                targets = targets.cuda()

            # Forward pass; the MLP expects one flat feature vector per sample.
            # Flattening everything but the batch dimension keeps the batch size
            # intact for any image shape (identical to 3*32*32 for CIFAR-sized input).
            if experiment_model == ExperimentModel.MLP:
                outputs = model(inputs.view(inputs.size(0), -1))
            else:
                outputs = model(inputs)

            loss = criterion(outputs, targets)

            # topk raises if k exceeds the number of outputs, so cap it
            top_k = min(5, outputs.size(1))
            confidence, prediction = outputs.topk(dim=1, k=top_k)

            # Track loss, accuracy and confidence
            loss_tracker.update(loss.item())
            accuracy_top1_tracker.update(
                (prediction[:, 0] == targets).sum().item(), targets.numel()
            )
            accuracy_top5_tracker.update(
                (prediction[:, :5] == targets[:, None]).sum().item(), targets.numel()
            )
            confidence_tracker.update(confidence[:, 0].sum().item(), targets.numel())

            # Update the confusion matrix
            confusion_matrix_tracker.update_confusion_matrix(targets.cpu(), prediction[:, 0].cpu())

            # Update the progress_bar information
            progress_bar.set_postfix(
                loss=f"{loss_tracker.average:05.5f}",
                accuracy_top1=f"{100 * accuracy_top1_tracker.average:05.2f}",
                accuracy_top5=f"{100 * accuracy_top5_tracker.average:05.2f}",
            )

    # Add the final values to the tensorboard summary writer
    # (the mean confidence is tracked and returned but not logged)
    tensorboard_writer.add_scalar("loss", loss_tracker.average, num_iteration)
    tensorboard_writer.add_scalar(
        "accuracy_top1", accuracy_top1_tracker.average, num_iteration
    )
    tensorboard_writer.add_scalar(
        "accuracy_top5", accuracy_top5_tracker.average, num_iteration
    )

    tensorboard_writer.add_figure(
        "confusion_matrix", confusion_matrix_tracker.plot_confusion_matrix(normalize=True), num_iteration
    )
    tensorboard_writer.flush()

    return (
        loss_tracker.average,
        accuracy_top1_tracker.average,
        accuracy_top5_tracker.average,
        confidence_tracker.average,
    )

In [None]:
def save_checkpoint(
    current_epoch: int,
    num_iteration: int,
    best_accuracy: float,
    model_state_dict: dict,
    optimizer_state_dict: dict,
    is_best: bool,
    experiment_path: str,
    checkpoint_filename: str = "checkpoint.pth.tar",
    best_filename: str = "model_best.pth.tar",
):
    """Write the training state to disk, optionally promoting it to "best model".

    The state is always serialized to ``experiment_path/checkpoint_filename``.
    When ``is_best`` is true, that file is additionally copied to
    ``experiment_path/best_filename``.

    Args:
        current_epoch (int): epoch that has just finished.
        num_iteration (int): iterations run since the beginning of the training.
        best_accuracy (float): best accuracy reached so far.
        model_state_dict (dict): state of the model.
        optimizer_state_dict (dict): state of the optimizer.
        is_best (bool): whether this checkpoint is the new best model.
        experiment_path (str): directory receiving the checkpoint files.
        checkpoint_filename (str: "checkpoint.pth.tar"): name of the rolling checkpoint file.
        best_filename (str: "model_best.pth.tar"): name of the best-model file.
    """
    # Announce what is about to be written
    if is_best:
        best_note = f" and new best model (best accuracy: {100 * best_accuracy:05.2f})"
    else:
        best_note = ""
    print(f"Saving checkpoint{best_note}...")

    # Serialize the full training state into the rolling checkpoint file
    target_path = os.path.join(experiment_path, checkpoint_filename)
    training_state = {
        "epoch": current_epoch,
        "num_iteration": num_iteration,
        "best_accuracy": best_accuracy,
        "model_state_dict": model_state_dict,
        "optimizer_state_dict": optimizer_state_dict,
    }
    torch.save(training_state, target_path)

    if not is_best:
        return

    # Keep a separate copy of the best-performing checkpoint
    best_path = os.path.join(experiment_path, best_filename)
    shutil.copyfile(target_path, best_path)


## Train - Test Pipeline ResNet

In [None]:
# --- MODEL TRAINING & TESTING ---
# Trains for up to `num_epochs` epochs, validating after each one, keeping a
# rolling checkpoint plus a copy of the best model (by top-1 validation accuracy).
start_num_iteration = 0
start_epoch = 0
best_accuracy = 0.0
epochs_without_improvement = 0
purge_step = None
num_epochs = 30
patience = -1  # a negative value disables early stopping

# Restore the last checkpoint if available
checkpoint_filepath = os.path.join(experiment_path, "checkpoint.pth.tar")
if os.path.exists(checkpoint_filepath):
    print(f"Restoring last checkpoint from {checkpoint_filepath}...")
    # Load onto the CPU first so a checkpoint written on a GPU machine can also be
    # restored without CUDA; load_state_dict copies into the existing parameters.
    checkpoint = torch.load(checkpoint_filepath, map_location="cpu")
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    start_epoch = checkpoint["epoch"] + 1
    start_num_iteration = checkpoint["num_iteration"] + 1
    best_accuracy = checkpoint["best_accuracy"]
    # Drop tensorboard events logged after the restored iteration
    purge_step = start_num_iteration
    print(
        f"Last checkpoint restored. Starting at epoch {start_epoch + 1} with best accuracy at {100 * best_accuracy:05.3f}."
    )

# Create the tensorboard summary writers for training, validation and test steps
train_writer = SummaryWriter(
    os.path.join(experiment_path, "train"), purge_step=purge_step
)
valid_writer = SummaryWriter(
    os.path.join(experiment_path, "valid"), purge_step=purge_step
)
test_writer = SummaryWriter(
    os.path.join(experiment_path, "test"), purge_step=purge_step
)

# main training loop
num_iteration = start_num_iteration
for epoch in range(start_epoch, num_epochs):

    # --- TRAIN ---

    num_iteration, _, _, _, _ = train(
        model=model,
        classes=classes,
        data_loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        epoch=epoch,
        num_iteration=num_iteration,
        use_cuda=use_cuda,
        tensorboard_writer=train_writer
    )

    # --- VALID ---

    is_best = False

    _, valid_accuracy_top1, _, _ = test(
        model=model,
        classes=classes,
        data_loader=val_loader,
        criterion=criterion,
        epoch=epoch,
        num_iteration=num_iteration,
        use_cuda=use_cuda,
        tensorboard_writer=valid_writer,
        name_step="Valid",
    )

    if valid_accuracy_top1 > best_accuracy:
        # New best model: remember it and restart the early-stopping counter
        is_best = True
        best_accuracy = valid_accuracy_top1
        epochs_without_improvement = 0
    else:
        # Count the current epoch before checking, so the reported number of
        # non-improving epochs is accurate. Training still stops on the same
        # epoch as before (once more than `patience` epochs brought no gain).
        epochs_without_improvement += 1

        # Early stopping (leaves the loop without writing a checkpoint for this epoch)
        if (patience >= 0) and (epochs_without_improvement > patience):
            print(
                f"No improvement for the last {epochs_without_improvement} epochs, stopping the training (best accuracy: {100 * best_accuracy:05.2f})."
            )
            break

    save_checkpoint(
        current_epoch=epoch,
        num_iteration=num_iteration,
        best_accuracy=best_accuracy,
        model_state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
        is_best=is_best,
        experiment_path=experiment_path,
    )

    # increment num_iteration after evaluation for the next epoch of training
    num_iteration += 1

/home/one/Development/AdaptiveCostSensitiveClassification/FMNIST/resnet/experiment/220604-160336_vgg16

In [None]:
# --- TEST ---
# Evaluates the best saved model on the test set. If no best-model file exists,
# the model, `epoch` and `num_iteration` left by the training cell are used as-is.

# Restore the best model to test it
best_model_filepath = os.path.join(experiment_path, "model_best.pth.tar")
if os.path.exists(best_model_filepath):
    print(f"Loading best model from {best_model_filepath}...")
    # Load onto the CPU first so a checkpoint written on a GPU machine can also be
    # restored without CUDA; load_state_dict copies into the existing parameters.
    checkpoint = torch.load(best_model_filepath, map_location="cpu")
    model.load_state_dict(checkpoint["model_state_dict"])
    best_accuracy = checkpoint["best_accuracy"]
    # Log the test metrics at the epoch/iteration the best model was saved
    epoch = checkpoint["epoch"]
    num_iteration = checkpoint["num_iteration"]

_, test_accuracy_top1, _, _ = test(
    model=model,
    classes=classes,
    data_loader=test_loader,
    criterion=criterion,
    epoch=epoch,
    num_iteration=num_iteration,
    use_cuda=use_cuda,
    tensorboard_writer=test_writer,
    name_step="Test",
)

# Print final accuracy of the best model on the test set. The model and dataset
# names come from the experiment configuration, so the message stays correct
# whichever combination is selected.
print(
    f"Best {experiment_model.name} model has an accuracy of {100 * test_accuracy_top1:05.2f} on the {experiment_dataset.name} test set."
)

## TENSORBOARD

In [None]:
# Load the TensorBoard notebook extension
%reload_ext tensorboard

# One TensorBoard launch per recorded experiment directory.
# The text after the trailing `--` on each line reads as a human label,
# apparently "model - class-imbalance variant - dataset".
# NOTE(review): that label is not comment syntax, so it is presumably handed to
# the tensorboard command line as extra arguments — confirm it is accepted there.
%tensorboard --logdir {"experiment/220604-111621_mlp"}  -- MLP - Orig - CIFAR10
%tensorboard --logdir {"experiment/220604-134904_resnet18"} -- ResNet - Orig - FMNIST
%tensorboard --logdir {"experiment/220604-160336_vgg16"} -- VGG16 - Orig - IntelImage


%tensorboard --logdir {"experiment/220604-194320_resnet18"} -- ResNet - 70 - FMNIST 
%tensorboard --logdir {"experiment/220604-203020_resnet18"} -- ResNet - 90 - FMNIST 

%tensorboard --logdir {"experiment/220604-204100_resnet18"} -- ResNet - Orig - CIFAR10 

 

In [None]:
 

# Open TensorBoard on the logs of the 220604-204100_resnet18 experiment directory
%tensorboard --logdir {"experiment/220604-204100_resnet18"}
 

In [None]:
# Housekeeping: show which TensorBoard instances are currently open
from tensorboard import notebook
notebook.list() # View open TensorBoard instances


Epoch 23/30 Test: 100%|██████████| 40/40 [00:01<00:00, 31.11it/s, accuracy_top1=89.27, accuracy_top5=99.72, loss=0.40885]
Best ResNet18 model has an accuracy of 89.27 on the Fashion MNIST test set.

# END