In [1]:
from matplotlib import pyplot as plt, image as mpimg
from matplotlib.gridspec import GridSpecFromSubplotSpec 
import numpy as np
import tensorflow as tf
import pandas as pd
import logging
import os
import time
from experiments import experiments, AUGM_RATIO, IMAGE_SIZE, DROP_RATE, N_LEA, DATA_PART, N_VAL
from methods import get_samples_and_labels, make_stratified_splits, make_lookup_table, load_and_preprocess_image, make_metric_data_frame, flatten_metric_data_frame
from models import make_augmentation
from sklearn.metrics import confusion_matrix, roc_curve, auc
import seaborn as sns
from seaborn import heatmap
from typing import List, Dict

# Echo the experiment-wide settings imported from `experiments` so the run
# configuration is visible at the top of the notebook output.
for caption, setting in (
    ("number of training samples = ", N_LEA),
    ("number of validation samples = ", N_VAL),
    ("image size = ", IMAGE_SIZE),
    ("augmentation factor = ", AUGM_RATIO),
    ("dropout rate = ", DROP_RATE),
    ("dataset partition = ", DATA_PART),
):
    print(caption, setting)

# Only let TensorFlow's logger emit errors.
tf.get_logger().setLevel(logging.ERROR)

AssertionError: personal_dataset_path './datasets/sensors-2026-freehand/as-captured/apsz' does not exist

# Global environment initialization

In [None]:
# Per-experiment bookkeeping. Every list below receives one entry per
# experiment from the dataset-preparation cell, so position k in each list
# describes experiment k.

# Class / sample statistics.
CLASS_NAMES, NUM_CLASSES = [], []
NUM_SAMPLES, NUM_SAMPLES_PER_CLASS = [], []
SAMPLES_PER_CLASS_TRAIN, SAMPLES_PER_CLASS_VAL, SAMPLES_PER_CLASS_TEST = [], [], []
TRAIN_SIZE, VAL_SIZE, TEST_SIZE = [], [], []

# Lookup tables translating between class names and class indices.
CLASS_TO_IDX_TABLE: List[tf.lookup.StaticHashTable] = []
IDX_TO_CLASS_TABLE: List[tf.lookup.StaticHashTable] = []

# Datasets, one per experiment.
train_datasets, val_datasets, test_datasets = [], [], []
casual_gestures_datasets = []

# DO NOT RUN MULTIPLE TIMES WITHOUT RESET

In [None]:
# Start from a clean slate so that re-running this cell does not append a
# second copy of every experiment to the per-experiment lists (which would
# silently shift every index used by the later cells).
for per_experiment_list in (
    CLASS_NAMES, NUM_CLASSES, NUM_SAMPLES, NUM_SAMPLES_PER_CLASS,
    SAMPLES_PER_CLASS_TRAIN, SAMPLES_PER_CLASS_VAL, SAMPLES_PER_CLASS_TEST,
    TRAIN_SIZE, VAL_SIZE, TEST_SIZE,
    CLASS_TO_IDX_TABLE, IDX_TO_CLASS_TABLE,
    train_datasets, val_datasets, test_datasets, casual_gestures_datasets,
):
    per_experiment_list.clear()

