# Classifier ablation studies

## Libraries and packages


In [None]:
# Install the pinned PyTorch / torchvision versions used by this notebook.
!pip3 install 'torch==1.4.0'
!pip3 install 'torchvision==0.5.0'
# Pillow-SIMD is a drop-in build of Pillow; it backs the `PIL` import below.
!pip3 install 'Pillow-SIMD'

In [None]:
import os
import urllib
import logging

import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
from torch.utils.data import Dataset, Subset, DataLoader, ConcatDataset
from torch.backends import cudnn

import torchvision
from torchvision import transforms

from PIL import Image
from copy import deepcopy

import numpy as np

from sklearn.metrics import confusion_matrix

In [None]:
# Download packages from repository
# 1. Clone the project repository.
# 2. Move its contents into the working directory so that the `data`, `model`
#    and `utils` imports below resolve.
# 3. Remove the emptied clone directory and the README that was moved along.
!git clone https://github.com/manuelemacchia/incremental-learning-image-classification.git
!mv -v incremental-learning-image-classification/* .
!rm -rf incremental-learning-image-classification README.md

from data.cifar100 import Cifar100
from model.resnet_cifar import resnet32
from model.manager import Manager
from model.lwf import LWF
from model.icarl import Exemplars
from model.icarl import iCaRL
from utils import plot

## Arguments

In [None]:
# Directories
DATA_DIR = 'data'       # Directory where the dataset will be downloaded

# Settings
DEVICE = 'cuda'         # Device string passed to every iCaRL variant below

# Dataset
RANDOM_STATES = [658, 423, 422]      # One random state per run (indexed by run_i), for reproducibility
                                     # Note: different random states give very different
                                     # splits and therefore very different results.

NUM_CLASSES = 100       # Total number of classes

VAL_SIZE = 0.1          # Proportion of validation set with respect to training set (between 0 and 1)

# Training
BATCH_SIZE = 64         # Batch size (iCaRL sets this to 128)
LR = 2                  # Initial learning rate

MOMENTUM = 0.9          # Momentum for stochastic gradient descent (SGD)
WEIGHT_DECAY = 1e-5     # Weight decay from iCaRL

NUM_RUNS = 3            # Number of runs of every method
                        # Note: this should be at least 3 to have a fair benchmark,
                        # and RANDOM_STATES must provide one entry per run

NUM_EPOCHS = 70         # Total number of training epochs
MILESTONES = [49, 63]   # Step down policy from iCaRL (MultiStepLR)
                        # Decrease the learning rate by gamma at each milestone
GAMMA = 0.2             # Gamma factor from iCaRL

## Data preparation

In [None]:
# Preprocessing pipelines handed to every iCaRL variant in this notebook.
# Both end with the same per-channel normalisation (mean 0.5, std 0.5);
# only the training pipeline applies random augmentation first.
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),  # Turn PIL Image to torch.Tensor
    transforms.Normalize(mean=(0.5,) * 3, std=(0.5,) * 3),
])

test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,) * 3, std=(0.5,) * 3),
])

In [None]:
# Build, for every run and every incremental split, the train / validation /
# test datasets consumed by the experiments below.
#
# The loop used to be bounded by CLASS_BATCH_SIZE, a name that is never
# defined in this notebook (NameError). The number of splits is derived here
# instead: 10 classes per split, matching the `range(10)` used by every
# training loop below.
CLASSES_PER_SPLIT = 10
NUM_SPLITS = NUM_CLASSES // CLASSES_PER_SPLIT

# One list of per-split datasets for each run: subsets[run_i][split_i]
train_subsets = [[] for _ in range(NUM_RUNS)]
val_subsets = [[] for _ in range(NUM_RUNS)]
test_subsets = [[] for _ in range(NUM_RUNS)]

for run_i in range(NUM_RUNS):
    for split_i in range(NUM_SPLITS):
        # Download dataset only at first instantiation
        download = (run_i == 0 and split_i == 0)

        # Create CIFAR100 dataset
        train_dataset = Cifar100(DATA_DIR, train=True, download=download, random_state=RANDOM_STATES[run_i], transform=train_transform)
        test_dataset = Cifar100(DATA_DIR, train=False, download=False, random_state=RANDOM_STATES[run_i], transform=test_transform)

        # Training data: only the classes of the current split.
        train_dataset.set_classes_batch(train_dataset.batch_splits[split_i])
        # Test data: the classes of every split seen so far (0..split_i).
        test_dataset.set_classes_batch([test_dataset.batch_splits[i] for i in range(split_i + 1)])

        # Define train and validation indices
        train_indices, val_indices = train_dataset.train_val_split(VAL_SIZE, RANDOM_STATES[run_i])

        # Define subsets (train and validation share the same underlying dataset)
        train_subsets[run_i].append(Subset(train_dataset, train_indices))
        val_subsets[run_i].append(Subset(train_dataset, val_indices))
        test_subsets[run_i].append(test_dataset)

## K-nearest neighbors

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
from sklearn.model_selection  import ParameterGrid
from copy import deepcopy

class iCaRLwithKNN(iCaRL):
    """iCaRL variant that classifies with a k-nearest-neighbours model.

    The network inherited from iCaRL is used as a feature extractor only.
    Feature vectors are L2-normalised, rescaled per feature with a
    MinMaxScaler fitted on the fitting data, and fed to a scikit-learn
    KNeighborsClassifier whose hyperparameters are chosen on a validation set.
    """

    def _normalized_features(self, dataset):
        """Return (features, labels) of dataset as NumPy arrays.

        Every feature vector is divided by its L2 norm.
        """
        X, y = self.dataset_to_numpy(dataset)

        # The whole dataset is passed through the network as a single batch.
        features = self.extract_features(torch.tensor(X, dtype=torch.float))
        # Row-wise L2 normalisation; features are expected to be (N, D).
        features = features / features.norm(p=2, dim=1, keepdim=True)

        return features.detach().to('cpu').numpy(), y

    def classifier_fit(self, train_dataset, val_dataset, params, only_exemplars=False):
        """Fit the classifier and select its hyperparameters on val_dataset.

        Args:
            train_dataset: training set of the latest split
            val_dataset: dataset on which every parameter combination is scored
            params: parameter grid, in the format accepted by ParameterGrid
            only_exemplars: if True fit on the stored exemplars only,
                otherwise on the union of exemplars and train_dataset
        Returns:
            the parameter combination with the highest validation accuracy
        Raises:
            ValueError: if params yields no parameter combination
        """

        exemplars_dataset = Exemplars(self.exemplars, self.train_transform)
        if only_exemplars:
            fit_dataset = exemplars_dataset
        else:
            # Union of training dataset and exemplars
            fit_dataset = ConcatDataset([exemplars_dataset, train_dataset])

        X_features, y = self._normalized_features(fit_dataset)

        # Scale every feature to the range [0, 1]; the fitted scaler is kept
        # on the instance so that classifier_predict applies the same scaling.
        self.scaler = MinMaxScaler()
        X_features = self.scaler.fit_transform(X_features)

        X_val_features, y_val = self._normalized_features(val_dataset)
        X_val_features = self.scaler.transform(X_val_features)

        # Start below any attainable accuracy so that a candidate is always
        # retained, even when every combination scores 0.
        best_clf, best_grid, best_score = None, None, -1.0

        for grid in ParameterGrid(params):
            clf = KNeighborsClassifier(**grid)
            clf.fit(X_features, y)
            score = accuracy_score(y_val, clf.predict(X_val_features))

            if score > best_score:
                best_clf, best_grid, best_score = clf, grid, score

        if best_clf is None:
            raise ValueError("params does not define any parameter combination")

        # Set the classifier to the best clf found in validation
        self.clf = best_clf

        print(f"Best classifier: {best_grid} with score {best_score}")

        return best_grid

    def classifier_predict(self, test_dataset):
        """Predict labels of test_dataset with the fitted classifier.

        classifier_fit must have been called first: it sets self.scaler and
        self.clf.

        Returns:
            (y_test, y_pred): ground-truth and predicted labels
        """

        X_test_features, y_test = self._normalized_features(test_dataset)
        X_test_features = self.scaler.transform(X_test_features)

        return y_test, self.clf.predict(X_test_features)

    def dataset_to_numpy(self, dataset, batch_size=256):
        """Materialise dataset as NumPy arrays.

        Args:
            dataset: dataset yielding (image, label) pairs; images must be
                3x32x32 tensors, the shape the output array is allocated with
            batch_size: number of samples collated per step (affects speed
                only, not the result)
        Returns:
            (X, y): array of images with shape (len(dataset), 3, 32, 32) and
            integer array of labels
        """
        # Preallocate arrays
        X = np.zeros((len(dataset), 3, 32, 32))
        y = np.zeros(len(dataset), dtype=int)

        start = 0
        for images, labels in DataLoader(dataset, batch_size=batch_size):
            stop = start + images.size(0)
            X[start:stop] = images.numpy()
            y[start:stop] = labels.numpy()
            start = stop

        return X, y

    def test_knn(self, test_dataset, train_dataset, val_dataset, params, only_exemplars=False):
        """Fit the KNN classifier and test it.

        Args:
            test_dataset: dataset on which to test the classifier
            train_dataset: training set used to train the last split
            val_dataset: validation set used for hyperparameter tuning
            params: parameter grid on which to perform hyperparameter tuning
            only_exemplars: if True fit on exemplars only, otherwise on
                exemplars and train_dataset
        Returns:
            accuracy (float): accuracy of the classifier on the test set
            best_grid (dict): parameters selected on the validation set
            targets (torch.Tensor): ground-truth test labels
            preds (torch.Tensor): predicted test labels
        """

        # Set every network held by the model to evaluation mode
        self.net.train(False)
        if self.best_net is not None: self.best_net.train(False)
        if self.old_net is not None: self.old_net.train(False)

        with torch.no_grad():
            # Use val_dataset as validation set for hyperparameter tuning.
            best_grid = self.classifier_fit(train_dataset, val_dataset, params, only_exemplars)
            y_truth, y_pred = self.classifier_predict(test_dataset)
            accuracy = accuracy_score(y_truth, y_pred)

            if only_exemplars:
                print(f"Test accuracy (iCaRL with KNN only exemplars): {accuracy} ")
            else:
                print(f"Test accuracy (iCaRL with KNN all available data): {accuracy} ")

        return accuracy, best_grid, torch.tensor(y_truth), torch.tensor(y_pred)

In [None]:
# KNN ablation: for every run, train incrementally over the 10 splits and,
# after each split, evaluate the KNN classifier twice (exemplars only, then
# all available data). Results are stored in logs[run_i][split_i].
logs = [[] for _ in range(NUM_RUNS)]

params = {
    'n_neighbors': [3, 5, 7, 9],
    'weights': ['uniform', 'distance'],
}

# (only_exemplars flag, accuracy key, best-parameters key, confusion-matrix key)
knn_variants = (
    (True, 'exemplars_accuracy', 'exemplars_best_clf', 'exemplars_conf_mat'),
    (False, 'all_accuracy', 'all_best_clf', 'all_conf_mat'),
)

for run_i in range(NUM_RUNS):
    net = resnet32()
    icarl_knn = iCaRLwithKNN(DEVICE, net, LR, MOMENTUM, WEIGHT_DECAY, MILESTONES, GAMMA, NUM_EPOCHS, BATCH_SIZE, train_transform, test_transform)

    for split_i in range(10):
        print(f"## Split {split_i} of run {run_i} ##")

        train_split = train_subsets[run_i][split_i]
        val_split = val_subsets[run_i][split_i]
        test_split = test_subsets[run_i][split_i]

        icarl_knn.incremental_train(split_i, train_split, val_split)

        split_log = {}
        logs[run_i].append(split_log)

        for only_exemplars, acc_key, clf_key, conf_key in knn_variants:
            acc, best_clf, targets, preds = icarl_knn.test_knn(
                test_split, train_split, val_split, params, only_exemplars=only_exemplars)
            split_log[acc_key] = acc
            split_log[clf_key] = best_clf
            split_log[conf_key] = confusion_matrix(targets.to('cpu'), preds.to('cpu'))

## Cosine linear layer

In [None]:
from model.resnet_cifar import resnet32cosine
from model.resnet_cifar import CosineLayer

# Cosine similarity along dimension 1 of its two (same-shaped) inputs.
cos = nn.CosineSimilarity(dim=1, eps=1e-08)

class iCaRLwithCosine(iCaRL):
    """iCaRL variant built around cosine similarity.

    Samples are classified by the class mean with the highest cosine
    similarity (classify), and the network's final layer is a CosineLayer
    that can be evaluated directly (test_fc) and grown (increment_classes).
    """

    def classify(self, batch, train_dataset=None):
        """Mean of exemplars with cosine similarity classifier.

        Args:
            batch: batch of samples accepted by extract_features
            train_dataset: optional dataset of (sample, label) pairs for the
                latest 10 classes; when given, its samples contribute to the
                means of those classes. Samples are passed through
                self.test_transform, so they are presumably PIL images
                (TODO confirm against the caller).
        Returns:
            1-D tensor with, for every sample of the batch, the index of the
            most similar class mean
        """

        batch_features = self.extract_features(batch)
        # Normalize every sample feature representation (row-wise L2 norm)
        batch_features = batch_features / batch_features.norm(p=2, dim=1, keepdim=True)
        batch_features = batch_features.to(self.device) # (batch size, 64)

        if self.cached_means is None:
            print("Computing mean of exemplars... ", end="")
            # Assigned only once fully computed, so a failure midway does not
            # leave a partial cache behind.
            self.cached_means = self._compute_class_means(train_dataset)
            print("done")

        num_means = self.cached_means.size(0)
        batch_size = batch_features.size(0)

        # Bring both operands to the same shape (batch_size, 64, num_classes)
        # so that a single call of `cos` (which reduces dimension 1) scores
        # every sample against every class mean.
        expanded_features = batch_features.unsqueeze(2).expand(-1, -1, num_means)
        expanded_means = self.cached_means.t().unsqueeze(0).expand(batch_size, -1, -1)
        similarities = cos(expanded_features, expanded_means) # (batch_size, num_classes)

        return similarities.argmax(dim=1)

    def _compute_class_means(self, train_dataset=None):
        """Return the normalised mean feature vector of every known class.

        The result is a tensor on self.device with one row per class in
        self.exemplars.
        """
        classes_per_split = 10

        # Number of known classes
        num_classes = len(self.exemplars)
        first_new_class = num_classes - classes_per_split

        # Compute the means of classes with all the data available,
        # including training data which contains samples belonging to
        # the latest 10 classes. This will remove noise from the mean
        # estimate, improving the results.
        train_features_list = [[] for _ in range(classes_per_split)]
        if train_dataset is not None:
            for train_sample, label in train_dataset:
                features = self.extract_features(train_sample, batch=False, transform=self.test_transform)
                train_features_list[label % classes_per_split].append(features / features.norm())

        # Compute means of exemplars for all known classes
        class_means_list = []
        for y in range(num_classes):
            if train_dataset is not None and y >= first_new_class:
                features_list = train_features_list[y % classes_per_split]
            else:
                features_list = []

            for exemplar in self.exemplars[y]:
                features = self.extract_features(exemplar, batch=False, transform=self.test_transform)
                # Normalize the feature representation of the exemplar
                features_list.append(features / features.norm())

            class_mean = torch.stack(features_list).mean(dim=0)
            # Normalize the class mean
            class_means_list.append(class_mean / class_mean.norm())

        return torch.stack(class_means_list).to(self.device)

    def extract_features(self, sample, batch=True, transform=None):
        """Return the feature representation computed by the network.

        Args:
            sample: a batch tensor (batch=True) or a single image (batch=False)
            batch: whether sample is already a batch tensor
            transform: transform applied to a single image before it is fed
                to the network; required when batch is False
        Returns:
            features of the batch, or of the single image when batch is False
        """
        assert not (batch is False and transform is None), "if a PIL image is passed to extract_features, a transform must be defined"

        self.net.train(False)
        if self.best_net is not None: self.best_net.train(False)
        if self.old_net is not None: self.old_net.train(False)

        if batch is False: # Treat sample as single PIL image
            sample = transform(sample)
            sample = sample.unsqueeze(0) # Add the batch dimension, https://stackoverflow.com/a/59566009/6486336

        sample = sample.to(self.device)

        # self.VALIDATE selects the network used for inference
        if self.VALIDATE:
            features = self.best_net(sample, features=True)
        else:
            features = self.net(sample, features=True)

        if batch is False:
            features = features[0] # Drop the batch dimension added above

        return features

    def test_fc(self, test_dataset):
        """Test the network using the outputs of its final layer.

        Args:
            test_dataset: dataset on which to test the network
        Returns:
            accuracy (float): fraction of correctly classified samples
            all_targets (torch.Tensor): ground-truth labels
            all_preds (torch.Tensor): predicted labels
        """
        # Set every network held by the model to evaluation mode
        self.net.train(False)
        if self.best_net is not None: self.best_net.train(False)
        if self.old_net is not None: self.old_net.train(False)

        # Evaluation does not need shuffling; a fixed order makes the
        # returned targets and predictions reproducible.
        self.test_dataloader = DataLoader(test_dataset, batch_size=self.BATCH_SIZE, shuffle=False, num_workers=4)

        model = self.best_net if self.VALIDATE else self.net

        running_corrects = 0
        total = 0
        target_chunks = []
        pred_chunks = []

        with torch.no_grad():
            for images, labels in self.test_dataloader:
                images = images.to(self.device)
                labels = labels.to(self.device)
                total += labels.size(0)

                # Forward pass and predictions
                outputs = model(images)
                _, preds = torch.max(outputs, 1)

                # Update corrects
                running_corrects += torch.sum(preds == labels).item()

                target_chunks.append(labels)
                pred_chunks.append(preds)

        # Concatenate once at the end instead of once per batch
        if target_chunks:
            all_targets = torch.cat(target_chunks, dim=0)
            all_preds = torch.cat(pred_chunks, dim=0)
        else:
            all_targets = torch.tensor([], dtype=torch.long)
            all_preds = torch.tensor([], dtype=torch.long)

        # Calculate accuracy
        accuracy = running_corrects / float(total)

        print(f"Test accuracy (Cosine): {accuracy}")

        return accuracy, all_targets, all_preds

    def increment_classes(self, n=10):
        """Add n classes in the final cosine layer.

        The layer is replaced by a new CosineLayer with n more outputs; the
        weights of the existing outputs and the value of eta are carried over.
        """

        in_features = self.net.fc.in_features  # size of each input sample
        out_features = self.net.fc.out_features  # size of each output sample
        weight = self.net.fc.weight.data
        eta = self.net.fc.eta.data

        self.net.fc = CosineLayer(in_features, out_features+n)
        self.net.fc.weight.data[:out_features] = weight
        self.net.fc.eta.data = eta

In [None]:
# Initial learning rate for the cosine-layer runs below
# (same value as the LR assigned in the Arguments cell).
LR = 2

In [None]:
# Cosine ablation: for every run, train incrementally over the 10 splits and,
# after each split, evaluate both the cosine layer outputs and the cosine
# similarity classifier. Results are stored in logs[run_i][split_i].
logs = [[] for _ in range(NUM_RUNS)]

for run_i in range(NUM_RUNS):
    net = resnet32cosine()
    icarl_cosine = iCaRLwithCosine(DEVICE, net, LR, MOMENTUM, WEIGHT_DECAY, MILESTONES, GAMMA, NUM_EPOCHS, BATCH_SIZE, train_transform, test_transform)

    for split_i in range(10):
        print(f"## Split {split_i} of run {run_i} ##")

        train_split = train_subsets[run_i][split_i]
        val_split = val_subsets[run_i][split_i]
        test_split = test_subsets[run_i][split_i]

        icarl_cosine.incremental_train(split_i, train_split, val_split)

        split_log = {}
        logs[run_i].append(split_log)

        # Test Cosine layer classifier (only FC)
        acc, targets, preds = icarl_cosine.test_fc(test_split)
        split_log['cosine_fc_accuracy'] = acc
        split_log['cosine_fc_conf_mat'] = confusion_matrix(targets.to('cpu'), preds.to('cpu'))

        # Test Cosine similarity
        acc, targets, preds = icarl_cosine.test(test_split, train_split)
        split_log['cosine_sim_accuracy'] = acc
        split_log['cosine_sim_conf_mat'] = confusion_matrix(targets.to('cpu'), preds.to('cpu'))

## Random forest

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection  import ParameterGrid
from copy import deepcopy

class iCaRLwithRF(iCaRL):
    """iCaRL variant that classifies with a random forest.

    The network inherited from iCaRL is used as a feature extractor only.
    Feature vectors are L2-normalised and fed to a scikit-learn
    RandomForestClassifier whose hyperparameters are chosen on a validation
    set.
    """

    def _normalized_features(self, dataset):
        """Return (features, labels) of dataset as NumPy arrays.

        Every feature vector is divided by its L2 norm.
        """
        X, y = self.dataset_to_numpy(dataset)

        # The whole dataset is passed through the network as a single batch.
        features = self.extract_features(torch.tensor(X, dtype=torch.float))
        # Row-wise L2 normalisation; features are expected to be (N, D).
        features = features / features.norm(p=2, dim=1, keepdim=True)

        return features.detach().to('cpu').numpy(), y

    def classifier_fit(self, train_dataset, val_dataset, params, only_exemplars=False):
        """Fit the classifier and select its hyperparameters on val_dataset.

        Args:
            train_dataset: training set of the latest split
            val_dataset: dataset on which every parameter combination is scored
            params: parameter grid, in the format accepted by ParameterGrid
            only_exemplars: if True fit on the stored exemplars only,
                otherwise on the union of exemplars and train_dataset
        Returns:
            the parameter combination with the highest validation accuracy
        Raises:
            ValueError: if params yields no parameter combination
        """

        exemplars_dataset = Exemplars(self.exemplars, self.train_transform)
        if only_exemplars:
            fit_dataset = exemplars_dataset
        else:
            # Union of training dataset and exemplars
            fit_dataset = ConcatDataset([exemplars_dataset, train_dataset])

        X_features, y = self._normalized_features(fit_dataset)
        X_val_features, y_val = self._normalized_features(val_dataset)

        # Start below any attainable accuracy so that a candidate is always
        # retained, even when every combination scores 0.
        best_clf, best_grid, best_score = None, None, -1.0

        for grid in ParameterGrid(params):
            clf = RandomForestClassifier(**grid)
            clf.fit(X_features, y)
            score = accuracy_score(y_val, clf.predict(X_val_features))

            if score > best_score:
                best_clf, best_grid, best_score = clf, grid, score

        if best_clf is None:
            raise ValueError("params does not define any parameter combination")

        # Set the classifier to the best clf found in validation
        self.clf = best_clf

        print(f"Best classifier: {best_grid} with score {best_score}")

        return best_grid

    def classifier_predict(self, test_dataset):
        """Predict labels of test_dataset with the fitted classifier.

        classifier_fit must have been called first: it sets self.clf.

        Returns:
            (y_test, y_pred): ground-truth and predicted labels
        """

        X_test_features, y_test = self._normalized_features(test_dataset)

        return y_test, self.clf.predict(X_test_features)

    def dataset_to_numpy(self, dataset, batch_size=256):
        """Materialise dataset as NumPy arrays.

        Args:
            dataset: dataset yielding (image, label) pairs; images must be
                3x32x32 tensors, the shape the output array is allocated with
            batch_size: number of samples collated per step (affects speed
                only, not the result)
        Returns:
            (X, y): array of images with shape (len(dataset), 3, 32, 32) and
            integer array of labels
        """
        # Preallocate arrays
        X = np.zeros((len(dataset), 3, 32, 32))
        y = np.zeros(len(dataset), dtype=int)

        start = 0
        for images, labels in DataLoader(dataset, batch_size=batch_size):
            stop = start + images.size(0)
            X[start:stop] = images.numpy()
            y[start:stop] = labels.numpy()
            start = stop

        return X, y

    def test_rf(self, test_dataset, train_dataset, val_dataset, params, only_exemplars=False):
        """Fit the random forest classifier and test it.

        Args:
            test_dataset: dataset on which to test the classifier
            train_dataset: training set used to train the last split
            val_dataset: validation set used for hyperparameter tuning
            params: parameter grid on which to perform hyperparameter tuning
            only_exemplars: if True fit on exemplars only, otherwise on
                exemplars and train_dataset
        Returns:
            accuracy (float): accuracy of the classifier on the test set
            best_grid (dict): parameters selected on the validation set
            targets (torch.Tensor): ground-truth test labels
            preds (torch.Tensor): predicted test labels
        """

        # Set every network held by the model to evaluation mode
        self.net.train(False)
        if self.best_net is not None: self.best_net.train(False)
        if self.old_net is not None: self.old_net.train(False)

        with torch.no_grad():
            # Use val_dataset as validation set for hyperparameter tuning.
            best_grid = self.classifier_fit(train_dataset, val_dataset, params, only_exemplars)

            y_truth, y_pred = self.classifier_predict(test_dataset)
            accuracy = accuracy_score(y_truth, y_pred)

            if only_exemplars:
                print(f"Test accuracy (iCaRL with RF only exemplars): {accuracy} ")
            else:
                print(f"Test accuracy (iCaRL with RF all available data): {accuracy} ")

        return accuracy, best_grid, torch.tensor(y_truth), torch.tensor(y_pred)

In [None]:
# Random forest ablation: for every run, train incrementally over the 10
# splits and, after each split, evaluate the RF classifier twice (exemplars
# only, then all available data). Results are stored in logs[run_i][split_i].
logs = [[] for _ in range(NUM_RUNS)]

params = {
    "n_estimators": [100, 200, 500, 1000],
    "min_samples_split": [10, 20, 50],
}

# (only_exemplars flag, accuracy key, best-parameters key, confusion-matrix key)
rf_variants = (
    (True, 'exemplars_accuracy', 'exemplars_best_clf', 'exemplars_conf_mat'),
    (False, 'all_accuracy', 'all_best_clf', 'all_conf_mat'),
)

for run_i in range(NUM_RUNS):
    net = resnet32()
    icarl_rf = iCaRLwithRF(DEVICE, net, LR, MOMENTUM, WEIGHT_DECAY, MILESTONES, GAMMA, NUM_EPOCHS, BATCH_SIZE, train_transform, test_transform)

    for split_i in range(10):
        print(f"## Split {split_i} of run {run_i} ##")

        train_split = train_subsets[run_i][split_i]
        val_split = val_subsets[run_i][split_i]
        test_split = test_subsets[run_i][split_i]

        icarl_rf.incremental_train(split_i, train_split, val_split)

        split_log = {}
        logs[run_i].append(split_log)

        for only_exemplars, acc_key, clf_key, conf_key in rf_variants:
            acc, best_clf, targets, preds = icarl_rf.test_rf(
                test_split, train_split, val_split, params, only_exemplars=only_exemplars)
            split_log[acc_key] = acc
            split_log[clf_key] = best_clf
            split_log[conf_key] = confusion_matrix(targets.to('cpu'), preds.to('cpu'))