In [None]:
import os
import pprint
import time

import matplotlib.pyplot as plt
import numpy as np
from cv2 import cvtColor, COLOR_RGB2GRAY
from keras.datasets import cifar10
from matplotlib.animation import FuncAnimation
from scipy.interpolate import make_interp_spline

# preprocessing functions

In [None]:
def load_dataset():
    """Load CIFAR-10 and build a mapping from numeric label to class name.

    Returns:
        tuple: ``([train_x, train_y], [test_x, test_y], class_dict)`` where
        ``class_dict`` maps every distinct label found in ``train_y`` to its
        description.
    """
    # fetch both splits of the cifar10 dataset
    train_split, test_split = cifar10.load_data()
    train_x, train_y = train_split
    test_x, test_y = test_split

    # human readable descriptions, listed in the order of the sorted numeric labels
    descriptions = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']

    # np.unique returns the distinct labels in ascending order
    numeric_labels = list(np.unique(train_y))

    # pair every numeric label with its description
    class_dict = {label: name for label, name in zip(numeric_labels, descriptions)}

    return [train_x, train_y], [test_x, test_y], class_dict


def process_data(data):
    """Convert images to flat, scaled grayscale vectors and flatten the labels.

    Args:
        data: two-element sequence ``[images, labels]``; images are passed one
            by one to ``cvtColor(..., COLOR_RGB2GRAY)``, labels must offer
            ``reshape``.

    Returns:
        list: ``[images_flat, labels_flat]`` with one row per image and a
        one-dimensional label array.
    """
    images = data[0]
    labels = data[1]

    # grayscale conversion, one image at a time (drops the colour channel axis)
    gray = np.asarray([cvtColor(picture, COLOR_RGB2GRAY) for picture in images])

    # scale pixel values (0-255 --> 0-1), then flatten every image into one row
    flat_images = (gray/255.0).reshape(len(images), -1)

    # collapse the labels into a one-dimensional array
    flat_labels = labels.reshape(-1)

    return [flat_images, flat_labels]


def split_train_data(data, proportion):
    """Split a dataset into a leading "overlap" part and a trailing "unique" part.

    Args:
        data: two-element sequence ``[images, labels]``; both must be sliceable.
        proportion: share of the samples (strictly between 0 and 1) that goes
            into the overlap part.

    Returns:
        tuple: ``([unique_x, unique_y], [overlap_x, overlap_y])``.

    Raises:
        AssertionError: if ``proportion`` is not strictly between 0 and 1.
    """
    # the proportion has to lie strictly between 0 and 1
    assert 0.0 < proportion < 1.0, f'Invalid proportion: {proportion}. It must be between 0 and 1'

    images = data[0]
    labels = data[1]

    # index of the first sample that belongs to the unique part
    cut = int(len(images)*proportion)

    overlap_part = [images[:cut], labels[:cut]]
    unique_part = [images[cut:], labels[cut:]]

    return unique_part, overlap_part


def get_processed_data(proportion:float=0.2, debug:bool=True):
    """Load the dataset, preprocess it and split the train data for the ensemble.

    Runs ``load_dataset`` -> ``process_data`` (train and test) ->
    ``split_train_data`` (train only) and optionally prints the array shapes
    after each step.

    Args:
        proportion: share of the processed train data that goes into the
            "overlap" subset; forwarded to ``split_train_data``.
        debug: if True, print the shapes after every step and the class
            dictionary.

    Returns:
        tuple: ``(train, unique, overlap, test, class_dict)`` where ``train``
        and ``test`` are the processed ``[images, labels]`` lists and
        ``unique`` / ``overlap`` are the two parts of ``train``.
    """
    # load dataset and get class dictionary
    train, test, class_dict = load_dataset()

    if debug:
        print('1. load CIFAR-10 dataset:\n')
        print(f'train:\t{train[0].shape}, {train[1].shape}')
        print(f'test:\t{test[0].shape}, {test[1].shape}\n')
        pprint.PrettyPrinter(depth=1).pprint(class_dict)

    # process train and test data
    train = process_data(train)
    test = process_data(test)

    if debug:
        print('\n2. preprocess dataset:\n')
        print(f'train:\t{train[0].shape}, {train[1].shape}')
        # report images AND labels of the test split (labels are element 1)
        print(f'test:\t{test[0].shape}, {test[1].shape}')

    # split train data into "unique" and "overlapping" subsets
    unique, overlap = split_train_data(train, proportion)

    if debug:
        print('\n3. prepare train data for ensemble:\n')
        print(f'unique:\t{unique[0].shape}, {unique[1].shape}')
        print(f'overlap:{overlap[0].shape}, {overlap[1].shape}')

    return train, unique, overlap, test, class_dict