for experiment in experiments:
    # Load sample paths and labels for the personal and the casual-gestures sets.
    X, y = get_samples_and_labels(experiment['personal_dataset_path'])
    X_cas, y_cas = get_samples_and_labels(experiment['casual_gestures_dataset_path'])
    X_cas = list(X_cas)
    y_cas = list(y_cas)

    # Derive everything into locals first; the global lists are only touched
    # once the whole experiment has been validated and split successfully.
    class_names = sorted(set(y))
    num_classes = len(class_names)
    num_samples = len(X)
    assert num_classes > 0, f"No samples found in '{experiment['personal_dataset_path']}'."

    # Integer division: assumes the classes are (roughly) balanced.
    num_samples_per_class = num_samples // num_classes
    samples_per_class_train = experiment['samples_per_class_train']
    samples_per_class_val = experiment['samples_per_class_val']
    samples_per_class_test = num_samples_per_class - samples_per_class_train - samples_per_class_val

    train_size = samples_per_class_train * num_classes
    val_size = samples_per_class_val * num_classes
    # Whatever is not used for training/validation becomes the test set.
    test_size = num_samples - train_size - val_size

    assert train_size + val_size <= num_samples, "Not enough samples for the specified train/val split."

    keys_to_vals, vals_to_keys = make_lookup_table(class_names)
    train_dataset, val_dataset, test_dataset = make_stratified_splits(X, y, val_size, test_size)
    casual_gestures_dataset = tf.data.Dataset.from_tensor_slices((X_cas, y_cas))

    # Commit: every list gets exactly one new entry for this experiment.
    CLASS_NAMES.append(class_names)
    NUM_CLASSES.append(num_classes)
    NUM_SAMPLES.append(num_samples)
    NUM_SAMPLES_PER_CLASS.append(num_samples_per_class)
    SAMPLES_PER_CLASS_TRAIN.append(samples_per_class_train)
    SAMPLES_PER_CLASS_VAL.append(samples_per_class_val)
    SAMPLES_PER_CLASS_TEST.append(samples_per_class_test)
    TRAIN_SIZE.append(train_size)
    VAL_SIZE.append(val_size)
    TEST_SIZE.append(test_size)
    CLASS_TO_IDX_TABLE.append(keys_to_vals)
    IDX_TO_CLASS_TABLE.append(vals_to_keys)
    train_datasets.append(train_dataset)
    val_datasets.append(val_dataset)
    test_datasets.append(test_dataset)
    casual_gestures_datasets.append(casual_gestures_dataset)

# Model initialization - run the following block to reset the trained models

In [None]:
# One empty list per experiment. The training loop appends per-repetition
# results to these (y_csh_preds is not written by the visible code).
histories = [[] for _ in experiments]
test_results = [[] for _ in experiments]
y_pred_pobs = [[] for _ in experiments]
y_preds = [[] for _ in experiments]
y_csh_pred_pobs = [[] for _ in experiments]
y_csh_preds = [[] for _ in experiments]

# Ground-truth label containers; the training loop appends to them.
y_true_classes = []
y_true_classes_one_hot = []
y_csh_true_classes_one_hot = []

# Build a fresh model for every experiment via its own generator function.
models: List[tf.keras.Sequential] = []
for i, experiment in enumerate(experiments):
    build_model = experiment['model_gen']
    models.append(
        build_model(
            experiment['input_shape'],
            NUM_CLASSES[i],
            experiment['learning_rate'],
            experiment['dropout_rate'],
        )
    )

In [None]:
# Show the architecture of every experiment's model, preceded by a banner
# naming the experiment and followed by a blank separator line.
for i, experiment in enumerate(experiments):
    banner = f"Starting experiment {i+1}/{len(experiments)}: {experiment['name']}"
    print(banner)
    experiment_model = models[i]
    experiment_model.summary()
    print()

# Training loop - run the following block to conduct the experiments

