# **K-Fold Cross Validation**

In [1]:
# necessary imports
import os
import numpy
import pandas
import random
import trimesh
import logging
import itertools
import tensorflow
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import models, layers, regularizers, optimizers
from sklearn.model_selection import train_test_split, StratifiedKFold
from tensorflow.keras.layers import Dense, Activation, Flatten, Dropout, BatchNormalization
from tensorflow.keras.layers import Conv1D, MaxPooling1D, GlobalMaxPooling1D, Conv2D, MaxPooling2D

# increase matplotlib plots font size
plt.rcParams.update({'font.size': 15})

# dataset root path
DATASET_ROOT = '/run/media/rr/M2/DevOps/jupyter-lab/CIDL/dataset/'

# final preprocessed dataset directory path
DATASET_PATH = os.path.join(DATASET_ROOT, 'Preprocessed')

# directory where to save the best model for each fold
saved_models = 'saved_models'
images_models_save_dir = os.path.join(saved_models, 'images')
pointclouds_models_save_dir = os.path.join(saved_models, 'pointclouds')

# needed to create pointclouds dataset
class_labels_dict = {'table':0, 'chair':1, 'lamp':2, 'dresser':3, 'sofa':4}

In [2]:
# create saved_models directory if not present
if not os.path.isdir(saved_models):
    os.mkdir(saved_models)
    
if not os.path.isdir(images_models_save_dir):
    os.mkdir(images_models_save_dir)

if not os.path.isdir(pointclouds_models_save_dir):
    os.mkdir(pointclouds_models_save_dir)

In [3]:
# utility function to get model name
def get_model_name(model_name, fold_counter):
    return model_name + '-fold-' + str(fold_counter) + '.h5'

In [4]:
# plot loss and acuracy for each epoch
def plot_train_loss_accuracy(save_path, train_loss, train_accuracy, val_loss, val_accuracy):
    plt.figure(figsize=(10, 10))
    plt.plot(train_loss,'o-g', label="Training Set")
    plt.plot(val_loss,'o-r', label="Validation Set")
    plt.title('Training and Validation Sets Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epochs')
    plt.legend(loc="upper right")
    plt.savefig(save_path + '-train-val-loss.jpg', bbox_inches='tight', dpi=200)
    plt.show()
    plt.figure(figsize=(10, 10))
    plt.plot(train_accuracy,'o-g', label="Training Set")
    plt.plot(val_accuracy,'o-r', label="Validation Set")
    plt.title('Training and Validation Sets Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epochs')
    plt.legend(loc="lower right")
    plt.savefig(save_path + '-train-val-accuracy.jpg', bbox_inches='tight', dpi=200)
    plt.show()

In [5]:
# utility function used to plot confusion matrix
def plot_confusion_matrix(cm, classes, save_path, normalize=True, title='Confusion matrix', cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    plt.figure(figsize=(10,10))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    tick_marks = numpy.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, numpy.newaxis]
        cm = numpy.around(cm, decimals=2)
        cm[numpy.isnan(cm)] = 0.0
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')
    
    thresh = cm.max() / 2.

    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j], horizontalalignment="center", color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.savefig(save_path + '-confusion-matrix.jpg', bbox_inches='tight', dpi=200)
    plt.show()

In [6]:
# utility function to augment pointclouds data
def augment(points, label):
    # jitter points
    points += tensorflow.random.uniform(points.shape, -0.005, 0.005, dtype=tensorflow.float64)
    # shuffle points
    points = tensorflow.random.shuffle(points)
    return points, label

In [7]:
# only log critical messages
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)

In [8]:
# load images csv file
images_data = pandas.read_csv(os.path.join(DATASET_PATH, 'images.csv'))

# extract images path and class labels
images_X = images_data[['filename']]
images_Y = images_data[['class_label']]

# load pointclouds csv file
pointclouds_data = pandas.read_csv(os.path.join(DATASET_PATH, 'pointclouds.csv'))

# extract images path and class labels
pointclouds_X = pointclouds_data[['filename']]
pointclouds_Y = pointclouds_data[['class_label']]

