Import the libraries

In [1]:
import pickle
import numpy as np
import random
import matplotlib.pyplot as plt
import tensorflow as tf
from scikeras.wrappers import KerasClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Input
from tensorflow.keras.activations import softmax
from scikeras.wrappers import KerasClassifier
from sklearn.model_selection import StratifiedKFold
from itertools import product 
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten

## PREPARING THE INPUT

[point 2 in the exam text: \
*For simplicity, but not necessary) and to make my data more understandable (to me), i would start transforming my vector in a matrix 33x4, where the rows correspond to the 33 sidepoints, and the columns are x, y, z, v. I am going to apply a minmax scaling: considering (0.493, 0.631) for the X column, and so on... \
The classes are more or less balanced, so for the moment I would not apply any upsampling/downsampling for the minority/majority classes. I could try it only if the performance of my model will not satisfy me. \
I will also transform my outputs from strings to integers (from 0 to 6).*]

First, we load the data and we look at their shape

In [2]:
file_path = r'C:\Users\yoshi\OneDrive\Desktop\exam deep learning\input_data.pkl'
with open(file_path, "rb") as file:
    data = pickle.load(file)

Let's look at the first sample

In [3]:
sample_key = list(data.keys())[0]
print(f"Sample data for key '{sample_key}':", data[sample_key])

Sample data for key 'X': [[ 0.57770568  0.23191446 -0.4204722  ...  0.95409644  0.14137968
   0.97238696]
 [ 0.57770413  0.23211703 -0.4230628  ...  0.95341611  0.14557168
   0.97199601]
 [ 0.5777995   0.2321346  -0.42221451 ...  0.95276618  0.1410574
   0.97163814]
 ...
 [ 0.61882263  0.23206101 -0.39901862 ...  0.96600813  0.04200708
   0.91995174]
 [ 0.61883748  0.23205066 -0.40318117 ...  0.96604252  0.04207094
   0.91996664]
 [ 0.61884356  0.23205319 -0.40282306 ...  0.96613091  0.03997473
   0.92123002]]


And divide the sample from theit classification output

In [4]:
X = np.array(data['X'])
y = np.array(data['y'])

print("Shape of X:", X.shape)
print("Shape of y:", y.shape) 

Shape of X: (2700, 132)
Shape of y: (2700,)


Now we can transform from vector to matrix

In [5]:
X_reshaped = X.reshape(-1, 33, 4)

print("Shape of reshaped X:", X_reshaped.shape)

Shape of reshaped X: (2700, 33, 4)


And check the first sample to see how it looks now:

In [6]:
print("First reshaped sample (33x4 matrix):")
print(X_reshaped[0])