In [None]:
# Train every experiment `repetitions` times and collect histories, test
# metrics and predictions in the per-experiment lists created above.
for i, experiment in enumerate(experiments):
    if TEST_SIZE[i] > 0:
        test_dataset = test_datasets[i].map(lambda path, label: (load_and_preprocess_image(path, label, CLASS_TO_IDX_TABLE[i], experiment['input_shape'], NUM_CLASSES[i])))
        casual_gestures_dataset = casual_gestures_datasets[i].map(lambda path, label: (load_and_preprocess_image(path, label, CLASS_TO_IDX_TABLE[i], experiment['input_shape'], NUM_CLASSES[i])))
        # Freeze the shuffle order. With the default reshuffle_each_iteration=True
        # the label pass below and the later predict() pass would each draw a
        # different random subset/order, so labels and predictions would not
        # refer to the same samples.
        casual_gestures_dataset = casual_gestures_dataset.shuffle(1000, reshuffle_each_iteration=False).take(TEST_SIZE[i])
        # Evaluation set for the ROC analysis: test samples followed by casual gestures.
        casual_gestures_dataset = test_dataset.concatenate(casual_gestures_dataset)

        # Ground truth is collected once per experiment (it is the same for every repetition).
        test_labels = [y for _, y in test_dataset]
        y_true_classes.append(tf.argmax(test_labels, axis=1))
        y_true_classes_one_hot.append(test_labels)
        y_csh_true_classes_one_hot.append([y for _, y in casual_gestures_dataset])

        test_dataset = test_dataset.batch(experiment['batch_size']).cache().prefetch(buffer_size=tf.data.AUTOTUNE)
        casual_gestures_dataset = casual_gestures_dataset.batch(experiment['batch_size']).cache().prefetch(buffer_size=tf.data.AUTOTUNE)

    for j in range(experiment['repetitions']):
        print(f"Starting experiment {i+1}/{len(experiments)}: {experiment['name']} repetition: {j+1}/{experiment['repetitions']}")

        # clone_model() gives freshly initialised weights and an *uncompiled*
        # model, so the optimizer has to be configured here; use the
        # experiment's learning rate instead of Adam's default.
        model = tf.keras.models.clone_model(models[i])
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=experiment['learning_rate']),
                loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

        augmentation = make_augmentation(experiment['rotation_factor'], experiment['zoom_factor'], experiment['translation_factor'])

        train_dataset = train_datasets[i].map(lambda path, label: (load_and_preprocess_image(path, label, CLASS_TO_IDX_TABLE[i], experiment['input_shape'], NUM_CLASSES[i])))
        # Order matters: cache the decoded samples first, then shuffle individual
        # samples (a cache placed after shuffle would replay one fixed order every
        # epoch), then batch, and only then apply the random augmentation.
        train_dataset = (
            train_dataset
            .cache()
            .shuffle(buffer_size=train_dataset.cardinality(), reshuffle_each_iteration=True)
            .batch(experiment['batch_size'])
            .map(lambda x, y: (augmentation(x), y), num_parallel_calls=tf.data.AUTOTUNE)
            .prefetch(buffer_size=tf.data.AUTOTUNE)
        )

        val_dataset = val_datasets[i].map(lambda path, label: (load_and_preprocess_image(path, label, CLASS_TO_IDX_TABLE[i], experiment['input_shape'], NUM_CLASSES[i])))
        val_dataset = val_dataset.batch(experiment['batch_size']).cache().prefetch(buffer_size=tf.data.AUTOTUNE)

        histories[i].append(
            model.fit(
                train_dataset,
                validation_data=val_dataset,
                epochs=experiment['epochs'],
            )
        )

        if TEST_SIZE[i] > 0:
            results = model.evaluate(test_dataset)
            test_results[i].append(results)
            y_pred_pobs[i].append(model.predict(test_dataset))
            y_pred = tf.argmax(y_pred_pobs[i][-1], axis=1)
            y_preds[i].append(y_pred)
            y_csh_pred_pobs[i].append(model.predict(casual_gestures_dataset))

        print()

In [None]:
def _average_extreme_per_repetition(values, reducer, epochs, repetitions):
    """Average `reducer` (np.min / np.max) taken over each repetition's rows.

    Assumes `values` stores the repetitions back to back with `epochs` rows
    each - TODO confirm against make_metric_data_frame.
    """
    return np.average([
        reducer(values.iloc[j * epochs:(j + 1) * epochs])
        for j in range(repetitions)
    ])


# Keep one array per experiment instead of stacking all experiments into a
# single array: test sets of different sizes cannot be stacked.
np_y_csh_true_classes_one_hot = [np.array(labels) for labels in y_csh_true_classes_one_hot]
np_y_csh_pred_pobs = []
for i in range(len(experiments)):
    np_y_csh_pred_pobs.append(np.array(y_csh_pred_pobs[i]))

# NaN marks experiments without a test set so they do not drag the averages to 0.
tpr_all_experiments = np.full((len(experiments), 100), np.nan)
roc_auc_all_experiments = np.full(len(experiments), np.nan)
mean_fpr = np.linspace(0, 1, 100)

# The ground-truth lists only hold entries for experiments that have a test
# set, so they are indexed by "number of tested experiments seen so far".
tested_experiments_seen = 0

