In [42]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import (
    Conv1D, MaxPooling1D, GlobalAveragePooling1D,
    Dense, Dropout, BatchNormalization,Input, Concatenate
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
import pandas as pd

In [4]:
# Read datasets from the interim directory
X_train = pd.read_pickle("../../data/interim/X_train.pkl")
X_val = pd.read_pickle("../../data/interim/X_val.pkl")
X_test = pd.read_pickle("../../data/interim/X_test.pkl")
y_train = pd.read_pickle("../../data/interim/y_train.pkl")
y_val = pd.read_pickle("../../data/interim/y_val.pkl")
y_test = pd.read_pickle("../../data/interim/y_test.pkl")

In [6]:
# 1) Windowing Function and Parameters

sensor_cols = ['acc_x', 'acc_y', 'acc_z', 'gyro_x', 'gyro_y', 'gyro_z']
WINDOW_SIZE = 50
STEP_SIZE = 40

def create_windows(X, y, window_size=WINDOW_SIZE, step_size=STEP_SIZE):
    X_windows, y_windows = [], []
    for start in range(0, len(X) - window_size + 1, step_size):
        end = start + window_size
        window_data = X[start:end]
        window_labels = y[start:end]
        # Majority voting for label assignment
        unique, counts = np.unique(window_labels, return_counts=True)
        label = unique[np.argmax(counts)]
        X_windows.append(window_data)
        y_windows.append(label)
    return np.array(X_windows), np.array(y_windows)


### Data Preperation

In [38]:
# Create windows
X_train_win, y_train_win = create_windows(X_train, y_train)
X_val_win,   y_val_win   = create_windows(X_val,   y_val)
X_test_win,  y_test_win  = create_windows(X_test,  y_test)

from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
y_train_enc = le.fit_transform(y_train_win)
y_val_enc   = le.transform(y_val_win)
y_test_enc  = le.transform(y_test_win)
NUM_CLASSES = len(le.classes_)

In [39]:
# Split channels
X_train_acc  = X_train_win[..., :3]
X_train_gyro = X_train_win[..., 3:]
X_val_acc    = X_val_win[..., :3]
X_val_gyro   = X_val_win[..., 3:]
X_test_acc   = X_test_win[..., :3]
X_test_gyro  = X_test_win[..., 3:]

In [40]:
def build_dual_encoder_model(num_classes, input_shape_accel, input_shape_gyro):
    # Accel branch
    input_accel = Input(shape=input_shape_accel)
    x = Conv1D(64, 3, padding="same", activation="relu")(input_accel)
    x = MaxPooling1D(2)(x)
    x = Conv1D(64, 3, padding="same", activation="relu")(x)
    x = MaxPooling1D(2)(x)
    x = Conv1D(64, 3, padding="same", activation="relu")(x)
    x = GlobalAveragePooling1D()(x)
    
    # Gyro branch
    input_gyro = Input(shape=input_shape_gyro)
    y = Conv1D(64, 3, padding="same", activation="relu")(input_gyro)
    y = MaxPooling1D(2)(y)
    y = Conv1D(64, 3, padding="same", activation="relu")(y)
    y = MaxPooling1D(2)(y)
    y = Conv1D(64, 3, padding="same", activation="relu")(y)
    y = GlobalAveragePooling1D()(y)

    # Merge both branches
    joint = Concatenate()([x, y])
    h = BatchNormalization()(joint)
    h = Dropout(0.5)(h)
    h = Dense(256, activation="relu")(h)
    h = BatchNormalization()(h)
    h = Dropout(0.5)(h)
    output = Dense(num_classes, activation="softmax")(h)

    # Final model
    model = Model(inputs=[input_accel, input_gyro], outputs=output)
    return model

In [None]:
# Now just like your example:
input_shape_accel = (X_train_acc.shape[1], X_train_acc.shape[2])  # (50, 3)
input_shape_gyro = (X_train_gyro.shape[1], X_train_gyro.shape[2])  # (50, 3)
num_classes = NUM_CLASSES  # 16

model = build_dual_encoder_model(num_classes, input_shape_accel, input_shape_gyro)


In [None]:
model.compile(optimizer=Adam(learning_rate=0.001),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.summary()

In [46]:
history = model.fit(
    (X_train_acc, X_train_gyro), y_train_enc,
    validation_data=((X_val_acc, X_val_gyro), y_val_enc),
    epochs=30,
    batch_size=64
)

Epoch 1/30
[1m1087/1087[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 7ms/step - accuracy: 0.7207 - loss: 0.9980 - val_accuracy: 0.7918 - val_loss: 0.6212
Epoch 2/30
[1m1087/1087[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 7ms/step - accuracy: 0.8369 - loss: 0.4986 - val_accuracy: 0.8093 - val_loss: 0.5293
Epoch 3/30
[1m1087/1087[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 7ms/step - accuracy: 0.8567 - loss: 0.4401 - val_accuracy: 0.8166 - val_loss: 0.4867
Epoch 4/30
[1m1087/1087[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 7ms/step - accuracy: 0.8649 - loss: 0.4048 - val_accuracy: 0.8207 - val_loss: 0.4938
Epoch 5/30
[1m1087/1087[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - accuracy: 0.8743 - loss: 0.3837 - val_accuracy: 0.8125 - val_loss: 0.4960
Epoch 6/30
[1m1087/1087[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - accuracy: 0.8794 - loss: 0.3664 - val_accuracy: 0.8205 - val_loss: 0.4851
Epoch 7/30
[1m

In [48]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (confusion_matrix, classification_report, 
                             precision_score, recall_score, f1_score)

def evaluate_model(model, X_test, y_test, label_classes=None, plot_confusion_matrix=True, plot_roc=True):

    # Evaluate the model for loss and accuracy
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print("Test Loss: {:.4f}".format(loss))
    print("Test Accuracy: {:.4f}".format(accuracy))

    # Generate prediction probabilities and predicted classes
    y_pred_prob = model.predict(X_test)
    y_pred = np.argmax(y_pred_prob, axis=1)

    # Compute confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    if plot_confusion_matrix:
        plt.figure(figsize=(10, 8))
        if label_classes is None:
            label_classes = [str(i) for i in range(cm.shape[0])]
        sns.heatmap(cm, annot=True, fmt="d", cmap='Blues', 
                    xticklabels=label_classes, yticklabels=label_classes)
        plt.xlabel("Predicted Label")
        plt.ylabel("True Label")
        plt.title("Confusion Matrix")
        plt.show()

    # Compute and print classification report
    report = classification_report(y_test, y_pred, target_names=label_classes)
    print("Classification Report:\n", report)

    # Compute weighted precision, recall, and F1 scores
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')
    print("Weighted Precision: {:.4f}".format(precision))
    print("Weighted Recall: {:.4f}".format(recall))
    print("Weighted F1 Score: {:.4f}".format(f1))

    metrics = {
        'loss': loss,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1
    }

    return metrics