# **PDIoT Coursework 3: Daily Physical Activity Classification**

This notebook implements machine learning models to classify 11 daily physical activities using RESpeck accelerometer data:

**Activities to Classify:**
1. Ascending
2. Decending
3. Lying down Back
4. Lying down Left
5. Lying down Right
6. Lying down Stomach
7. Miscellaneous movements
8. Normal walking
9. Running
10. Shuffle walking
11. Sitting/Standing


## 1. Imports

In [None]:
#all imports required
import pandas as pd
import numpy as np
import glob
import os
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import shutil
import zipfile

from sklearn.model_selection import train_test_split, LeaveOneGroupOut
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.utils.class_weight import compute_class_weight
from scipy.fft import fft
from scipy.signal import find_peaks
from sklearn.metrics import precision_recall_fscore_support

import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import (Input, Conv1D, MaxPooling1D, GlobalAveragePooling1D,
                                      Flatten, Dense, Dropout, BatchNormalization,
                                      LSTM, concatenate)
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import plot_model

np.random.seed(42)
tf.random.set_seed(42)

In [None]:
#extract the respeck dataset zip file into the local data directory
data_zip = "data/RESpeckData_Updated.zip"
with zipfile.ZipFile(data_zip, 'r') as zip_ref:
    zip_ref.extractall("data/")

#define the path to the daily physical activity dataset
dataset_path = "data/RESpeckData_Updated/daily_activity/"

#checking to confirm dataset path and available activity folders
print(f"Dataset path: {dataset_path}")
print(f"Folders in dataset: {os.listdir(dataset_path)}")

NameError: name 'zipfile' is not defined

## 2. Data Loading

In [None]:
#define a fixed mapping from activity names to integer class labels
activity_mapping = {
    'sittingStanding': 0,
    'lyingLeft': 1,
    'lyingRight': 2,
    'lyingBack': 3,
    'lyingStomach': 4,
    'normalWalking': 5,
    'running': 6,
    'ascending': 7,
    'descending': 8,
    'shuffleWalking': 9,
    'miscMovement': 10
}

#define static activity classes used for performance evaluation
static_activities = [0, 1, 2, 3, 4]

#define dynamic activity classes used for performance evaluation
dynamic_activities = [5, 6, 7, 8, 9, 10]

#print a summary of all activity classes and their static or dynamic grouping
print("Activity Classes (11 total):")
for activity, label in activity_mapping.items():
    activity_type = "Static" if label in static_activities else "Dynamic"
    print(f"{label:2d}: {activity:20s} ({activity_type})")