for i, experiment_histories in enumerate(histories):
    #EXPERIMENT INFO
    loss = make_metric_data_frame(experiments[i], experiment_histories, 'loss')
    val_loss = make_metric_data_frame(experiments[i], experiment_histories, 'val_loss')
    accuracy = make_metric_data_frame(experiments[i], experiment_histories, 'accuracy')
    val_accuracy = make_metric_data_frame(experiments[i], experiment_histories, 'val_accuracy')
    epochs = experiments[i]['epochs']
    repetitions = experiments[i]['repetitions']

    # Best value reached by each repetition, averaged over repetitions.
    trainig_average_minimum_loss = _average_extreme_per_repetition(loss['loss'], np.min, epochs, repetitions)
    validation_average_minimum_loss = _average_extreme_per_repetition(val_loss['val_loss'], np.min, epochs, repetitions)
    trainig_average_maximum_accuracy = _average_extreme_per_repetition(accuracy['accuracy'], np.max, epochs, repetitions)
    validation_average_maximum_accuracy = _average_extreme_per_repetition(val_accuracy['val_accuracy'], np.max, epochs, repetitions)

    # Best value of the per-epoch mean curve (mean taken across repetitions).
    trainig_minimum_average_loss = np.min(loss.groupby(['epoch']).mean()['loss'])
    validation_minimum_average_loss = np.min(val_loss.groupby(['epoch']).mean()['val_loss'])
    trainig_maximum_average_accuracy = np.max(accuracy.groupby(['epoch']).mean()['accuracy'])
    validation_maximum_average_accuracy = np.max(val_accuracy.groupby(['epoch']).mean()['val_accuracy'])

    print(f"Experiment {i+1}: {experiments[i]['name']} | Dataset Path: {experiments[i]['personal_dataset_path']}")
    print()
    print(f"Class Names: {CLASS_NAMES[i]}")
    print(f"Number of Classes: {NUM_CLASSES[i]} |  Total Samples: {NUM_SAMPLES[i]} | Samples per Class: {NUM_SAMPLES_PER_CLASS[i]}")
    print(f"Training Samples: {TRAIN_SIZE[i]} ({SAMPLES_PER_CLASS_TRAIN[i]} per class) | Validation Samples: {VAL_SIZE[i]} ({SAMPLES_PER_CLASS_VAL[i]} per class) | Testing Samples: {TEST_SIZE[i]} ({SAMPLES_PER_CLASS_TEST[i]} per class)")
    print(f"Epochs: {experiments[i]['epochs']} | Repetitions: {experiments[i]['repetitions']} | Learning Rate: {experiments[i]['learning_rate']} | Input shape: {experiments[i]['input_shape']}")
    print(f"Dropout Rate: {experiments[i]['dropout_rate']} | Rotation Factor: {experiments[i]['rotation_factor']} | Zoom Factor: {experiments[i]['zoom_factor']} | Translation Factor: {experiments[i]['translation_factor']}")
    print()
    print("Training: average minimum loss:", trainig_average_minimum_loss, "| average maximum accuracy:", trainig_average_maximum_accuracy)
    print("Validation: average minimum loss:", validation_average_minimum_loss, "| average maximum accuracy:", validation_average_maximum_accuracy)
    print()
    print("Training: minimum average loss:", trainig_minimum_average_loss, "| maximum average accuracy:", trainig_maximum_average_accuracy)
    print("Validation: minimum average loss:", validation_minimum_average_loss, "| maximum average accuracy:", validation_maximum_average_accuracy)
    print()
    # Decide on the whole line: averaging an empty result list would yield NaN.
    if TEST_SIZE[i] > 0:
        print("Test results: average loss", np.average([result[0] for result in test_results[i]]), "| average accuracy:", np.average([result[1] for result in test_results[i]]))
    else:
        print("Test results: No test set")

    #LOSS AND ACC PLOTS
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1).minorticks_on()
    sns.lineplot(data=loss, x='epoch', y='loss', label='training loss', linestyle=':')
    sns.lineplot(data=val_loss, x='epoch', y='val_loss', label='validation loss', color='red', linestyle='--')
    plt.title('Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss Value')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.axis(ymin=0, ymax=4, xmin=1, xmax=epochs)

    ax = plt.subplot(1, 2, 2)
    ax.set_yticks([tick / 10 for tick in range(11)])
    ax.minorticks_on()
    sns.lineplot(data=accuracy, x='epoch', y='accuracy', label='training accuracy', linestyle=':')
    sns.lineplot(data=val_accuracy, x='epoch', y='val_accuracy', label='validation accuracy', color='red', linestyle='--')
    plt.title('Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy Value')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.axis(ymin=0, ymax=1, xmin=1, xmax=epochs)

    plt.show()

    #ROC PLOTS
    if TEST_SIZE[i] <= 0:
        # No predictions were recorded for this experiment; nothing to plot.
        print()
        continue

    true_one_hot = np_y_csh_true_classes_one_hot[tested_experiments_seen]
    tested_experiments_seen += 1
    predicted_scores = np_y_csh_pred_pobs[i]  # indexed [repetition, sample, class]

    tpr = np.zeros((repetitions, NUM_CLASSES[i],100))
    roc_auc = np.zeros((repetitions, NUM_CLASSES[i]))

    # One-vs-rest ROC curve per class and repetition, resampled onto mean_fpr
    # so that the curves can be averaged point by point.
    for repetition in range(repetitions):
        for n in range(NUM_CLASSES[i]):
            fpr_tmp, tpr_tmp, _ = roc_curve(true_one_hot[:, n], predicted_scores[repetition, :, n])
            tpr[repetition, n] = np.interp(mean_fpr, fpr_tmp, tpr_tmp)
            roc_auc[repetition, n] = auc(fpr_tmp, tpr_tmp)
            tpr[repetition, n, 0] = 0.0  # force every curve through the origin

    # Macro average over classes, then over repetitions.
    tpr_all_reps = np.mean(tpr, axis=1)
    roc_auc_all_reps = np.mean(roc_auc, axis=1)

    tpr_experiment = np.mean(tpr_all_reps, axis=0)
    roc_auc_experiment = np.mean(roc_auc_all_reps, axis=0)

    tpr_all_experiments[i] = tpr_experiment
    roc_auc_all_experiments[i] = roc_auc_experiment

    plt.figure()
    plt.grid(True, linestyle='--', alpha=0.5)
    plt.xlabel("FPR")
    plt.ylabel("TPR")
    plt.minorticks_on()
    plt.title("ROC")
    for repetition in range(repetitions):
        plt.plot(mean_fpr, tpr_all_reps[repetition], color="GREY", alpha=0.8, lw=0.8)
    plt.plot(mean_fpr, tpr_experiment, color="BLUE", label=f"macro-averge OvR (AUC = {roc_auc_experiment:.2f})")
    plt.plot([0, 1], [0, 1], linestyle='--', color="BLACK", label=f"Chance level (AUC = 0.5)")
    plt.legend()
    plt.show()
    print()