# ensemble class

In [None]:
class MyEnsembleClassifier:
    """Ensemble of base classifiers, each trained on its own subset of the data.

    Every member receives a sequential, non-overlapping slice of the "unique"
    data plus a random draw from the shared "overlap" data. The ensemble
    prediction for a sample is the label with the largest share of member
    votes; that share is reported as the ensemble confidence.

    Base classifiers must provide ``fit(x, y)``, ``predict(x)`` and
    ``score(x, y)``.
    """

    def __init__(self, base_classifier, n_classifier, class_dict):
        """Create ``n_classifier`` members by calling ``base_classifier()``.

        Args:
            base_classifier: zero-argument callable returning a new classifier.
            n_classifier: number of ensemble members.
            class_dict: mapping from label to class description; its keys
                define the set and order of labels used for vote counting.
        """
        self.clf_n = n_classifier

        self.base_clf = base_classifier # function that returns a base classifier

        # initialize list of n base classifiers
        self.clf = [self.base_clf() for _ in range(self.clf_n)]

        # describe the ensemble with an existing member; only an empty ensemble
        # needs a throwaway instance for the message
        description = self.clf[0] if self.clf else self.base_clf()
        print(f'initialized ensemble with {self.clf_n} {description}')

        # store information about data set
        self.class_dict = class_dict
        self.class_y = list(self.class_dict.keys())
        self.class_n = len(self.class_y)

        # one [images, labels] pair per member, filled by create_subsets()
        self.subsets = []


    def create_subsets(self, unique, overlap, overlap_dict):
        """Build one training subset per member and store them in ``self.subsets``.

        Args:
            unique: ``[images, labels]``; cut into ``clf_n`` equally sized,
                consecutive slices (samples beyond ``clf_n * slice_length``
                are not used).
            overlap: ``[images, labels]`` arrays; every member additionally
                gets a random draw (with replacement) from this data.
            overlap_dict: mapping from ensemble size to the number of overlap
                samples per member; looked up with ``self.clf_n``.
        """
        # extract images and labels from unique / overlapping dataset
        unique_x, unique_y = unique[0], unique[1]
        overlap_x, overlap_y = overlap[0], overlap[1]

        # length of one unique slice, needed for sequential sampling of unique dataset
        los = len(unique_x) // self.clf_n

        # number of overlap samples per member (identical for every member)
        n_overlap = overlap_dict[self.clf_n]

        subsets = []

        # create a subset for each classifier
        for n in range(self.clf_n):

            # lower and upper boundaries for slicing the dataset
            lower, upper = n*los, (n+1)*los

            # random sample indices into the overlapping dataset (low, high, number of samples)
            rs = np.random.randint(0, len(overlap_x), n_overlap)

            # stack the overlap images below the unique slice; concatenating along
            # axis 0 keeps the per-sample shape without a flatten-and-reshape round trip
            subset_x = np.concatenate((unique_x[lower:upper], overlap_x[rs]), axis=0)

            # np.append flattens its inputs, so the labels always end up one-dimensional
            subset_y = np.append(unique_y[lower:upper], overlap_y[rs])

            subsets.append([subset_x, subset_y])

        self.subsets = subsets

        print(f'created {self.clf_n} subsets, each containing the following:\n')
        print(f'images:\t{self.subsets[0][0].shape}')
        print(f'labels:\t{self.subsets[0][1].shape}')


    def train(self, proportion=1):
        """Fit every member on its subset and record the duration.

        Args:
            proportion: values below 1 restrict training to the leading share
                of every subset; primarily used during development to reduce
                training time.

        Raises:
            RuntimeError: if there are fewer subsets than classifiers, i.e.
                ``create_subsets`` has not been called yet.

        Side effects:
            Sets ``self.train_duration`` (seconds, rounded to 2 decimals).
        """
        if len(self.subsets) < len(self.clf):
            raise RuntimeError('create_subsets() must be called before train()')

        print('train each classifier on their corresponding subset:\n')

        # perf_counter is monotonic and therefore suited for measuring durations
        start = time.perf_counter()

        # train each classifier on their corresponding subset
        for n, clf in enumerate(self.clf):

            print(f'{n+1}. classifier')

            train_data_x = self.subsets[n][0]
            train_data_y = self.subsets[n][1]

            # proportion below 1 reduces the size of the dataset
            if proportion < 1:
                train_samples = int(len(train_data_x)*proportion)
                train_data_x = train_data_x[:train_samples]
                train_data_y = train_data_y[:train_samples]

            clf.fit(train_data_x, train_data_y)

        self.train_duration = round(time.perf_counter() - start, 2)
        print(f'\ntraining duration: {self.train_duration} s')


    def get_accuracy_score(self, images, labels):
        """Return the mean of the members' individual ``score(images, labels)``.

        Note: this averages the members' scores; it is not the score of the
        voted ensemble prediction.
        """
        # get accuracy score for each member
        clf_scores = [clf.score(images, labels) for clf in self.clf]

        # average all classifier scores to one value
        return np.asarray(clf_scores).mean()


    def get_predictions(self, images, labels):
        """Collect every member's predictions for ``images``.

        Args:
            images: samples passed to each member's ``predict``.
            labels: unused; kept so the signature matches the other
                evaluation methods.

        Returns:
            numpy.ndarray: transposed prediction matrix, i.e. one row per
            image and one column per classifier.
        """
        # iterate over all classifiers to get their predictions for images
        predictions = [clf.predict(images) for clf in self.clf]

        # convert and transpose predictions (N_classifier, N_images) --> (N_images, N_classifier)
        return np.asarray(predictions).T


    def get_relative_predictions(self, predictions):
        """Turn member votes into per-label vote shares.

        Args:
            predictions: iterable with one array of member predictions per
                image (as returned by ``get_predictions``).

        Returns:
            numpy.ndarray: shape ``(N_images, N_classes)``; column ``j`` holds
            the share of members that voted for ``self.class_y[j]``.
        """
        # for every image, count the votes per label and divide by the ensemble size
        relative_predictions = [[np.count_nonzero(pred == y)/self.clf_n for y in self.class_y] for pred in predictions]

        # convert and reshape to (N_images, N_classes)
        return np.asarray(relative_predictions).reshape(-1, self.class_n)


    def get_overall_predictions(self, relative_predictions):
        """Derive the ensemble prediction and its confidence per image.

        Args:
            relative_predictions: per-image vote shares as returned by
                ``get_relative_predictions``.

        Returns:
            tuple: ``(ensemble_predictions, ensemble_prediction_confidences)``.
            The prediction is the argmax position within the vote shares (the
            first position wins on ties); the confidence is the largest share.
        """
        # position of the highest vote share <-- ensemble prediction for image_i
        # NOTE: this position equals the label only if the keys of class_dict
        # are 0..N_classes-1 in ascending order
        ensemble_predictions = np.asarray([np.argmax(pred) for pred in relative_predictions])

        # get prediction confidence of the ensemble for image_i
        ensemble_prediction_confidences =  np.asarray([np.max(pred) for pred in relative_predictions])

        return ensemble_predictions, ensemble_prediction_confidences


    def evaluate(self, images, labels):
        """Run the full evaluation pipeline on ``images`` / ``labels``.

        Returns:
            tuple: ``(acc, pred, relative_pred, ensemble_pred,
            ensemble_pred_confi)`` - mean member score, member predictions,
            vote shares, ensemble predictions and ensemble confidences.
        """
        # call methods to get needed information for ensemble evaluation
        acc = self.get_accuracy_score(images, labels)
        pred = self.get_predictions(images, labels)
        relative_pred = self.get_relative_predictions(pred)
        ensemble_pred, ensemble_pred_confi = self.get_overall_predictions(relative_pred)

        return acc, pred, relative_pred, ensemble_pred, ensemble_pred_confi