In [None]:
def extract_enhanced_features(window_data):
    features = []

    #1.gravity-based orientation features estimated via window-mean acceleration
    mean_x = np.mean(window_data[:, 0])
    mean_y = np.mean(window_data[:, 1])
    mean_z = np.mean(window_data[:, 2])

    #normalise mean acceleration to approximate gravity direction
    magnitude = np.sqrt(mean_x**2 + mean_y**2 + mean_z**2)
    gravity_x = mean_x / magnitude if magnitude > 0 else 0
    gravity_y = mean_y / magnitude if magnitude > 0 else 0
    gravity_z = mean_z / magnitude if magnitude > 0 else 0

    #compute pitch and roll angles for posture discrimination
    pitch = np.arctan2(mean_y, np.sqrt(mean_x**2 + mean_z**2))
    roll = np.arctan2(mean_x, mean_z)

    features.extend([gravity_x, gravity_y, gravity_z, pitch, roll])

    #2.time-domain statistical features capturing amplitude and variability per axis
    for axis in range(3):
        axis_data = window_data[:, axis]
        features.extend([
            np.mean(axis_data),                  #average acceleration level
            np.std(axis_data),                   #signal dispersion
            np.var(axis_data),                   #power-related variability
            np.min(axis_data),                   #lower amplitude bound
            np.max(axis_data),                   #upper amplitude bound
            np.percentile(axis_data, 25),        #lower quartile
            np.percentile(axis_data, 75),        #upper quartile
            np.max(axis_data) - np.min(axis_data),  #dynamic range
            np.mean(np.abs(np.diff(axis_data)))  #temporal smoothness measure
        ])

    #3.frequency-domain features capturing periodic locomotion patterns
    for axis in range(3):
        axis_data = window_data[:, axis]
        fft_vals = np.abs(fft(axis_data))
        fft_vals = fft_vals[:len(fft_vals)//2]  #only positive frequencies

        #compute magnitude of fft and retain positive frequencies
        dominant_freq = np.argmax(fft_vals)

        #energy in frequency bands
        #low band: 0-5 Hz, Mid band: 5-15 Hz, High band: 15-25 Hz
        #sampling rate is 12.5Hz, so Nyquist is 6.25Hz
        #adjust bands: Low: 0-2Hz, Mid: 2-4Hz, High: 4-6.25Hz
        total_energy = np.sum(fft_vals)
        low_band = np.sum(fft_vals[0:int(len(fft_vals)*0.32)])  # 0-2Hz
        mid_band = np.sum(fft_vals[int(len(fft_vals)*0.32):int(len(fft_vals)*0.64)])  # 2-4Hz
        high_band = np.sum(fft_vals[int(len(fft_vals)*0.64):])  # 4-6.25Hz

        features.extend([
            dominant_freq,
            low_band / (total_energy + 1e-10),
            mid_band / (total_energy + 1e-10),
            high_band / (total_energy + 1e-10)
        ])

    #4.step-related features derived from acceleration magnitude peaks
    accel_mag = np.sqrt(window_data[:, 0]**2 + window_data[:, 1]**2 + window_data[:, 2]**2)

    #detect steps using prominence-based peak detection
    peaks, _ = find_peaks(accel_mag, distance=5, prominence=0.2)
    num_steps = len(peaks)
    step_rate = num_steps / len(window_data)

    #quantify gait regularity via inter-step interval variability
    if len(peaks) > 1:
        inter_peak_intervals = np.diff(peaks)
        step_regularity = np.std(inter_peak_intervals)
    else:
        step_regularity = 0

    features.extend([num_steps, step_rate, step_regularity])

    return np.array(features)

In [None]:
def prepare_data_with_features(X):
    additional_features = []
    #extract features for each windowed sample
    for window in X:
        feats = extract_enhanced_features(window)
        additional_features.append(feats)

    #convert list of per-window feature vectors into array for model input
    return np.array(additional_features)

In [None]:
def compute_variance_threshold(X_train, y_train, static_activities, dynamic_activities, sample_size=5000):
    #subsample training windows to reduce computation on large datasets
    if len(X_train) > sample_size:
        indices = np.random.choice(len(X_train), sample_size, replace=False)
        X_sample = X_train[indices]
        y_sample = y_train[indices]
    else:
        X_sample = X_train
        y_sample = y_train

    #estimate body acceleration variance for each window
    variances = []
    for window in X_sample:
        #compute acceleration magnitude across axes
        accel_mag = np.sqrt(
            window[:, 0]**2 +
            window[:, 1]**2 +
            window[:, 2]**2
        )

        #remove constant gravity offset to emphasise dynamic motion
        body_accel = accel_mag - 9.81

        #use variance as a motion intensity proxy
        var = np.var(body_accel)
        variances.append(var)

    variances = np.array(variances)

    #separate variance distributions for static and dynamic activities
    static_mask = np.isin(y_sample, static_activities)
    dynamic_mask = np.isin(y_sample, dynamic_activities)

    static_variances = variances[static_mask]
    dynamic_variances = variances[dynamic_mask]

    #derive conservative threshold from extreme percentiles
    max_static = np.percentile(static_variances, 95)
    min_dynamic = np.percentile(dynamic_variances, 5)
    threshold = (max_static + min_dynamic) / 2

    #report statistics for transparency and analysis
    print(f"Static variance 95th percentile: {max_static:.6f}")
    print(f"Dynamic variance 5th percentile: {min_dynamic:.6f}")
    print(f"Computed threshold: {threshold:.6f}")

    return threshold

In [None]:
def apply_variance_filtering(y_pred_probs, X_test, variance_threshold,
                            static_activities, dynamic_activities,
                            confidence_threshold=0.95):
    y_pred = []

    #post-process predictions window by window using motion variance
    for i, (probs, window) in enumerate(zip(y_pred_probs, X_test)):

        #compute body acceleration variance as a motion intensity measure
        accel_mag = np.sqrt(
            window[:, 0]**2 +
            window[:, 1]**2 +
            window[:, 2]**2
        )
        body_accel = accel_mag - 9.81
        variance = np.var(body_accel)

        #extract model confidence and most likely class
        max_prob = np.max(probs)
        model_pred = np.argmax(probs)

        #trust model prediction when confidence exceeds threshold
        if max_prob > confidence_threshold:
            final_pred = model_pred
        else:
            #constrain prediction based on variance-derived activity type
            if variance < variance_threshold:
                #restrict classification to static activity subset
                static_probs = probs[static_activities]
                final_pred = static_activities[np.argmax(static_probs)]
            else:
                #restrict classification to dynamic activity subset
                dynamic_probs = probs[dynamic_activities]
                final_pred = dynamic_activities[np.argmax(dynamic_probs)]

        y_pred.append(final_pred)

    return np.array(y_pred)

#  3. Data Processing and Preparation



In [None]:
#define sliding window length and overlap for time-series segmentation
window_size = 50
step_size = 25

#initialise lists to store windowed data, labels, and subject identifiers
X_all = []
y_all = []
subjects_all = []

#iterate over each activity folder and corresponding class label
for activity, label in activity_mapping.items():
    folder = os.path.join(dataset_path, activity)
    for file in glob.glob(os.path.join(folder, "*.csv")):
        try:
            #extract subject identifier from filename for subject-wise splitting
            subject = file.split('_')[-1].split('.')[0]

            #load tri-axial accelerometer data only
            data = pd.read_csv(file, usecols=['accelX', 'accelY', 'accelZ']).values

            #segment continuous signals into overlapping sliding windows
            for i in range(0, len(data)-window_size, step_size):
                X_all.append(data[i:i+window_size])
                y_all.append(label)
                subjects_all.append(subject)
        except:
            #skip files that cannot be read or are incorrectly formatted
            continue

#convert accumulated lists into numpy arrays for efficient processing
X = np.array(X_all)
y = np.array(y_all)
subjects = np.array(subjects_all)

#print dataset statistics for verification and reporting
print(f"Total windows: {len(X)}")
print(f"Total subjects: {len(np.unique(subjects))}")


In [None]:
#identify all unique subjects in the dataset
unique_subjects = np.unique(subjects)

#fix random seed to ensure reproducible subject splits
np.random.seed(42)
np.random.shuffle(unique_subjects)

#define subject-level train, validation, and test split boundaries
train_end = int(len(unique_subjects) * 0.70)
val_end = int(len(unique_subjects) * 0.85)

#split subjects to prevent data leakage across sets
train_subjects = unique_subjects[:train_end]
val_subjects = unique_subjects[train_end:val_end]
test_subjects = unique_subjects[val_end:]

#create boolean masks for subject-wise data selection
train_mask = np.isin(subjects, train_subjects)
val_mask = np.isin(subjects, val_subjects)
test_mask = np.isin(subjects, test_subjects)

#apply subject-wise masks to obtain final datasets
X_train = X[train_mask]
y_train = y[train_mask]
X_val = X[val_mask]
y_val = y[val_mask]
X_test = X[test_mask]
y_test = y[test_mask]

#print split statistics for verification
print(f"Train: {len(train_subjects)} subjects, {len(X_train)} windows")
print(f"Val:   {len(val_subjects)} subjects, {len(X_val)} windows")
print(f"Test:  {len(test_subjects)} subjects, {len(X_test)} windows")

#compute handcrafted feature vectors for each window in every split
X_train_gravity = prepare_data_with_features(X_train)
X_val_gravity = prepare_data_with_features(X_val)
X_test_gravity = prepare_data_with_features(X_test)

#print feature tensor shapes to confirm consistency
print(f"\nFeatures shape - Train: {X_train_gravity.shape}, Val: {X_val_gravity.shape}, Test: {X_test_gravity.shape}")

In [None]:
#normalise accelerometer data using statistics computed from the training set only
def normalise_data(X_train, X_val):
    #store original dimensions for reshaping
    n_train = X_train.shape[0]
    n_val = X_val.shape[0]
    window_size = X_train.shape[1]
    n_features = X_train.shape[2]

    #flatten windowed data to apply standard scaling per feature
    X_train_flat = X_train.reshape(-1, n_features)
    X_val_flat = X_val.reshape(-1, n_features)

    #fit scaler on training data and apply consistently to validation data
    scaler = StandardScaler()
    X_train_flat = scaler.fit_transform(X_train_flat)
    X_val_flat = scaler.transform(X_val_flat)

    #reshape normalised data back to original window structure
    X_train_norm = X_train_flat.reshape(n_train, window_size, n_features)
    X_val_norm = X_val_flat.reshape(n_val, window_size, n_features)

    return X_train_norm, X_val_norm, scaler

#apply normalisation to training and validation sets
X_train_norm, X_val_norm, scaler = normalise_data(X_train, X_val)

#apply the same trained scaler to the test set to avoid data leakage
X_test_norm = scaler.transform(X_test.reshape(-1, 3)).reshape(X_test.shape)

#print shapes to verify correct normalisation and reshaping
print(f"X_train_norm shape: {X_train_norm.shape}")
print(f"X_val_norm shape: {X_val_norm.shape}")
print(f"X_test_norm shape: {X_test_norm.shape}")

In [None]:
#save the fitted scaler to disk for consistent preprocessing during deployment
with open('scaler.pkl', 'wb') as f:
    pickle.dump(scaler, f)

#print scaler parameters for transparency and reproducibility
print("Scaler Parameters:")
print(f"Mean: {scaler.mean_}")
print(f"Std: {scaler.scale_}")

#confirm successful saving of the scaler object
print("\nScaler saved as 'scaler.pkl'")

In [None]:
#initialise one-hot encoder for multi-class classification
encoder = OneHotEncoder(sparse_output=False)

#convert integer class labels into one-hot encoded vectors
y_train_new = encoder.fit_transform(y_train.reshape(-1, 1))
y_val_new = encoder.transform(y_val.reshape(-1, 1))
y_test_new = encoder.transform(y_test.reshape(-1, 1))

#print encoded label shapes to verify correct conversion
print(f"y_train_new: {y_train_new.shape}")
print(f"y_val_new: {y_val_new.shape}")
print(f"y_test_new: {y_test_new.shape}")

# 4. Build the ML Model

In [None]:
def build_hybrid_model(input_shape_cnn, input_shape_features, num_classes):

    #cnn branch processing raw tri-axial acceleration sequences
    cnn_input = Input(shape=input_shape_cnn, name='cnn_input')

    #block 1: broad temporal kernels for low-level motion patterns
    x = Conv1D(64, 5, activation='relu', padding='same',
               kernel_regularizer=l2(0.001))(cnn_input)
    x = BatchNormalization()(x)
    x = MaxPooling1D(2)(x)
    x = Dropout(0.3)(x)

    #block 2: reduced kernel size to combine lower-level features
    x = Conv1D(128, 3, activation='relu', padding='same',
               kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    x = MaxPooling1D(2)(x)
    x = Dropout(0.3)(x)

    #block 3: high-level temporal abstraction prior to dense projection
    x = Conv1D(256, 3, activation='relu', padding='same',
               kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    x = Flatten()(x)
    x = Dense(128, activation='relu', kernel_regularizer=l2(0.001))(x)
    cnn_features = Dropout(0.3)(x)

    #feature branch processing engineered window-level features
    feature_input = Input(shape=(input_shape_features,), name='feature_input')
    y = Dense(64, activation='relu')(feature_input)
    y = BatchNormalization()(y)
    y = Dropout(0.3)(y)
    y = Dense(32, activation='relu')(y)
    y = BatchNormalization()(y)
    feature_branch = Dropout(0.3)(y)

    #fusion of learned temporal features and engineered descriptors
    combined = concatenate([cnn_features, feature_branch])

    #fully connected fusion layers with strong regularisation
    z = Dense(128, activation='relu', kernel_regularizer=l2(0.01))(combined)
    z = BatchNormalization()(z)
    z = Dropout(0.6)(z)
    z = Dense(64, activation='relu', kernel_regularizer=l2(0.01))(z)
    z = Dropout(0.6)(z)

    #softmax output for multi-class activity classification
    output = Dense(num_classes, activation='softmax')(z)

    #compile model with categorical cross-entropy for multi-class learning
    model = Model(inputs=[cnn_input, feature_input], outputs=output)
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
total_params = model.count_params()
print(f"Total parameters: {total_params:,}")

In [None]:
#define input shapes for cnn branch and handcrafted feature branch
input_shape_cnn = (X_train_norm.shape[1], X_train_norm.shape[2])
input_shape_features = X_train_gravity.shape[1]

#define number of output classes for multi-class classification
num_classes = y_train_new.shape[1]

#print model configuration details for verification
print(f"CNN Input shape: {input_shape_cnn}")
print(f"Features shape: {input_shape_features}")
print(f"Number of classes: {num_classes}")

#build and initialise the hybrid activity recognition model
model = build_hybrid_model(input_shape_cnn, input_shape_features, num_classes)

#print model architecture summary
model.summary()

# 5. Train the Model

In [None]:
def build_hybrid_model(input_shape_cnn, input_shape_features, num_classes):

    #cnn branch processing raw tri-axial acceleration sequences
    cnn_input = Input(shape=input_shape_cnn, name='cnn_input')

    #block 1: broad temporal kernels for low-level motion patterns
    x = Conv1D(64, 5, activation='relu', padding='same',
               kernel_regularizer=l2(0.001))(cnn_input)
    x = BatchNormalization()(x)
    x = MaxPooling1D(2)(x)
    x = Dropout(0.3)(x)

    #block 2: reduced kernel size to combine lower-level features
    x = Conv1D(128, 3, activation='relu', padding='same',
               kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    x = MaxPooling1D(2)(x)
    x = Dropout(0.3)(x)

    #block 3: high-level temporal abstraction prior to dense projection
    x = Conv1D(256, 3, activation='relu', padding='same',
               kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    x = Flatten()(x)
    x = Dense(128, activation='relu', kernel_regularizer=l2(0.001))(x)
    cnn_features = Dropout(0.3)(x)

    #feature branch processing engineered window-level features
    feature_input = Input(shape=(input_shape_features,), name='feature_input')
    y = Dense(64, activation='relu')(feature_input)
    y = BatchNormalization()(y)
    y = Dropout(0.3)(y)
    y = Dense(32, activation='relu')(y)
    y = BatchNormalization()(y)
    feature_branch = Dropout(0.3)(y)

    #fusion of learned temporal features and engineered descriptors
    combined = concatenate([cnn_features, feature_branch])

    #fully connected fusion layers with strong regularisation
    z = Dense(128, activation='relu', kernel_regularizer=l2(0.01))(combined)
    z = BatchNormalization()(z)
    z = Dropout(0.6)(z)
    z = Dense(64, activation='relu', kernel_regularizer=l2(0.01))(z)
    z = Dropout(0.6)(z)

    #softmax output for multi-class activity classification
    output = Dense(num_classes, activation='softmax')(z)

    #compile model with categorical cross-entropy for multi-class learning
    model = Model(inputs=[cnn_input, feature_input], outputs=output)
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
#compute balanced class weights to mitigate class imbalance during training
class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train
)

#map weights to class indices for keras training
class_weight_dict = dict(enumerate(class_weights))

#print weights for transparency and reproducibility
print("Class weights:", class_weight_dict)

In [None]:
#compute variance threshold from training data for static/dynamic separation
variance_threshold = compute_variance_threshold(
    X_train_norm,
    y_train,
    static_activities,
    dynamic_activities
)

#persist threshold for consistent behaviour during deployment
with open('variance_threshold.pkl', 'wb') as f:
    pickle.dump(variance_threshold, f)

#configure training callbacks to control overfitting and convergence
callbacks = [
    EarlyStopping(
        monitor='val_loss',
        patience=15,
        restore_best_weights=True,
        verbose=1
    ),
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=7,
        min_lr=1e-6,
        verbose=1
    )
]

#train hybrid model with class weighting and validation split
#train hybrid model with class weighting and held-out validation subjects
history = model.fit(
    [X_train_norm, X_train_gravity],
    y_train_new,
    epochs=80,
    batch_size=64,
    validation_data=([X_val_norm, X_val_gravity], y_val_new),  # ← ADD THIS LINE
    callbacks=callbacks,
    class_weight=class_weight_dict,
    verbose=1
)


# 6. Evaluate the Model

In [None]:
#evaluate trained model on held-out test subjects using variance-based post-processing
print("Computing variance threshold from training data...")
print(f"Variance threshold: {variance_threshold:.6f}\n")

#generate class probability predictions for test windows
print("Generating predictions on test set...")
y_pred_probs = model.predict([X_test_norm, X_test_gravity])

#apply physics-informed variance filtering to refine static/dynamic predictions
print("Applying variance-based filtering...")
y_pred = apply_variance_filtering(
    y_pred_probs,
    X_test_norm,
    variance_threshold,
    static_activities,
    dynamic_activities,
    confidence_threshold=0.95
)

#recover true class labels from one-hot encoding
y_true = np.argmax(y_test_new, axis=1)

#compute overall subject-independent test accuracy
overall_accuracy = accuracy_score(y_true, y_pred)
print(f"\n{'='*60}")
print(f"TEST SET RESULTS (with variance filtering)")
print(f"{'='*60}")
print(f"Overall Accuracy: {overall_accuracy:.4f} ({overall_accuracy*100:.2f}%)")

#separate static and dynamic activity subsets for targeted evaluation
static_mask = np.isin(y_true, static_activities)
dynamic_mask = np.isin(y_true, dynamic_activities)

static_accuracy = accuracy_score(y_true[static_mask], y_pred[static_mask])
dynamic_accuracy = accuracy_score(y_true[dynamic_mask], y_pred[dynamic_mask])

#report performance against predefined static activity target
print(f"\nStatic Activities Accuracy: {static_accuracy:.4f} ({static_accuracy*100:.2f}%)")
print(f"  Target: 90% - {'good' if static_accuracy >= 0.90 else 'not good'}")

#report performance against predefined dynamic activity target
print(f"\nDynamic Activities Accuracy: {dynamic_accuracy:.4f} ({dynamic_accuracy*100:.2f}%)")
print(f"  Target: 85% - {'good' if dynamic_accuracy >= 0.85 else 'not good'}")


In [None]:
#plot training and validation performance to analyse learning behaviour
plt.figure(figsize=(14, 5))

#plot accuracy curves to assess convergence and generalisation
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy', linewidth=2)
plt.plot(history.history['val_accuracy'], label='Validation Accuracy', linewidth=2)
plt.title('Model Accuracy Over Epochs', fontsize=14, fontweight='bold')
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Accuracy', fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)

#plot loss curves to inspect optimisation stability and overfitting
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss', linewidth=2)
plt.plot(history.history['val_loss'], label='Validation Loss', linewidth=2)
plt.title('Model Loss Over Epochs', fontsize=14, fontweight='bold')
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss', fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)

#save figure for inclusion in the final report
plt.tight_layout()
plt.savefig('figure1_training_curves.png', dpi=300, bbox_inches='tight')
plt.show()

NameError: name 'plt' is not defined

In [None]:
#define readable activity class names for reporting and visualisation
activity_names = list(activity_mapping.keys())

#print detailed precision, recall, and f1-score for each activity class
print("\nClassification Report:")
print(classification_report(y_true, y_pred, target_names=activity_names))

#plot confusion matrix showing absolute prediction counts
plt.figure(figsize=(12, 10))
cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=activity_names,
            yticklabels=activity_names,
            cbar_kws={'label': 'Count'})
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix for Activity Classification')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig('figure3_confusion_matrix_counts.png', dpi=300, bbox_inches='tight')
plt.show()

#compute row-normalised confusion matrix to analyse per-class accuracy
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

#plot normalised confusion matrix for relative performance comparison
plt.figure(figsize=(12, 10))
sns.heatmap(cm_normalized, annot=True, fmt='.2%', cmap='RdYlGn',
            xticklabels=activity_names,
            yticklabels=activity_names,
            cbar_kws={'label': 'Percentage'},
            vmin=0, vmax=1)
plt.xlabel('Predicted', fontsize=12)
plt.ylabel('True', fontsize=12)
plt.title('Normalised Confusion Matrix', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig('figure4_confusion_matrix_normalized.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
#compute per-class precision, recall, f1-score, and support
precision, recall, f1, support = precision_recall_fscore_support(y_true, y_pred)

#create a bar chart comparing precision, recall, and f1-score for each activity class
fig, ax = plt.subplots(figsize=(14, 6))
x = np.arange(len(activity_names))
width = 0.25

#plot bars for precision, recall, and f1-score
bars1 = ax.bar(x - width, precision, width, label='Precision', color='salmon', alpha=0.8)
bars2 = ax.bar(x, recall, width, label='Recall', color='lightblue', alpha=0.8)
bars3 = ax.bar(x + width, f1, width, label='F1-Score', color='lightgreen', alpha=0.8)

#draw target lines for static and dynamic activity accuracy thresholds
ax.axhline(y=0.90, color='black', linestyle='--', linewidth=2, label='Static Target (90%)')
ax.axhline(y=0.85, color='gray', linestyle='-.', linewidth=2, label='Dynamic Target (85%)')

#set axis labels and title
ax.set_xlabel('Activity Class', fontsize=12)
ax.set_ylabel('Score', fontsize=12)
ax.set_title('Per-Class Performance Metrics', fontsize=14, fontweight='bold')

#set tick labels to activity names
ax.set_xticks(x)
ax.set_xticklabels(activity_names, rotation=45, ha='right')

#add legend, set y-axis limits, and add grid lines for readability
ax.legend(fontsize=11)
ax.set_ylim([0, 1.05])
ax.grid(True, alpha=0.3, axis='y')

#adjust layout to prevent clipping and save figure for report
plt.tight_layout()
plt.savefig('figure2_per_class_performance.png', dpi=300, bbox_inches='tight')
plt.show()


# 7. Visualise Performance with Confusion Matrix

In [None]:
#perform leave-n-subjects-out cross-validation to evaluate subject generalisation
def leave_n_subjects_out_cv(X, y, subjects, num_folds=5):
    from collections import defaultdict

    #identify all unique subjects and shuffle for random fold assignment
    unique_subjects = np.unique(subjects)
    np.random.seed(42)
    np.random.shuffle(unique_subjects)

    #determine number of subjects per fold
    fold_size = len(unique_subjects) // num_folds

    #store accuracy metrics for each fold
    fold_accuracies = []
    fold_static_accs = []
    fold_dynamic_accs = []
    per_class_accuracies = defaultdict(list)  # ← NEW: Track per-class

    #iterate over each cross-validation fold
    for fold in range(num_folds):
        print(f"\nFold {fold + 1}/{num_folds}")

        #select test and training subjects for the current fold
        start_idx = fold * fold_size
        end_idx = start_idx + fold_size if fold < num_folds - 1 else len(unique_subjects)
        test_subs = unique_subjects[start_idx:end_idx]
        train_val_subs = np.setdiff1d(unique_subjects, test_subs)

        #split train_val_subs into train and validation subjects
        np.random.shuffle(train_val_subs)
        val_size = int(len(train_val_subs) * 0.15 / 0.85)
        val_subs_fold = train_val_subs[:val_size]
        train_subs_fold = train_val_subs[val_size:]

        print(f"  Train subjects: {len(train_subs_fold)}, Val subjects: {len(val_subs_fold)}, Test subjects: {len(test_subs)}")

        #create subject-wise masks
        train_mask = np.isin(subjects, train_subs_fold)
        val_mask = np.isin(subjects, val_subs_fold)
        test_mask = np.isin(subjects, test_subs)

        #split data
        X_train_fold = X[train_mask]
        y_train_fold = y[train_mask]
        X_val_fold = X[val_mask]
        y_val_fold = y[val_mask]
        X_test_fold = X[test_mask]
        y_test_fold = y[test_mask]

        #normalise data
        X_train_norm_fold, X_val_norm_fold, scaler_fold = normalise_data(X_train_fold, X_val_fold)
        X_test_norm_fold = scaler_fold.transform(X_test_fold.reshape(-1, 3)).reshape(X_test_fold.shape)

        #extract features
        X_train_gravity_fold = prepare_data_with_features(X_train_fold)
        X_val_gravity_fold = prepare_data_with_features(X_val_fold)
        X_test_gravity_fold = prepare_data_with_features(X_test_fold)

        #one-hot encode
        encoder_fold = OneHotEncoder(sparse_output=False)
        y_train_oh = encoder_fold.fit_transform(y_train_fold.reshape(-1, 1))
        y_val_oh = encoder_fold.transform(y_val_fold.reshape(-1, 1))
        y_test_oh = encoder_fold.transform(y_test_fold.reshape(-1, 1))

        #compute class weights
        class_weights_fold = compute_class_weight('balanced', classes=np.unique(y_train_fold), y=y_train_fold)
        class_weight_dict_fold = dict(enumerate(class_weights_fold))

        #build model
        input_shape_cnn = (X_train_norm_fold.shape[1], X_train_norm_fold.shape[2])
        input_shape_features = X_train_gravity_fold.shape[1]
        num_classes = y_train_oh.shape[1]
        model_cv = build_hybrid_model(input_shape_cnn, input_shape_features, num_classes)

        #define callbacks
        callbacks_cv = [
            EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True, verbose=0),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-6, verbose=0)
        ]

        #train model
        model_cv.fit(
            [X_train_norm_fold, X_train_gravity_fold],
            y_train_oh,
            epochs=80,
            batch_size=64,
            validation_data=([X_val_norm_fold, X_val_gravity_fold], y_val_oh),
            callbacks=callbacks_cv,
            class_weight=class_weight_dict_fold,
            verbose=0
        )

        #compute variance threshold
        variance_threshold_fold = compute_variance_threshold(
            X_train_norm_fold, y_train_fold, static_activities, dynamic_activities
        )

        #predict
        y_pred_probs_fold = model_cv.predict([X_test_norm_fold, X_test_gravity_fold], verbose=0)
        y_pred_fold = apply_variance_filtering(
            y_pred_probs_fold, X_test_norm_fold, variance_threshold_fold,
            static_activities, dynamic_activities, confidence_threshold=0.95
        )

        #get true labels
        y_true_fold = np.argmax(y_test_oh, axis=1)

        #compute overall accuracy
        acc = accuracy_score(y_true_fold, y_pred_fold)

        #compute static and dynamic accuracies
        static_mask_fold = np.isin(y_true_fold, static_activities)
        dynamic_mask_fold = np.isin(y_true_fold, dynamic_activities)
        static_acc = accuracy_score(y_true_fold[static_mask_fold], y_pred_fold[static_mask_fold]) if static_mask_fold.sum() > 0 else 0
        dynamic_acc = accuracy_score(y_true_fold[dynamic_mask_fold], y_pred_fold[dynamic_mask_fold]) if dynamic_mask_fold.sum() > 0 else 0


        for class_idx in range(num_classes):
            class_mask = (y_true_fold == class_idx)
            if class_mask.sum() > 0:
                class_acc = accuracy_score(y_true_fold[class_mask], y_pred_fold[class_mask])
                per_class_accuracies[class_idx].append(class_acc)


        #store fold results
        fold_accuracies.append(acc)
        fold_static_accs.append(static_acc)
        fold_dynamic_accs.append(dynamic_acc)

        print(f"  Overall: {acc:.4f}, Static: {static_acc:.4f}, Dynamic: {dynamic_acc:.4f}")

    #report summary statistics

    print("CV summary:")
    print(f"Overall Accuracy:  {np.mean(fold_accuracies):.4f} ± {np.std(fold_accuracies):.4f}")
    print(f"Static Accuracy:   {np.mean(fold_static_accs):.4f} ± {np.std(fold_static_accs):.4f}")
    print(f"Dynamic Accuracy:  {np.mean(fold_dynamic_accs):.4f} ± {np.std(fold_dynamic_accs):.4f}")


    print("Table 1 : Per-Class Cross-Validation Accuracy")

    print(f"{'Activity':<20} {'CV Mean ± Std':<20} {'Category':<10}")


    for class_idx, class_name in enumerate(activity_names):
        if class_idx in per_class_accuracies and len(per_class_accuracies[class_idx]) > 0:
            accs = per_class_accuracies[class_idx]
            mean_acc = np.mean(accs) * 100
            std_acc = np.std(accs) * 100
            category = "Static" if class_idx in static_activities else "Dynamic"
            print(f"{class_name:<20} {mean_acc:5.2f}% ± {std_acc:5.2f}%   {category:<10}")
        else:
            category = "Static" if class_idx in static_activities else "Dynamic"
            print(f"{class_name:<20} {'No test samples':<20}   {category:<10}")

    return fold_accuracies, fold_static_accs, fold_dynamic_accs

#run cross-validation
cv_results = leave_n_subjects_out_cv(X, y, subjects, num_folds=5)

In [None]:
#convert trained Keras model to TensorFlow Lite format for edge deployment
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

#save the converted TFLite model to disk
with open('har_model.tflite', 'wb') as f:
    f.write(tflite_model)

#report final model size to assess suitability for embedded devices
print(f"Model size: {os.path.getsize('har_model.tflite') / 1024:.2f} KB")

#create output directory for deployment artefacts
output_dir = "output/"
os.makedirs(output_dir, exist_ok=True)

#copy TFLite model and normalisation scaler for inference-time preprocessing
shutil.copy('har_model.tflite', os.path.join(output_dir, 'har_model.tflite'))
shutil.copy('scaler.pkl', os.path.join(output_dir, 'scaler.pkl'))

print(f"Files saved to {output_dir}")

In [None]:
#collect and store all generated figures for reproducible reporting
for png_file in glob.glob("*.png"):
    shutil.copy(png_file, os.path.join(output_dir, png_file))

#confirm successful export of visual results
print("Figures saved to output/")

# Testing on New Dataset


In [None]:
#load previously saved normalisation scaler to ensure consistent preprocessing
with open('scaler.pkl', 'rb') as f:
    scaler = pickle.load(f)

#load tflite model for lightweight inference on unseen data
interpreter = tf.lite.Interpreter(model_path='har_model.tflite')
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

#extract external dataset used for generalisation testing
external_data_zip = "data/RESpeckData_2526_fixed.zip"
with zipfile.ZipFile(external_data_zip, 'r') as zip_ref:
    zip_ref.extractall("data/")

#path to external dataset containing unseen subjects
new_dataset_path = "data/RESpeckData_2526_fixed/daily_activity/"

#redefine activity mapping to ensure label consistency with training
new_dataset_path = "/content/RESpeckData_2526_fixed/daily_activity/"
activity_mapping = {'sittingStanding': 0, 'lyingLeft': 1, 'lyingRight': 2,
                    'lyingBack': 3, 'lyingStomach': 4, 'normalWalking': 5,
                    'running': 6, 'ascending': 7, 'descending': 8,
                    'shuffleWalking': 9, 'miscMovement': 10}

#group activities into static and dynamic for targeted performance analysis
static_activities = [0, 1, 2, 3, 4]
dynamic_activities = [5, 6, 7, 8, 9, 10]

#load and segment external dataset using identical windowing strategy
X_test_ext, y_test_ext = [], []
for activity, label in activity_mapping.items():
    for file in glob.glob(os.path.join(new_dataset_path, activity, "*.csv")):
        try:
            df = pd.read_csv(file)

            #select accelerometer axes irrespective of column naming
            accel_cols = [col for col in df.columns if 'accel' in col.lower()]
            if len(accel_cols) >= 3:
                data = df[accel_cols[:3]].values

            #apply sliding window segmentation consistent with training
            for i in range(0, len(data)-50, 25):
                X_test_ext.append(data[i:i+50])
                y_test_ext.append(label)

        except Exception as e:
            print(f"Error reading {file}: {e}")
            continue

#convert external test data to numpy arrays
X_test_ext = np.array(X_test_ext)
y_test_ext = np.array(y_test_ext)

print(f"Loaded {len(X_test_ext)} windows")

#extract identical enhanced feature set to avoid train test mismatch
X_gravity_ext = prepare_data_with_features(X_test_ext)

#apply training scaler to external data for fair evaluation
X_norm_ext = scaler.transform(X_test_ext.reshape(-1, 3)).reshape(X_test_ext.shape)

#perform inference using tflite interpreter window by window
predictions = []
for i in range(len(X_norm_ext)):
    interpreter.set_tensor(input_details[0]['index'], X_norm_ext[i:i+1].astype(np.float32))
    interpreter.set_tensor(input_details[1]['index'], X_gravity_ext[i:i+1].astype(np.float32))
    interpreter.invoke()
    predictions.append(interpreter.get_tensor(output_details[0]['index'])[0])

#convert predicted probabilities to class labels
y_pred_ext = np.argmax(predictions, axis=1)

#report external dataset performance to assess model generalisation
print(f"\nExternal Dataset Results:")
print(f"Overall: {accuracy_score(y_test_ext, y_pred_ext):.4f}")
print(f"Static:  {accuracy_score(y_test_ext[np.isin(y_test_ext, static_activities)], y_pred_ext[np.isin(y_test_ext, static_activities)]):.4f}")
print(f"Dynamic: {accuracy_score(y_test_ext[np.isin(y_test_ext, dynamic_activities)], y_pred_ext[np.isin(y_test_ext, dynamic_activities)]):.4f}")

#print per class precision recall and f1 score on unseen data
print("\n", classification_report(y_test_ext, y_pred_ext, target_names=list(activity_mapping.keys())))

#define class names for external confusion matrix visualisation
activity_names_ext = list(activity_mapping.keys())

#plot confusion matrix showing absolute error distribution on external data
plt.figure(figsize=(12, 10))
cm_ext = confusion_matrix(y_test_ext, y_pred_ext)
sns.heatmap(cm_ext, annot=True, fmt='d', cmap='Oranges',
            xticklabels=activity_names_ext,
            yticklabels=activity_names_ext,
            cbar_kws={'label': 'Count'})
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix - External Dataset (RESpeckData 2526)')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig('figure6_external_test_cm_counts.png', dpi=300, bbox_inches='tight')
plt.show()

#compute normalised confusion matrix to evaluate relative class performance
cm_ext_normalized = cm_ext.astype('float') / cm_ext.sum(axis=1)[:, np.newaxis]

#plot normalised confusion matrix for generalisation analysis
plt.figure(figsize=(12, 10))
sns.heatmap(cm_ext_normalized, annot=True, fmt='.2%', cmap='RdYlGn',
            xticklabels=activity_names_ext,
            yticklabels=activity_names_ext,
            cbar_kws={'label': 'Percentage'},
            vmin=0, vmax=1)
plt.xlabel('Predicted', fontsize=12)
plt.ylabel('True', fontsize=12)
plt.title('Normalised Confusion Matrix - External Dataset (RESpeckData 2526)', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig('figure7_testing_normalised_matrix.png', dpi=300, bbox_inches='tight')
plt.show()