## **Stratified K-Fold Cross Validation for images**

In [9]:
def images_kfold_validation(model_name, n_splits, test_size, shuffle, layers, learning_rate, decay, target_size, epochs, batch_size, one_fold=True, resample_data=0, augment=False):
    global images_data
    global images_X
    global images_Y
    if resample_data > 0:
        images_data = images_data.groupby('class_label', group_keys=False).apply(lambda x: x.sample(min(len(x), resample_data)))
        images_X = images_data[['filename']]
        images_Y = images_data[['class_label']]

    # fold counter
    fold_counter = 1

    # arrays to store test set loss and accuracy scores for each fold
    TEST_LOSS = []
    TEST_ACCURACY = []

    # split train and test dataset
    images_train, images_test = train_test_split(images_data, test_size=test_size, stratify=images_Y)
    images_train_X = images_train[['filename']]
    images_train_Y = images_train[['class_label']]

    # define stratified k fold cross validation parameters
    stratified_kfold = StratifiedKFold(n_splits=n_splits, shuffle=shuffle)

    # instanciate image generator with data augmentation if required
    if augment == True:
        image_data_generator = ImageDataGenerator(rotation_range=40, width_shift_range=0.01,
                                                  height_shift_range=0.01, horizontal_flip=True,
                                                  shear_range=20.0, fill_mode='nearest', rescale=1./255)
    else:
        image_data_generator = ImageDataGenerator(rescale=1./255)

    # test split
    test_data_generator  = image_data_generator.flow_from_dataframe(images_test, directory=None,
                                                        x_col="filename", y_col="class_label",
                                                        class_mode="categorical", shuffle=shuffle,
                                                        target_size=target_size, batch_size=batch_size)

    # generate training and validation folds
    for train_index, validation_index in stratified_kfold.split(images_train_X, images_train_Y):
        print("\n-------- STARTING FOLD: " + str(fold_counter) + " --------")

        # best model save path
        if not os.path.isdir(os.path.join(images_models_save_dir, model_name)):
            os.mkdir(os.path.join(images_models_save_dir, model_name))
        images_model_save_path = os.path.join(images_models_save_dir, model_name, get_model_name(model_name, fold_counter))

        # training and test folds indices
        training_data = images_train.iloc[train_index]
        validation_data = images_train.iloc[validation_index]
        train_data_generator = image_data_generator.flow_from_dataframe(training_data, directory=None,
                                                                        x_col="filename", y_col="class_label",
                                                                        class_mode="categorical", shuffle=shuffle,
                                                                        target_size=target_size, batch_size=batch_size)
        validation_data_generator = image_data_generator.flow_from_dataframe(validation_data, directory=None,
                                                                        x_col="filename", y_col="class_label",
                                                                        class_mode="categorical", shuffle=shuffle,
                                                                        target_size=target_size, batch_size=batch_size)

        # create best model checkpoint
        best_model_checkpoint = tensorflow.keras.callbacks.ModelCheckpoint(images_model_save_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1)
        #val_accuracy_early_stopping = tensorflow.keras.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', min_delta=0.01, patience=30, verbose=1)
        callbacks_list = [best_model_checkpoint]

        # define model to be trained and tested
        model = models.Sequential(name=model_name + "-" + str(fold_counter))
        for layer in layers:
            model.add(layer)
        model.summary()
        model.compile(tensorflow.keras.optimizers.RMSprop(learning_rate=learning_rate, decay=decay), loss="categorical_crossentropy", metrics=["accuracy"])

        # train model
        history = model.fit(train_data_generator,
                            epochs=epochs,
                            callbacks=callbacks_list,
                            validation_data=validation_data_generator)

        # plot loss and acuracy for each training/validation fold
        plot_train_loss_accuracy(images_model_save_path, history.history["loss"], history.history["accuracy"],
                                 history.history["val_loss"], history.history["val_accuracy"])

        # LOAD BEST MODEL to evaluate the performance of the model
        model.load_weights(images_model_save_path)

        # evaluate the model on the test set
        test_loss, test_accuracy = model.evaluate(test_data_generator, batch_size=batch_size)
        print("Best model Test Loss: " + str(test_loss))
        print("Best model Test Accuracy: " + str(test_accuracy))
        
        # store test set loss and accuracy scores for each fold
        TEST_LOSS.append(test_loss)
        TEST_ACCURACY.append(test_accuracy)

        # --- Report ---
        target_names = []
        for key in test_data_generator.class_indices:
            target_names.append(key)

        # Confution Matrix 
        Y_pred = model.predict(test_data_generator)
        y_pred = numpy.argmax(Y_pred, axis=1)
        print('Confusion Matrix')
        cm = confusion_matrix(test_data_generator.classes, y_pred)
        plot_confusion_matrix(cm, target_names, images_model_save_path, title='Confusion Matrix')

        # print Classification Report
        print('Classification Report')
        print(classification_report(test_data_generator.classes, y_pred, target_names=target_names))

        # clean up before next fold
        del model
        tensorflow.keras.backend.clear_session()
        print("\n-------- TERMINATED FOLD: " + str(fold_counter) + " --------")

        # if one fold execution was requested, terminate here
        if one_fold == True:
            return;
        else:
            fold_counter += 1