# plotting functions

In [None]:
# global matplotlib look: default style with a thick, grey axes frame
plt.style.use('default')
plt.rcParams.update({
    'axes.linewidth': 2,
    'axes.edgecolor': '#808080',
})

def interpolate_data(x, y):
    """Interpolate ``(x, y)`` with a cubic spline on a dense grid.

    Args:
        x: array of sample positions (must offer ``min()`` / ``max()``).
        y: values at ``x``.

    Returns:
        list: ``[x_new, y_new]`` with 300 evenly spaced positions between the
        smallest and largest ``x`` and the spline evaluated there.
    """
    # dense, evenly spaced grid covering the whole x range
    dense_x = np.linspace(x.min(), x.max(), 300)

    # cubic (k=3) interpolating spline through the given points
    cubic = make_interp_spline(x, y, k=3)

    return [dense_x, cubic(dense_x)]


def plot_MRF(data, title):
    """Plot the M, R and F curves over the threshold and store data and figure.

    The raw data is written to ``./results/raw_data/{title}.csv`` and the
    figure to ``./results/{title}.png``; missing output directories are
    created.

    Args:
        data: array-like with one row per threshold and the columns
            ``M, R, F, x`` (in that order), where ``x`` is plotted as the
            threshold. Needs enough rows for a cubic spline fit.
            R and F are multiplied by M before plotting, so they are
            presumably given relative to M - TODO confirm against the caller.
        title: figure title, also used as file name for both outputs.
    """
    data = np.asarray(data)

    # make sure both output directories exist (creates ./results as well)
    os.makedirs('./results/raw_data', exist_ok=True)
    np.savetxt(f'./results/raw_data/{title}.csv', data, delimiter=',')

    # preprocess data for plotting
    M, R, F, x = data[:,0], data[:,1], data[:,2], data[:,3]
    # scale R and F by M, then convert everything to percent
    R, F = R*M*100, F*M*100
    M = M*100

    # smooth curves drawn behind the measured points
    M_x, M_y = interpolate_data(x, M)
    R_x, R_y = interpolate_data(x, R)
    F_x, F_y = interpolate_data(x, F)

    plt.figure(figsize=(12,6))
    plt.plot(x, M, 'bo--', linewidth=2, label='$M$')
    plt.plot(x, R, 'go--', linewidth=2, label='$R$')
    plt.plot(x, F, 'ro--', linewidth=2, label='$F$')
    plt.plot(M_x, M_y, 'b', linewidth=5, alpha=0.5)
    plt.plot(R_x, R_y, 'g', linewidth=5, alpha=0.5)
    plt.plot(F_x, F_y, 'r', linewidth=5, alpha=0.5)

    plt.xlim((-0.05, 1.05))
    plt.ylim((-5, 105))
    plt.xticks(np.arange(0, 1.1, 0.1))
    plt.yticks(np.arange(0, 110, 25))

    plt.title(f'{title}', fontsize=20)
    plt.xlabel('threshold', fontsize=16)
    plt.ylabel('classified images [%]', fontsize=16)
    plt.legend(fontsize=16)

    plt.grid()
    plt.savefig(f'./results/{title}.png', dpi=300)
    plt.show()