# Averages across experiments; experiments without a test set (NaN rows) are ignored.
tpr_all_experiments_averged = np.nanmean(tpr_all_experiments, axis=0)
roc_auc_all_experiments_averged = np.nanmean(roc_auc_all_experiments, axis=0)

# Aggregated loss and accuracy plots

In [None]:
# Pool the training curves of all experiments and plot the aggregate loss,
# accuracy and ROC. The ROC part reads mean_fpr, tpr_all_experiments and the
# *_averged values computed by the per-experiment report cell.
every_experiment_loss = []
every_experiment_val_loss = []
every_experiment_accuracy = []
every_experiment_val_accuracy = []
tests_loss_averages = []
tests_accuracy_averages = []
number_of_experiments = len(experiments)
# x-axis limit for the plots; computed here instead of relying on an `epochs`
# variable left behind by another cell's loop.
max_epochs = max((experiment['epochs'] for experiment in experiments), default=1)

for i, experiment_histories in enumerate(histories):
    every_experiment_loss.append(make_metric_data_frame(experiments[i], experiment_histories, 'loss'))
    every_experiment_val_loss.append(make_metric_data_frame(experiments[i], experiment_histories, 'val_loss'))
    every_experiment_accuracy.append(make_metric_data_frame(experiments[i], experiment_histories, 'accuracy'))
    every_experiment_val_accuracy.append(make_metric_data_frame(experiments[i], experiment_histories, 'val_accuracy'))

    # Only experiments with a test set contribute to the test averages.
    if TEST_SIZE[i] > 0:
        tests_loss_averages.append(np.average([result[0] for result in test_results[i]]))
        tests_accuracy_averages.append(np.average([result[1] for result in test_results[i]]))


every_experiment_loss = flatten_metric_data_frame(every_experiment_loss, 'loss', number_of_experiments)
every_experiment_val_loss = flatten_metric_data_frame(every_experiment_val_loss, 'val_loss', number_of_experiments)
every_experiment_accuracy = flatten_metric_data_frame(every_experiment_accuracy, 'accuracy', number_of_experiments)
every_experiment_val_accuracy = flatten_metric_data_frame(every_experiment_val_accuracy, 'val_accuracy', number_of_experiments)