#### **Developer harness test for Stratified K-Fold Cross Validation for images**

In [10]:
# testing
#layers = [
#    Conv2D(128, (3, 3), input_shape=(128, 128, 3)),
#    Activation('relu'),
#    MaxPooling2D(pool_size=(8, 8)),
#    Conv2D(256, (3, 3)),
#    Activation('relu'),
#    MaxPooling2D(pool_size=(8, 8)),
#    Flatten(),
#    Dense(64),
#    Activation('relu'),
#    Dense(5, activation='softmax')
#]

# train, validate and test
#images_kfold_validation("images-testing", 6, 0.20, True, layers, 0.001, 1e-6, (128,128), 50, 32, 500)

## **Stratified K-Fold Cross Validation for pointclouds**

In [11]:
def pointclouds_kfold_validation(model_name, n_splits, test_size, shuffle, layers, learning_rate, decay, target_size, epochs, batch_size, one_fold=True, resample_data=0, augment=False):
    global pointclouds_data
    global pointclouds_X
    global pointclouds_Y
    if resample_data > 0:
        pointclouds_data = pointclouds_data.groupby('class_label', group_keys=False).apply(lambda x: x.sample(min(len(x), resample_data)))
        pointclouds_X = pointclouds_data[['filename']]
        pointclouds_Y = pointclouds_data[['class_label']]

    # fold counter
    fold_counter = 1
    
    # arrays to store test set loss and accuracy scores for each fold
    TEST_LOSS = []
    TEST_ACCURACY = []
    
    # split train and test dataset
    pointclouds_train, pointclouds_test = train_test_split(pointclouds_data, test_size=test_size, stratify=pointclouds_Y)
    pointclouds_train_X = pointclouds_train[['filename']]
    pointclouds_train_Y = pointclouds_train[['class_label']]

    # define stratified k fold cross validation parameters
    stratified_kfold = StratifiedKFold(n_splits=n_splits, shuffle=shuffle)

    # test data arrays
    test_pointclouds = []
    test_labels = []
    test_string_labels = {}

    # test split ready
    for index, test_data_row in pointclouds_test.iterrows():
        test_pointclouds.append(trimesh.load(test_data_row['filename'], force='mesh').sample(target_size))
        test_labels.append(class_labels_dict[test_data_row['class_label']])
        test_string_labels[index] = test_data_row['class_label']
    
    # convert to numpy array
    test_pointclouds = numpy.array(test_pointclouds)
    test_labels = numpy.array(test_labels)

    # create test tf.data.Dataset
    test_dataset = tensorflow.data.Dataset.from_tensor_slices((test_pointclouds, test_labels))
    test_dataset = test_dataset.shuffle(len(test_pointclouds)).batch(batch_size)
    print("Found " + str(len(pointclouds_test)) + " validated pointcloud filenames belonging to " + str(len(pointclouds_test['class_label'].unique())) + " classes.")


    # generate train and validation folds
    for train_index, validation_index in stratified_kfold.split(pointclouds_train_X, pointclouds_train_Y):
        print("\n-------- STARTING FOLD: " + str(fold_counter) + " --------")

        # best model save path
        if not os.path.isdir(os.path.join(pointclouds_models_save_dir, model_name)):
            os.mkdir(os.path.join(pointclouds_models_save_dir, model_name))
        pointclouds_model_save_path = os.path.join(pointclouds_models_save_dir, model_name, get_model_name(model_name, fold_counter))

        # train and validation data arrays
        train_pointclouds = []
        train_labels = []
        train_string_labels = {}
        validation_pointclouds = []
        validation_labels = []
        validation_string_labels = {}

        # training and test folds indices
        training_data = pointclouds_train.iloc[train_index]
        validation_data = pointclouds_train.iloc[validation_index]

        for index, training_data_row in training_data.iterrows():
            train_pointclouds.append(trimesh.load(training_data_row['filename'], force='mesh').sample(target_size))
            train_labels.append(class_labels_dict[training_data_row['class_label']])
            train_string_labels[index] = training_data_row['class_label']

        # convert to numpy array
        train_pointclouds = numpy.array(train_pointclouds)
        train_labels = numpy.array(train_labels)

        # create train tf.data.Dataset
        train_dataset = tensorflow.data.Dataset.from_tensor_slices((train_pointclouds, train_labels))
        if augment == True:
            train_dataset = train_dataset.shuffle(len(train_pointclouds)).map(augment).batch(batch_size)
        else:
            train_dataset = train_dataset.shuffle(len(train_pointclouds)).batch(batch_size)
        print("Found " + str(len(training_data)) + " validated pointcloud filenames belonging to " + str(len(training_data['class_label'].unique())) + " classes.")

        for index, validation_data_row in validation_data.iterrows():
            validation_pointclouds.append(trimesh.load(validation_data_row['filename'], force='mesh').sample(target_size))
            validation_labels.append(class_labels_dict[validation_data_row['class_label']])
            validation_string_labels[index] = validation_data_row['class_label']

        # convert to numpy array
        validation_pointclouds = numpy.array(validation_pointclouds)
        validation_labels = numpy.array(validation_labels)

        # create train tf.data.Dataset
        validation_dataset = tensorflow.data.Dataset.from_tensor_slices((validation_pointclouds, validation_labels))
        if augment == True:
            validation_dataset = validation_dataset.shuffle(len(validation_pointclouds)).map(augment).batch(batch_size)
        else:
            validation_dataset = validation_dataset.shuffle(len(validation_pointclouds)).batch(batch_size)
        print("Found " + str(len(validation_data)) + " validated pointcloud filenames belonging to " + str(len(validation_data['class_label'].unique())) + " classes.")

        # create best model checkpoint
        best_model_checkpoint = tensorflow.keras.callbacks.ModelCheckpoint(pointclouds_model_save_path, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
        callbacks_list = [best_model_checkpoint]

        # define model to be trained and tested
        model = models.Sequential(name=model_name + "-" + str(fold_counter))
        for layer in layers:
            model.add(layer)
        model.summary()
        model.compile(optimizer=tensorflow.keras.optimizers.RMSprop(learning_rate=learning_rate, decay=decay), loss="sparse_categorical_crossentropy", metrics=["sparse_categorical_accuracy"])

        # train model
        history = model.fit(train_dataset,
                            epochs=epochs,
                            callbacks=callbacks_list,
                            validation_data=validation_dataset)

        # plot loss and acuracy for each training/validation fold
        plot_train_loss_accuracy(pointclouds_model_save_path, history.history["loss"], history.history["sparse_categorical_accuracy"],
                                 history.history["val_loss"], history.history["val_sparse_categorical_accuracy"])

        # LOAD BEST MODEL to evaluate the performance of the model on the test set
        model.load_weights(pointclouds_model_save_path)

        # evaluate the model on the test set
        test_loss, test_accuracy = model.evaluate(test_dataset, batch_size=batch_size)
        print("Best model Test Loss: " + str(test_loss))
        print("Best model Test Accuracy: " + str(test_accuracy))
        
        # store test set loss and accuracy scores for each fold
        TEST_LOSS.append(test_loss)
        TEST_ACCURACY.append(test_accuracy)

        # Confution Matrix 
        Y_pred = model.predict(test_dataset)
        y_pred = numpy.argmax(Y_pred, axis=1)
        print('Confusion Matrix')
        cm = confusion_matrix(test_labels, y_pred)
        plot_confusion_matrix(cm, class_labels_dict.keys(), pointclouds_model_save_path, title='Confusion Matrix')

        # print Classification Report
        print('Classification Report')
        print(classification_report(test_labels, y_pred, target_names=class_labels_dict.keys()))

        # clean up before next fold
        del model
        tensorflow.keras.backend.clear_session()
        print("\n-------- TERMINATED FOLD: " + str(fold_counter) + " --------")

        # if one fold execution was requested, terminate here
        if one_fold==True:
            return;
        else:
            fold_counter += 1

#### **Developer harness test for Stratified K-Fold Cross Validation for pointclouds**

In [12]:
# testing
#layers = [
#    Conv1D(32, 3, padding='same', input_shape=(2048, 3)),
#    BatchNormalization(momentum=0.0),
#    Activation('relu'),
#    Conv1D(64, 3, padding='same', input_shape=(2048, 3)),
#    BatchNormalization(momentum=0.0),
#    Activation('relu'),
#    Conv1D(512, 3, padding='same', input_shape=(2048, 3)),
#    BatchNormalization(momentum=0.0),
#    Activation('relu'),
#    GlobalMaxPooling1D(),
#    Dense(512),
#    BatchNormalization(momentum=0.0),
#    Flatten(),
#    Dense(1024),
#    Activation('relu'),
#    Dense(5, activation='softmax')
#]

# train, validate and test
#pointclouds_kfold_validation("pointclouds-testing", 6, 0.20, True, layers, 0.001, 1e-6, 2048, 50, 32, 500)

#### **For demonstration purposes only:**

In [13]:
def pointclouds_kfold_validation_conv2d(model_name, n_splits, test_size, shuffle, layers, learning_rate, decay, target_size, epochs, batch_size, one_fold=True, resample_data=0, augment=False):
    global pointclouds_data
    global pointclouds_X
    global pointclouds_Y
    if resample_data > 0:
        pointclouds_data = pointclouds_data.groupby('class_label', group_keys=False).apply(lambda x: x.sample(min(len(x), resample_data)))
        pointclouds_X = pointclouds_data[['filename']]
        pointclouds_Y = pointclouds_data[['class_label']]

    # fold counter
    fold_counter = 1
    
    # instanciate image generator without data augmentation
    pointclouds_data_generator = ImageDataGenerator()
    
    # arrays to store test set loss and accuracy scores for each fold
    TEST_LOSS = []
    TEST_ACCURACY = []
    
    # split train and test dataset
    pointclouds_train, pointclouds_test = train_test_split(pointclouds_data, test_size=test_size, stratify=pointclouds_Y)
    pointclouds_train_X = pointclouds_train[['filename']]
    pointclouds_train_Y = pointclouds_train[['class_label']]

    # define stratified k fold cross validation parameters
    stratified_kfold = StratifiedKFold(n_splits=n_splits, shuffle=shuffle)

    # test data arrays
    test_pointclouds = []
    test_labels = []
    test_string_labels = {}

    # test split ready
    for index, test_data_row in pointclouds_test.iterrows():
        loaded_mesh = trimesh.load(test_data_row['filename'], force='mesh').sample(target_size)
        loaded_mesh = loaded_mesh.reshape(128, 128, 3)
        test_pointclouds.append(loaded_mesh)
        test_labels.append(class_labels_dict[test_data_row['class_label']])
        test_string_labels[index] = test_data_row['class_label']
    
    # convert to numpy array
    test_pointclouds = numpy.array(test_pointclouds)
    test_labels = numpy.array(test_labels)

    # create test tf.data.Dataset
    test_data_generator = pointclouds_data_generator.flow(test_pointclouds, test_labels, batch_size=batch_size)
    print("Found " + str(len(pointclouds_test)) + " validated pointcloud filenames belonging to " + str(len(pointclouds_test['class_label'].unique())) + " classes.")

    # generate train and validation folds
    for train_index, validation_index in stratified_kfold.split(pointclouds_train_X, pointclouds_train_Y):
        print("\n-------- STARTING FOLD: " + str(fold_counter) + " --------")

        # best model save path
        if not os.path.isdir(os.path.join(pointclouds_models_save_dir, model_name)):
            os.mkdir(os.path.join(pointclouds_models_save_dir, model_name))
        pointclouds_model_save_path = os.path.join(pointclouds_models_save_dir, model_name, get_model_name(model_name, fold_counter))

        # train and validation data arrays
        train_pointclouds = []
        train_labels = []
        train_string_labels = {}
        validation_pointclouds = []
        validation_labels = []
        validation_string_labels = {}

        # training and test folds indices
        training_data = pointclouds_train.iloc[train_index]
        validation_data = pointclouds_train.iloc[validation_index]

        for index, training_data_row in training_data.iterrows():
            loaded_mesh = trimesh.load(training_data_row['filename'], force='mesh').sample(target_size)
            loaded_mesh = loaded_mesh.reshape(128, 128, 3)
            train_pointclouds.append(loaded_mesh)
            train_labels.append(class_labels_dict[training_data_row['class_label']])
            train_string_labels[index] = training_data_row['class_label']

        # convert to numpy array
        train_pointclouds = numpy.array(train_pointclouds)
        train_labels = numpy.array(train_labels)

        # create train tf.data.Dataset
        train_data_generator = pointclouds_data_generator.flow(train_pointclouds, train_labels, batch_size=batch_size)
        print("Found " + str(len(training_data)) + " validated pointcloud filenames belonging to " + str(len(training_data['class_label'].unique())) + " classes.")

        for index, validation_data_row in validation_data.iterrows():
            loaded_mesh = trimesh.load(validation_data_row['filename'], force='mesh').sample(target_size)
            loaded_mesh = loaded_mesh.reshape(128, 128, 3)
            validation_pointclouds.append(loaded_mesh)
            validation_labels.append(class_labels_dict[validation_data_row['class_label']])
            validation_string_labels[index] = validation_data_row['class_label']

        # convert to numpy array
        validation_pointclouds = numpy.array(validation_pointclouds)
        validation_labels = numpy.array(validation_labels)

        # create train tf.data.Dataset
        validation_data_generator = pointclouds_data_generator.flow(validation_pointclouds, validation_labels, batch_size=batch_size)
        print("Found " + str(len(validation_data)) + " validated pointcloud filenames belonging to " + str(len(validation_data['class_label'].unique())) + " classes.")

        # create best model checkpoint
        best_model_checkpoint = tensorflow.keras.callbacks.ModelCheckpoint(pointclouds_model_save_path, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
        callbacks_list = [best_model_checkpoint]

        # define model to be trained and tested
        model = models.Sequential(name=model_name + "-" + str(fold_counter))
        for layer in layers:
            model.add(layer)
        model.summary()
        model.compile(optimizer=tensorflow.keras.optimizers.RMSprop(learning_rate=learning_rate, decay=decay), loss="sparse_categorical_crossentropy", metrics=["sparse_categorical_accuracy"])

        # train model
        history = model.fit(train_data_generator,
                            epochs=epochs,
                            callbacks=callbacks_list,
                            validation_data=validation_data_generator)

        # plot loss and acuracy for each training/validation fold
        plot_train_loss_accuracy(pointclouds_model_save_path, history.history["loss"], history.history["sparse_categorical_accuracy"],
                                 history.history["val_loss"], history.history["val_sparse_categorical_accuracy"])

        # LOAD BEST MODEL to evaluate the performance of the model on the test set
        model.load_weights(pointclouds_model_save_path)

        # evaluate the model on the test set
        test_loss, test_accuracy = model.evaluate(test_data_generator, batch_size=batch_size)
        print("Best model Test Loss: " + str(test_loss))
        print("Best model Test Accuracy: " + str(test_accuracy))
        
        # store test set loss and accuracy scores for each fold
        TEST_LOSS.append(test_loss)
        TEST_ACCURACY.append(test_accuracy)

        # Confution Matrix 
        Y_pred = model.predict(test_data_generator)
        y_pred = numpy.argmax(Y_pred, axis=1)
        print('Confusion Matrix')
        cm = confusion_matrix(test_labels, y_pred)
        plot_confusion_matrix(cm, class_labels_dict.keys(), pointclouds_model_save_path, title='Confusion Matrix')

        # print Classification Report
        print('Classification Report')
        print(classification_report(test_labels, y_pred, target_names=class_labels_dict.keys()))

        # clean up before next fold
        del model
        tensorflow.keras.backend.clear_session()
        print("\n-------- TERMINATED FOLD: " + str(fold_counter) + " --------")

        # if one fold execution was requested, terminate here
        if one_fold==True:
            return;
        else:
            fold_counter += 1

In [14]:
def pointclouds_kfold_validation_pointnet(model_name, n_splits, test_size, shuffle, model, learning_rate, decay, target_size, epochs, batch_size, one_fold=True, resample_data=0, augment=False):
    global pointclouds_data
    global pointclouds_X
    global pointclouds_Y
    if resample_data > 0:
        pointclouds_data = pointclouds_data.groupby('class_label', group_keys=False).apply(lambda x: x.sample(min(len(x), resample_data)))
        pointclouds_X = pointclouds_data[['filename']]
        pointclouds_Y = pointclouds_data[['class_label']]

    # fold counter
    fold_counter = 1
    
    # arrays to store test set loss and accuracy scores for each fold
    TEST_LOSS = []
    TEST_ACCURACY = []
    
    # split train and test dataset
    pointclouds_train, pointclouds_test = train_test_split(pointclouds_data, test_size=test_size, stratify=pointclouds_Y)
    pointclouds_train_X = pointclouds_train[['filename']]
    pointclouds_train_Y = pointclouds_train[['class_label']]

    # define stratified k fold cross validation parameters
    stratified_kfold = StratifiedKFold(n_splits=n_splits, shuffle=shuffle)

    # test data arrays
    test_pointclouds = []
    test_labels = []
    test_string_labels = {}

    # test split ready
    for index, test_data_row in pointclouds_test.iterrows():
        test_pointclouds.append(trimesh.load(test_data_row['filename'], force='mesh').sample(target_size))
        test_labels.append(class_labels_dict[test_data_row['class_label']])
        test_string_labels[index] = test_data_row['class_label']
    
    # convert to numpy array
    test_pointclouds = numpy.array(test_pointclouds)
    test_labels = numpy.array(test_labels)

    # create test tf.data.Dataset
    test_dataset = tensorflow.data.Dataset.from_tensor_slices((test_pointclouds, test_labels))
    test_dataset = test_dataset.shuffle(len(test_pointclouds)).batch(batch_size)
    print("Found " + str(len(pointclouds_test)) + " validated pointcloud filenames belonging to " + str(len(pointclouds_test['class_label'].unique())) + " classes.")


    # generate train and validation folds
    for train_index, validation_index in stratified_kfold.split(pointclouds_train_X, pointclouds_train_Y):
        print("\n-------- STARTING FOLD: " + str(fold_counter) + " --------")

        # best model save path
        if not os.path.isdir(os.path.join(pointclouds_models_save_dir, model_name)):
            os.mkdir(os.path.join(pointclouds_models_save_dir, model_name))
        pointclouds_model_save_path = os.path.join(pointclouds_models_save_dir, model_name, get_model_name(model_name, fold_counter))

        # train and validation data arrays
        train_pointclouds = []
        train_labels = []
        train_string_labels = {}
        validation_pointclouds = []
        validation_labels = []
        validation_string_labels = {}

        # training and test folds indices
        training_data = pointclouds_train.iloc[train_index]
        validation_data = pointclouds_train.iloc[validation_index]

        for index, training_data_row in training_data.iterrows():
            train_pointclouds.append(trimesh.load(training_data_row['filename'], force='mesh').sample(target_size))
            train_labels.append(class_labels_dict[training_data_row['class_label']])
            train_string_labels[index] = training_data_row['class_label']

        # convert to numpy array
        train_pointclouds = numpy.array(train_pointclouds)
        train_labels = numpy.array(train_labels)

        # create train tf.data.Dataset
        train_dataset = tensorflow.data.Dataset.from_tensor_slices((train_pointclouds, train_labels))
        if augment == True:
            train_dataset = train_dataset.shuffle(len(train_pointclouds)).map(augment).batch(batch_size)
        else:
            train_dataset = train_dataset.shuffle(len(train_pointclouds)).batch(batch_size)
        print("Found " + str(len(training_data)) + " validated pointcloud filenames belonging to " + str(len(training_data['class_label'].unique())) + " classes.")

        for index, validation_data_row in validation_data.iterrows():
            validation_pointclouds.append(trimesh.load(validation_data_row['filename'], force='mesh').sample(target_size))
            validation_labels.append(class_labels_dict[validation_data_row['class_label']])
            validation_string_labels[index] = validation_data_row['class_label']

        # convert to numpy array
        validation_pointclouds = numpy.array(validation_pointclouds)
        validation_labels = numpy.array(validation_labels)

        # create train tf.data.Dataset
        validation_dataset = tensorflow.data.Dataset.from_tensor_slices((validation_pointclouds, validation_labels))
        if augment == True:
            validation_dataset = validation_dataset.shuffle(len(validation_pointclouds)).map(augment).batch(batch_size)
        else:
            validation_dataset = validation_dataset.shuffle(len(validation_pointclouds)).batch(batch_size)
        print("Found " + str(len(validation_data)) + " validated pointcloud filenames belonging to " + str(len(validation_data['class_label'].unique())) + " classes.")

        # create best model checkpoint
        best_model_checkpoint = tensorflow.keras.callbacks.ModelCheckpoint(pointclouds_model_save_path, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
        callbacks_list = [best_model_checkpoint]

        # define model to be trained and tested
        model.summary()
        model.compile(optimizer=tensorflow.keras.optimizers.Adam(learning_rate=learning_rate, decay=decay), loss="sparse_categorical_crossentropy", metrics=["sparse_categorical_accuracy"])

        # train model
        history = model.fit(train_dataset,
                            epochs=epochs,
                            callbacks=callbacks_list,
                            validation_data=validation_dataset)

        # plot loss and acuracy for each training/validation fold
        plot_train_loss_accuracy(pointclouds_model_save_path, history.history["loss"], history.history["sparse_categorical_accuracy"],
                                 history.history["val_loss"], history.history["val_sparse_categorical_accuracy"])

        # LOAD BEST MODEL to evaluate the performance of the model on the test set
        model.load_weights(pointclouds_model_save_path)

        # evaluate the model on the test set
        test_loss, test_accuracy = model.evaluate(test_dataset, batch_size=batch_size)
        print("Best model Test Loss: " + str(test_loss))
        print("Best model Test Accuracy: " + str(test_accuracy))
        
        # store test set loss and accuracy scores for each fold
        TEST_LOSS.append(test_loss)
        TEST_ACCURACY.append(test_accuracy)

        # Confution Matrix 
        Y_pred = model.predict(test_dataset)
        y_pred = numpy.argmax(Y_pred, axis=1)
        print('Confusion Matrix')
        cm = confusion_matrix(test_labels, y_pred)
        plot_confusion_matrix(cm, class_labels_dict.keys(), pointclouds_model_save_path, title='Confusion Matrix')

        # print Classification Report
        print('Classification Report')
        print(classification_report(test_labels, y_pred, target_names=class_labels_dict.keys()))

        # clean up before next fold
        del model
        tensorflow.keras.backend.clear_session()
        print("\n-------- TERMINATED FOLD: " + str(fold_counter) + " --------")

        # if one fold execution was requested, terminate here
        if one_fold==True:
            return;
        else:
            fold_counter += 1