In [None]:
def draw_evaluation_process(predictions, certainties, label, overall_pred, class_dict, treshold, ax1, ax2):
    """Draw the member votes and the ensemble confidence for one image.

    Args:
        predictions: numpy array with one predicted label per classifier.
        certainties: vote share per class for this image (one value per class).
        label: true label of the image; must be a key of ``class_dict``.
        overall_pred: ensemble prediction for the image.
        class_dict: mapping from label to class description.
        treshold: confidence threshold drawn as a horizontal line.
        ax1: axes for the per-classifier votes.
        ax2: axes for the per-class confidence.
    """
    n_clf = len(predictions)
    n_classes = len(certainties)

    # plot classifiers predictions for image_i
    # members agreeing with the ensemble are green (ensemble correct) or red (ensemble wrong)
    color = 'green' if overall_pred == label else 'red'
    colors = [color if pred == overall_pred else 'lightgrey' for pred in predictions]
    # bars are shifted by one so that label 0 is still visible as a bar of height 1
    ax1.bar(np.arange(1, n_clf+1), predictions+1, color=colors)
    ax1.axhline(label+1, color='black', linestyle='--', alpha=0.3, label=f'true label: {class_dict[label]}')
    ax1.set_xlabel('classifier', fontsize=14)
    ax1.set_ylabel('labels', fontsize=16)
    x_ticks = np.arange(0, n_clf+5, 5)
    x_ticks[0] = 1
    ax1.set_xticks(x_ticks)
    # one tick per label plus an empty tick below and above (matches the shift by one)
    ax1.set_yticks(np.arange(len(class_dict)+2))
    y_labels = list(class_dict.keys())
    y_labels.insert(0, ''); y_labels.append('')
    ax1.set_yticklabels(y_labels)
    ax1.legend(loc='lower right', bbox_to_anchor=(1, 1), fontsize=10)

    # plot prediction certainties
    max_idx = np.argmax(certainties)
    # the winning class is blue while its confidence is below the threshold, black otherwise
    color = 'blue' if np.max(certainties) < treshold else 'black'
    # one colour per class bar (not per classifier), so the winner is always highlighted
    colors = [color if i == max_idx else 'lightgrey' for i in range(n_classes)]
    ax2.bar(np.arange(n_classes), certainties, color=colors)
    ax2.axhline(treshold, color='black', linestyle='--', alpha=0.3, label='treshold')
    ax2.set_xlabel('labels', fontsize=14)
    # NOTE(review): the axis shows fractions between 0 and 1 although the label says [%]
    ax2.set_ylabel('ensemble confidence [%]', fontsize=16)
    # tick count follows class_dict so it always matches the tick labels below
    ax2.set_xticks(np.arange(len(class_dict)))
    ax2.set_yticks(np.arange(0, 1.05, 0.1))
    ax2.set_ylim((0, 1.05))
    ax2.set_xticklabels(list(class_dict.values()))
    ax2.legend(loc='lower right', bbox_to_anchor=(1, 1), fontsize=10)
    plt.draw()
    