First reshaped sample (33x4 matrix):
[[ 0.57770568  0.23191446 -0.4204722   0.9999938 ]
 [ 0.58672541  0.2164979  -0.40176901  0.99997592]
 [ 0.59186375  0.2168406  -0.40179992  0.99997413]
 [ 0.5957647   0.21739218 -0.40178603  0.99997211]
 [ 0.57161874  0.21834612 -0.40327197  0.9999702 ]
 [ 0.56672728  0.21924257 -0.40329617  0.99996722]
 [ 0.56239396  0.22022381 -0.40333375  0.99996448]
 [ 0.60183752  0.22745022 -0.2632179   0.9999491 ]
 [ 0.55938202  0.22777343 -0.27078733  0.99993861]
 [ 0.58797812  0.25179648 -0.3664923   0.99999428]
 [ 0.56935519  0.25208199 -0.36876363  0.99999261]
 [ 0.63961399  0.33558801 -0.15272155  0.99998927]
 [ 0.52481461  0.33422393 -0.18341279  0.9999733 ]
 [ 0.64599973  0.46432573 -0.08530901  0.9896456 ]
 [ 0.51587093  0.46985579 -0.13175203  0.98091239]
 [ 0.64039892  0.58820516 -0.21137002  0.98633075]
 [ 0.50740278  0.58851296 -0.24127439  0.96922094]
 [ 0.63988477  0.62180054 -0.24995236  0.97064441]
 [ 0.5041073   0.62379986 -0.28097337  0.9472

Let's apply minmax scaling for the columns

In [7]:
# initialize minmax scaler for each coorinate + visibility
scalers = [MinMaxScaler() for _ in range(4)]

# create an empty array to store the scaled data
X_scaled = np.zeros_like(X_reshaped)

# apply minmax scaler independently to each column
for i in range(4):
    feature_data = X_reshaped[:, i, :] # identify the specific column
    X_scaled[:, i, :] = scalers[i].fit_transform(feature_data)

# check the shape
print("Scaled data shape:", X_scaled.shape)

Scaled data shape: (2700, 33, 4)


Now, we consider the outputs. First, we check how they look like

In [8]:
unique_classes, counts = np.unique(y, return_counts=True)

print("Unique classes and their counts:")
for class_label, count in zip(unique_classes, counts):
    print(f"{class_label}: {count}")

Unique classes and their counts:
left_bicep: 435
left_shoulder: 373
left_tricep: 317
rest: 406
right_bicep: 369
right_shoulder: 401
right_tricep: 399


Using a labelencoder, we transform the strings to integers

In [9]:
label_encoder = LabelEncoder()

y_encoded = label_encoder.fit_transform(y)

And we print the conversion, to keep track

In [10]:
label_mapping = {original: encoded for original, encoded in zip(label_encoder.classes_, range(len(label_encoder.classes_)))}
print("\nClass to Encoded Mapping:")
for original_label, encoded_label in label_mapping.items():
    print(f"{original_label} -> {encoded_label}")
print()

# check if it worked (confront to the print before)
unique_classes, counts = np.unique(y_encoded, return_counts=True)
print("Unique classes and their counts:")
for class_label, count in zip(unique_classes, counts):
    print(f"{class_label}: {count}")


Class to Encoded Mapping:
left_bicep -> 0
left_shoulder -> 1
left_tricep -> 2
rest -> 3
right_bicep -> 4
right_shoulder -> 5
right_tricep -> 6

Unique classes and their counts:
0: 435
1: 373
2: 317
3: 406
4: 369
5: 401
6: 399


In [11]:
# rename to make order

final_x = X_scaled
final_y = y_encoded

## CREATE THE MODEL

[point 1 in the exam text: \
\
*For this kind of task, I would use a FCN, because it works good for tabular data (like mine), and it is able to capture both the local and the global features. It also allows me to use fully connected layers, that are exactly what I need in my task*]

[point 5 in the exam text: \
\
*MODEL COMPOSITION* \
*input (preprocessed) -> fully connected layer -> dropout -> fully connected layer -> dropout -> output* \
\
*ACTIVATION FUNCTIONS* \
*ReLu inside hidden layers* \
*softmax for output layer* \
\
*HYPERPARAMETERS* \
*I would try different numbers of hidden layers, learning rate and dropout rate.* \
*Also, I would start with a certain number of epochs and check the permormance during the epochs. If the performance is getting worse after a certain number of epochs, I reduce it.* ]

[point 3 in the exam: \
\
*The output is going to be a fully connected layer with softmax activation that returns a vector of length 7 (corresponding to the classes). This is because, doing that, I have all my values corresponding to a probability distribution (they sum up to 1). The biggest value position corresponds to the predicted class.* ]

[point 4 in the exam: \
\
*For this task, I am going to use categorical cross-entropy. Since I am dealing with multi-class classification, I need my error to be evaluated as the "distance" of my predicted class to the true class of the sample. The farest the prediction is from the reality, the bigger the error calculated.* ]

<font color='red'>**CHANGE FROM EXAM TEXT**</font> \
I had to add a flatten layer that I forgot to write in the exam text. Since I reshaped my data in the preprocessing, and transformed them from vector (132,) to matrix 33x4, now (after the scaling) I need to go back to the vector to feed them in the fcn.

Let's define the model:

In [12]:
def create_model(input_shape=(33, 4), num_classes=7, nhid1=128, nhid2=64, learning_rate=0.001, dropout_rate=0.3, hid_act='relu', loss='sparse_categorical_crossentropy'):
    model = Sequential()

    # input layer
    model.add(Input(shape=input_shape))
    
    # flatten layer
    model.add(Flatten())
    
    # first hidden layer
    model.add(Dense(nhid1, activation=hid_act))
    model.add(Dropout(dropout_rate))  # regularization
    
    # second hidden layer
    model.add(Dense(nhid2, activation=hid_act))
    model.add(Dropout(dropout_rate))  # regularization
    
    # output layer
    model.add(Dense(num_classes, activation='softmax'))
    
    # compile
    model.compile(  
        loss=loss, 
        metrics=['f1_score']
    )

    return model

model = create_model()

# print the model architecture
model.summary()

**Hyperparametrization**

Define the hyperparameters:

In [13]:
param_grid = {
    'model__nhid1': [75, 100, 125],
    'model__nhid2': [50, 75],
    'model__learning_rate': [0.2, 0.1],
    'model__dropout_rate': [0, 0.2],
}

[point 6 in the exam: \
*First, I would split my data between train and test (80%, 20% seems a reasonable number)* \
*[...]* \
*I would evaluate the performance with F1.* ]

In [14]:
X_train, X_test, y_train, y_test = train_test_split(
    final_x, final_y, test_size=0.2, random_state=42, stratify=y)

To find the best hyperparameters, I firs generate all the possible combinations.

In [15]:
keys, values = zip(*param_grid.items())
param_combinations = [dict(zip(keys, v)) for v in product(*values)]

And I start doing the hyperparameters tuning

In [19]:
%%time

# keep track of the results
best_params = None
best_f1 = 0

# loop through each hyperparameter combination
for params in param_combinations:
    print(f"Testing parameters: {params}")
    
    # create and train the model with current hyperparameters
    model = KerasClassifier(
        model=create_model,
        model__input_shape=(33, 4),
        model__num_classes=7,
        model__nhid1=params['model__nhid1'],
        model__nhid2=params['model__nhid2'],
        model__learning_rate=params['model__learning_rate'],
        model__dropout_rate=params['model__dropout_rate'],
        batch_size=32,
        epochs=5,  # PUT TO 50
        verbose=0
    )
    
    # Train the model
    model.fit(X_train, y_train)
    
    # Predict on test data
    pred = model.predict(X_test)
    
    # evaluate model performance
    f1 = f1_score(y_test, pred, average='weighted')
    print(f"F1 Score: {f1}")
    
    # Store best hyperparameters
    if f1 > best_f1:
        best_f1 = f1
        best_params = params

Testing parameters: {'model__nhid1': 75, 'model__nhid2': 50, 'model__learning_rate': 0.0001, 'model__dropout_rate': 0}
F1 Score: 0.791670099668113
Testing parameters: {'model__nhid1': 75, 'model__nhid2': 50, 'model__learning_rate': 0.0001, 'model__dropout_rate': 0.2}
F1 Score: 0.8340249228724211
Testing parameters: {'model__nhid1': 75, 'model__nhid2': 50, 'model__learning_rate': 0.0001, 'model__dropout_rate': 0.4}
F1 Score: 0.7111629405025985
Testing parameters: {'model__nhid1': 75, 'model__nhid2': 50, 'model__learning_rate': 0.001, 'model__dropout_rate': 0}
F1 Score: 0.864602585646317
Testing parameters: {'model__nhid1': 75, 'model__nhid2': 50, 'model__learning_rate': 0.001, 'model__dropout_rate': 0.2}
F1 Score: 0.7703806862627657
Testing parameters: {'model__nhid1': 75, 'model__nhid2': 50, 'model__learning_rate': 0.001, 'model__dropout_rate': 0.4}
F1 Score: 0.8002154131553718
Testing parameters: {'model__nhid1': 75, 'model__nhid2': 50, 'model__learning_rate': 0.01, 'model__dropout_ra

In [17]:
print("\n==================== Best Results ====================")
print(f"Best Hyperparameters:")
print(f" - Number of hidden units in Layer 1: {best_params['model__nhid1']}")
print(f" - Number of hidden units in Layer 2: {best_params['model__nhid2']}")
print(f" - Learning Rate: {best_params['model__learning_rate']}")
print(f" - Dropout Rate: {best_params['model__dropout_rate']}")
print("\nPerformance Metrics:")
print(f" - Best F1 Score: {best_f1:.4f}")
print("========================================================\n")


Best Hyperparameters:
 - Number of hidden units in Layer 1: 150
 - Number of hidden units in Layer 2: 75
 - Learning Rate: 0.001
 - Dropout Rate: 0.4

Performance Metrics:
 - Best F1 Score: 0.9075



In [18]:
#save the best parameter

final_params = best_params.copy()

## MODEL EVALUATION

[point 6 in the exam: \
*Within the train set, I can divide again using validation sets, and I can do that with a cross validation procedure.* \
*Then, after the source for the best hyperparameters configuration, I would evaluate the performance on the test set of the best configuration, using f1.* ]

First, we define the strategy:

In [19]:
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

In [20]:
# to store the results
cv_f1_scores = []

And I create the model with the best hyperparameters that I found before.

In [21]:
model = KerasClassifier(
    model=create_model,
    model__input_shape=(33, 4),
    model__num_classes=7,
    **final_params,
    batch_size=32,
    epochs=5,
    verbose=0
)

Let's perform cross-validation:

In [22]:
for train_idx, val_idx in cv.split(X_train, y_train):
    # split into training and validation
    X_train_fold, X_val_fold = X_train[train_idx], X_train[val_idx]
    y_train_fold, y_val_fold = y_train[train_idx], y_train[val_idx]

    # train the model
    model.fit(X_train_fold, y_train_fold)

    # predict on the validation fold
    pred = model.predict(X_val_fold)

    # evaluate model performance
    f1 = f1_score(y_val_fold, pred, average='weighted')

    # store the results
    cv_f1_scores.append(f1)

Let's print the results we obtained:

In [23]:
print("\n--- Cross-Validation F1 Scores ---")
print(f"{'Fold':<6} {'F1 Score':<15}")
print("-" * 30)

for fold, f1_score in enumerate(cv_f1_scores, 1):
    print(f"{fold:<6} {f1_score:.4f}")

mean_f1 = np.mean(cv_f1_scores)
std_f1 = np.std(cv_f1_scores)

print("\n--- Cross-Validation Summary ---")
print(f"Mean F1 Score: {mean_f1:.4f}")
print(f"Standard Deviation: {std_f1:.4f}")


--- Cross-Validation F1 Scores ---
Fold   F1 Score       
------------------------------
1      0.7302
2      0.7653
3      0.8389
4      0.7616
5      0.7707

--- Cross-Validation Summary ---
Mean F1 Score: 0.7733
Standard Deviation: 0.0357


## CONSIDERATIONS

My F1 score dropped during cross-validation. This could be due to the model being trained on smaller, potentially less representative subsets of the data in each fold, which can result in slightly lower performance compared to training on the full dataset.

Dince the main focus of this project was not on achieving optimal performance, I did not analyze the performance across epochs to determine whether reducing the number of epochs would have been beneficial.