trainig_minimum_average_loss = np.min(every_experiment_loss.groupby(['epoch']).mean()['loss'])
validation_minimum_average_loss = np.min(every_experiment_val_loss.groupby(['epoch']).mean()['val_loss'])
trainig_maximum_average_accuracy = np.max(every_experiment_accuracy.groupby(['epoch']).mean()['accuracy'])
validation_maximum_average_accuracy = np.max(every_experiment_val_accuracy.groupby(['epoch']).mean()['val_accuracy'])

print("Training: minimum average loss:", trainig_minimum_average_loss, "| maximum average accuracy:", trainig_maximum_average_accuracy)
print("Validation: minimum average loss:", validation_minimum_average_loss, "| maximum average accuracy:", validation_maximum_average_accuracy)
# Report test metrics only if at least one experiment actually had a test set
# (instead of checking TEST_SIZE of whichever experiment the loop ended on).
if tests_loss_averages:
    average_testes_loss = np.average(tests_loss_averages)
    average_testes_accuracy = np.average(tests_accuracy_averages)
    print("Test results: average loss", average_testes_loss, "| average accuracy:", average_testes_accuracy)
else:
    average_testes_loss = float('nan')
    average_testes_accuracy = float('nan')
    print("Test results: No test set")
print()

plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1).minorticks_on()
sns.lineplot(data=every_experiment_loss, x='epoch', y='loss', label='training loss', linestyle=':')
sns.lineplot(data=every_experiment_val_loss, x='epoch', y='val_loss', label='validation loss', color='red', linestyle='--')
plt.title('Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.6)
plt.axis(ymin=0, ymax=4, xmin=1, xmax=max_epochs)

ax = plt.subplot(1, 2, 2)
ax.set_yticks([tick / 10 for tick in range(11)])
ax.minorticks_on()
sns.lineplot(data=every_experiment_accuracy, x='epoch', y='accuracy', label='training accuracy', linestyle=':')
sns.lineplot(data=every_experiment_val_accuracy, x='epoch', y='val_accuracy', label='validation accuracy', color='red', linestyle='--')
plt.title('Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy Value')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.6)
plt.axis(ymin=0, ymax=1, xmin=1, xmax=max_epochs)

plt.show()

# Aggregated ROC: one grey curve per experiment plus the average in blue.
plt.figure()
plt.grid(True, linestyle='--', alpha=0.5)
plt.xlabel("FPR")
plt.ylabel("TPR")
plt.minorticks_on()
plt.title("ROC")
for experiment_index in range(len(experiments)):
    plt.plot(mean_fpr, tpr_all_experiments[experiment_index], color="GREY", alpha=0.8, lw=0.8)
plt.plot(mean_fpr, tpr_all_experiments_averged, color="BLUE", label=f"macro-averge OvR (AUC = {roc_auc_all_experiments_averged:.2f})")
plt.plot([0, 1], [0, 1], linestyle='--', color="BLACK", label=f"Chance level (AUC = 0.5)")
plt.legend()
plt.show()
print()

# Confusion matrices

In [None]:
# Confusion matrix (summed over repetitions) and one representative test
# image per class, for every experiment that has a test set.
# y_true_classes only holds entries for experiments with a test set, so it is
# indexed by the number of such experiments seen so far.
tested_experiments_seen = 0