def animate_evaluation(n_pred, pred, relative_pred, labels, ensemble_pred, class_dict, treshold_range, title):
    """Animate the ensemble evaluation and save it as ``./results/{title}.gif``.

    One frame is rendered for every combination of image index (``0`` to
    ``n_pred - 1``) and threshold in ``treshold_range``. The output directory
    is created if it does not exist.

    Args:
        n_pred: number of images (taken from the start of the inputs) to animate.
        pred: member predictions, indexed by image.
        relative_pred: per-class vote shares, indexed by image.
        labels: true labels, indexed by image.
        ensemble_pred: ensemble predictions, indexed by image.
        class_dict: mapping from label to class description.
        treshold_range: iterable of thresholds shown for every image.
        title: file name (without extension) of the resulting GIF.
    """
    # create a figure and axes for the animation
    fig, (ax1, ax2) = plt.subplots(2, figsize=(15,9))

    # define the function that will be called for each frame of the animation
    def animate(frame):

        n, p = frame

        # start every frame from empty axes
        ax1.clear(); ax2.clear()
        ax1.set_title(f'Prediction {n+1}', fontsize=16, pad=-5)

        # draw votes and confidence of image n together with the current threshold p
        draw_evaluation_process(pred[n],
                                relative_pred[n],
                                labels[n],
                                ensemble_pred[n],
                                class_dict,
                                p, ax1, ax2)

    # create a list of (image index, threshold) tuples for the frames
    frames = [(n, p) for n in range(n_pred) for p in treshold_range]

    # create the animation
    ani = FuncAnimation(fig, animate, frames=frames, repeat=True)

    # make sure the output directory exists before writing
    os.makedirs('./results', exist_ok=True)

    # save the animation as a GIF using Pillow
    ani.save(f'./results/{title}.gif', writer="pillow")