for i, experiment in enumerate(experiments):
    if TEST_SIZE[i] <= 0:
        print(f'Experiment {i+1}: {experiment["name"]} has no test set, skipping confusion matrix')
        continue

    y_true = y_true_classes[tested_experiments_seen].numpy()
    tested_experiments_seen += 1
    num_classes = NUM_CLASSES[i]

    # Map class name -> path of one test image of that class (the last one seen wins).
    representatives = {y.numpy().decode('utf-8'): x.numpy().decode('utf-8') for x, y in test_datasets[i]}

    # int64 accumulator: int16 could overflow when counts are summed over repetitions.
    combined_confusion_matrix = np.zeros((num_classes, num_classes), dtype=np.int64)
    class_indices = np.arange(num_classes)
    for repetition in range(experiment['repetitions']):
        # labels= pins the matrix to num_classes x num_classes even if a class
        # appears neither in the ground truth nor in the predictions.
        combined_confusion_matrix += confusion_matrix(y_true, y_preds[i][repetition].numpy(), labels=class_indices)
    labels = list(map(lambda x: x.decode('utf-8'), IDX_TO_CLASS_TABLE[i].lookup(tf.constant([j for j in range(num_classes)], dtype=tf.int64)).numpy()))
    images = [mpimg.imread(representatives[label]) for label in labels]

    diagonal = combined_confusion_matrix.diagonal()
    trues = diagonal.sum()
    total = combined_confusion_matrix.sum()  # not named `all`: that would shadow the builtin notebook-wide
    falses = total - trues
    accuracy = trues / total if total else float('nan')
    # Macro-averaged precision/recall; a class that was never predicted (or has
    # no true samples) counts as 0 instead of producing a 0/0 NaN.
    predicted_per_class = combined_confusion_matrix.sum(axis=0)
    actual_per_class = combined_confusion_matrix.sum(axis=1)
    precision = np.mean(np.divide(diagonal, predicted_per_class, out=np.zeros(num_classes, dtype=float), where=predicted_per_class > 0))
    recall = np.mean(np.divide(diagonal, actual_per_class, out=np.zeros(num_classes, dtype=float), where=actual_per_class > 0))

    print(f'Accuracy: {accuracy} | Precision: {precision} | Recall: {recall}')

    fig = plt.figure(figsize=(20, 7))
    main_grid = fig.add_gridspec(1, 2)
    heatmap_axis = fig.add_subplot(main_grid[0, 0])

    # Draw on the heatmap axis explicitly rather than on "the current axes".
    hm = heatmap(combined_confusion_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels, ax=heatmap_axis)
    hm.set_ylabel('True Label', fontdict={'fontsize':14}, labelpad=12)
    hm.set_xlabel('Predicted Label', fontdict={'fontsize':14}, labelpad=12)
    heatmap_axis.set_title('Confusion matrix')
    heatmap_axis.set_xticklabels(labels, rotation=45)

    images_container_axis = fig.add_subplot(main_grid[0, 1])
    images_container_axis.axis('off')
    images_container_axis.set_title('Class representatives', pad=20)
    # 5 images per row; at least 3 rows, more when there are more than 15 classes.
    image_columns = 5
    image_rows = max(3, -(-len(labels) // image_columns))
    images_grid = GridSpecFromSubplotSpec(image_rows, image_columns, subplot_spec=main_grid[0, 1])
    for j in range(len(labels)):
        image_axis = fig.add_subplot(images_grid[j])
        image_axis.imshow(images[j])
        image_axis.set_title(labels[j], fontsize=8)
        image_axis.axis('off')

    fig.suptitle(f'Experiment {i+1}: {experiment["name"]}', fontsize=20, y=1.02)

    plt.show()


# The following block saves the metrics of the chosen experiments

In [None]:
experiments_to_save = [] # Indices of experiments to save metrics for

# Write the training curves (and test results, if any) of each selected
# experiment to ./metrics/<name>_<timestamp>/ as CSV files.
for experiment_index in experiments_to_save:
    repetition_histories = histories[experiment_index]
    if not repetition_histories:
        # Nothing was trained for this experiment, so there is nothing to save.
        print(f"Experiment {experiment_index} has no training history, skipping")
        continue

    experiment_time = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
    name = experiments[experiment_index].get('name', None)
    if name is not None:
        name = f"{name}_{experiment_time}"
    else:
        name = f"{experiment_time}"

    os.makedirs(f"./metrics/{name}", exist_ok=True)
    # histories[k] is a list with one History object per repetition, so each
    # CSV gets one row per epoch and one column per repetition.
    for metric in ('loss', 'val_loss', 'accuracy', 'val_accuracy'):
        per_repetition_curves = np.column_stack([history.history[metric] for history in repetition_histories])
        np.savetxt(f"./metrics/{name}/{metric}.csv", per_repetition_curves, delimiter=",")
    if TEST_SIZE[experiment_index] > 0:
        # One row per repetition, as returned by model.evaluate().
        np.savetxt(f"./metrics/{name}/test_results.csv", test_results[experiment_index], delimiter=",")

    time.sleep(1)  # Ensure unique timestamps for directory names