# Human Activity Recognition (HAR) with MotionSense Dataset (CNN Approach.v1)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import seaborn as sns

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [None]:
# Run on the GPU when PyTorch reports a usable CUDA device; otherwise fall
# back to the CPU so the notebook still runs on machines without one.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Report the selected device so the run log shows where training happens.
print(f"Using device: {device}")

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset files stored there can
# be read through ordinary file paths.
drive.mount('/content/drive')
# Base folder that every data file path in this notebook is built from.
# It must end with "/" because the paths are formed by plain string
# concatenation with this prefix.
DATA_PATH_PREFIX = "/content/drive/MyDrive/EEE4114F - ML/"

In [None]:
def get_ds_infos():
    """
    Load the table that describes the data subjects.

    Reads ``data_subjects_info.csv`` located under ``DATA_PATH_PREFIX``.

    Returns:
        The loaded pandas DataFrame. When the file does not exist, an error
        is printed and an empty DataFrame whose only column is ``'code'`` is
        returned instead, so callers can still index ``["code"]``.
    """
    info_path = DATA_PATH_PREFIX + "data_subjects_info.csv"
    try:
        subjects_df = pd.read_csv(info_path)
    except FileNotFoundError:
        # Missing file: explain what was looked for and hand back a placeholder.
        print(f"[ERROR] -- data_subjects_info.csv not found at {info_path}")
        print("Please ensure DATA_PATH_PREFIX is set correctly and the file exists.")
        return pd.DataFrame(columns=['code'])
    else:
        # Only reached when the CSV was read without raising.
        print("[INFO] -- Data subjects' information is imported.")
        return subjects_df

In [None]:
def set_data_types(data_types=["userAcceleration"]):
    """
    Expand sensor type names into their per-axis column names.

    Args:
        data_types: A list of sensor data types taken from
            [attitude, gravity, rotationRate, userAcceleration].
    Returns:
        A list of lists, one inner list per requested sensor, holding that
        sensor's three column names: ``.roll/.pitch/.yaw`` for "attitude" and
        ``.x/.y/.z`` for every other sensor.
    """
    # The default list is only read here, never mutated.
    angle_suffixes = (".roll", ".pitch", ".yaw")
    axis_suffixes = (".x", ".y", ".z")
    return [
        [sensor + suffix
         for suffix in (angle_suffixes if sensor == "attitude" else axis_suffixes)]
        for sensor in data_types
    ]

In [None]:
def create_time_series(dt_list_groups_config, act_labels_config, trial_codes_config_map, mode="raw", labeled=True):
    """
    Creates a time-series DataFrame from raw sensor data files.

    For every subject code, activity and trial code, the matching CSV file
    under ``DATA_PATH_PREFIX`` is read, reduced to the requested feature
    columns and (optionally) tagged with the activity's index in
    ``act_labels_config``. All trials are then stacked row-wise.

    Args:
        dt_list_groups_config: List of lists of feature column names, grouped by sensor type.
        act_labels_config: List of activity names; the position of a name is its numeric label.
        trial_codes_config_map: Dictionary mapping activity names to lists of trial codes.
        mode: Accepted for compatibility; this implementation does not read it
            (only raw columns are produced).
        labeled: Boolean, True if activity labels should be included as an "act" column.
    Returns:
        A tuple: (full_dataset_df, feature_cols_flat_list)
            full_dataset_df: Pandas DataFrame with all time-series data and labels.
                Empty (but with the expected columns) when the subject table
                or every trial file is unavailable.
            feature_cols_flat_list: Flat list of all feature column names.
    Raises:
        KeyError: If a trial file lacks one of the requested feature columns.
    """
    # Flatten the per-sensor groups into one list of feature column names
    feature_cols_flat_list = [col for group in dt_list_groups_config for col in group]

    # Column layout of the resulting DataFrame: features, then optionally "act"
    column_names_for_df = feature_cols_flat_list[:]
    if labeled:
        column_names_for_df.append("act")

    # Load subject metadata
    ds_list = get_ds_infos()
    # Abort when there are no subjects to iterate over. get_ds_infos() reports
    # a missing file with an EMPTY frame that still has a 'code' column, so
    # either condition alone must trigger the abort (hence `or`, not `and`).
    if ds_list.empty or 'code' not in ds_list.columns:
        print("[ERROR] -- Cannot proceed without subject information.")
        return pd.DataFrame(columns=column_names_for_df), feature_cols_flat_list

    all_trial_dfs = []
    print("[INFO] -- Creating Time-Series")
    # Loop through all subjects, activities and trials
    for sub_id in ds_list["code"]:
        for act_id, act_name in enumerate(act_labels_config):
            # Skip activities that have no trial codes configured
            if act_name not in trial_codes_config_map:
                print(f"[WARNING] -- No trial codes found for activity: {act_name}. Skipping.")
                continue
            for trial_code in trial_codes_config_map[act_name]:
                # Full path of this subject's recording for this trial
                fname = f'{DATA_PATH_PREFIX}A_DeviceMotion_data/A_DeviceMotion_data/{act_name}_{trial_code}/sub_{int(sub_id)}.csv'
                try:
                    raw_data_per_trial = pd.read_csv(fname)
                except FileNotFoundError:
                    # A missing trial is not fatal: warn and move on
                    print(f"[WARNING] -- File not found: {fname}. Skipping.")
                    continue

                # Drop the index column that a previous save may have written out
                raw_data_per_trial = raw_data_per_trial.drop(['Unnamed: 0'], axis=1, errors='ignore')

                # Keep only the requested feature columns for this trial
                current_trial_features = raw_data_per_trial[feature_cols_flat_list].values

                if labeled:
                    # One label per row, appended as the last column
                    labels_for_this_trial = np.full((len(raw_data_per_trial), 1), act_id)
                    trial_data_np = np.concatenate((current_trial_features, labels_for_this_trial), axis=1)
                else:
                    trial_data_np = current_trial_features
                # Wrap the trial in a DataFrame and queue it for concatenation
                all_trial_dfs.append(pd.DataFrame(data=trial_data_np, columns=column_names_for_df))

    # Nothing could be loaded: return an empty frame with the expected columns
    if not all_trial_dfs:
        print("[ERROR] -- No data successfully loaded. Please check file paths, data structure, and configurations.")
        return pd.DataFrame(columns=column_names_for_df), feature_cols_flat_list

    print(f"[INFO] -- Concatenating {len(all_trial_dfs)} individual trial DataFrames.")
    # Stack all trials row-wise into the final dataset
    full_dataset_df = pd.concat(all_trial_dfs, ignore_index=True)
    return full_dataset_df, feature_cols_flat_list

In [None]:
def create_windows_from_df(data_df, feature_cols, label_col, window_size, stride):
    """
    Creates windowed data from a DataFrame (time-domain features).

    Windows are taken over consecutive rows of ``data_df`` in the order given.
    Each window's label is the most frequent label among its rows; on a tie
    the smallest label value wins (``np.argmax`` over ``np.bincount``).

    Args:
        data_df: Input DataFrame with features and labels.
        feature_cols: List of feature column names.
        label_col: Name of the label column (non-negative integer-valued labels).
        window_size: Size of each window, in rows.
        stride: Number of rows between the starts of consecutive windows.
    Returns:
        A tuple (X_windows, Y_labels) of NumPy arrays.
        X_windows shape: (num_windows, num_features, window_size)
        Y_labels shape: (num_windows,)
        When the data holds fewer than ``window_size`` rows, both arrays are
        empty but still have these shapes (with num_windows == 0).
    """
    X_windows_list = []
    Y_labels_list = []

    feature_data_np = data_df[feature_cols].values
    label_data_np = data_df[label_col].values
    print(f"[DEBUG] -- Creating windows. Input feature_data_np shape: {feature_data_np.shape}")

    for start in range(0, len(feature_data_np) - window_size + 1, stride):
        stop = start + window_size

        # Transpose so each window is laid out as (features, time)
        X_windows_list.append(feature_data_np[start:stop].T)

        # Majority vote over the labels of the rows in this window
        label_counts = np.bincount(label_data_np[start:stop].astype(int))
        Y_labels_list.append(np.argmax(label_counts))

        if (len(X_windows_list) % 2000) == 0:
            print(f"[INFO] -- Processed {len(X_windows_list)} windows...")

    if not X_windows_list:
        # np.array([]) would collapse to shape (0,); keep the documented rank
        # so downstream shape handling does not break on an empty result.
        empty_shape = (0,) + feature_data_np.shape[1:] + (window_size,)
        return (
            np.empty(empty_shape, dtype=feature_data_np.dtype),
            np.empty((0,), dtype=np.intp),
        )

    return np.array(X_windows_list), np.array(Y_labels_list)

In [None]:
# Activity names; an activity's position in this list is its numeric class label
ACT_LABELS = ["dws", "ups", "wlk", "jog", "std", "sit"]
# Trial codes recorded for each activity, keyed by activity name
TRIAL_CODES = {
    "dws": [1, 2, 11],
    "ups": [3, 4, 12],
    "wlk": [7, 8, 15],
    "jog": [9, 16],
    "std": [6, 14],
    "sit": [5, 13],
}

In [None]:
# Sensor data types to read from each trial CSV file.
# Available options: "attitude", "gravity", "rotationRate", "userAcceleration"
SELECTED_SENSOR_DATA_TYPES = ["userAcceleration", "rotationRate"]
# Echo the configuration so the run log records what was used
print(f"[INFO] -- Selected sensor data types: {SELECTED_SENSOR_DATA_TYPES}")
print(f"[INFO] -- Selected activities: {ACT_LABELS}")

In [None]:
# Expand the selected sensor types into grouped CSV column names
dt_list_feature_groups = set_data_types(SELECTED_SENSOR_DATA_TYPES)

# Build the labeled time-series dataset from the raw trial files
dataset, feature_columns_list = create_time_series(dt_list_feature_groups, ACT_LABELS, TRIAL_CODES, mode="raw", labeled=True)

# Nothing below can run without data, so stop here. SystemExit is raised
# directly instead of calling exit(): exit() is the interactive helper
# installed by the `site` module, and with no argument it would report
# success (status 0) on this failure path.
if dataset.empty:
    print("[STOP] -- Dataset is empty. Halting execution.")
    raise SystemExit(1)

# Show the dataset shape and a preview of the first rows
print(f"[INFO] -- Shape of raw time-Series dataset: {dataset.shape}")
print(dataset.head())
# Number of input features (one per selected sensor column)
NUM_FEATURES = len(feature_columns_list)
# Number of activity classes
NUM_CLASSES = len(ACT_LABELS)
print(f"[INFO] -- Number of features: {NUM_FEATURES}")
print(f"[INFO] -- Number of classes: {NUM_CLASSES}")

In [None]:
# Number of consecutive rows (time steps) in each window
WINDOW_SIZE = 200
# Number of rows between the starts of consecutive windows; being half of
# WINDOW_SIZE, neighbouring windows share 100 rows
STRIDE = 100

# Standardize every feature column to zero mean and unit variance
scaler = StandardScaler()
# Work on a copy so the unscaled dataset stays available
scaled_dataset = dataset.copy()
# NOTE(review): the scaler is fitted on ALL rows here, before the train/test
# split performed later in this file, so test-set statistics influence the
# scaling. Consider fitting on training data only.
scaled_dataset[feature_columns_list] = scaler.fit_transform(dataset[feature_columns_list])
# Confirmation message
print("[INFO] -- Features scaled using StandardScaler.")

In [None]:
# Cut the scaled time series into overlapping fixed-length segments
print("[INFO] -- Starting windowing...")
X_windowed, Y_windowed = create_windows_from_df(
    data_df=scaled_dataset,
    feature_cols=feature_columns_list,
    label_col='act',
    window_size=WINDOW_SIZE,
    stride=STRIDE,
)

# Report the shapes of the windowed feature and label arrays
print(f"[INFO] -- Shape of windowed X: {X_windowed.shape}")
print(f"[INFO] -- Shape of windowed Y: {Y_windowed.shape}")

In [None]:
# Split the windows into 80% training and 20% testing, stratified by label so
# both parts keep the same class proportions; random_state fixes the shuffle.
# NOTE(review): windows overlap (STRIDE < WINDOW_SIZE) and are assigned at
# random, so a training window and a test window can share raw samples.
# Consider splitting by subject or trial before windowing to get an unbiased
# test estimate.
X_train, X_test, Y_train, Y_test = train_test_split(
    X_windowed, Y_windowed, test_size=0.2, random_state=42, stratify=Y_windowed
)
# Print the shapes of all splits
print(f"X_train shape: {X_train.shape}, Y_train shape: {Y_train.shape}")
print(f"X_test shape: {X_test.shape}, Y_test shape: {Y_test.shape}")

In [None]:
class MotionSensePyTorchDataset(Dataset):
    """In-memory dataset that pairs each feature window with its label."""

    def __init__(self, X_data, Y_data):
        """Store the inputs as tensors: features as float32, labels as int64."""
        features = torch.tensor(X_data, dtype=torch.float32)
        targets = torch.tensor(Y_data, dtype=torch.long)
        self.X, self.Y = features, targets

    def __len__(self):
        """Return the number of samples (length of the feature tensor)."""
        return len(self.X)

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        sample, target = self.X[index], self.Y[index]
        return sample, target

# Wrap the training and test arrays in Dataset objects
train_torch_dataset = MotionSensePyTorchDataset(X_train, Y_train)
test_torch_dataset = MotionSensePyTorchDataset(X_test, Y_test)

# Number of windows per batch, used for both training and evaluation
BATCH_SIZE = 256

# Training batches are reshuffled every epoch; test batches keep a fixed order
train_loader = DataLoader(train_torch_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_torch_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Confirm that the DataLoaders were created
print("[INFO] -- PyTorch DataLoaders created.")

In [None]:
class CNN_HAR_Model(nn.Module):
    """
    1D convolutional network for activity classification.

    Architecture: two Conv1d+ReLU layers (100 channels), max pooling, two
    Conv1d+ReLU layers (160 channels), global average pooling over time,
    dropout, and a linear layer producing one logit per class.

    Args:
        num_input_features: Number of input channels (sensor features per time step).
        num_activity_classes: Number of output classes (logits).
    """

    def __init__(self, num_input_features, num_activity_classes):
        super().__init__()
        # First convolutional layer; padding='same' keeps the time length unchanged
        self.conv1 = nn.Conv1d(in_channels=num_input_features, out_channels=100, kernel_size=10, padding='same')
        self.relu1 = nn.ReLU()

        # Second convolutional layer
        self.conv2 = nn.Conv1d(in_channels=100, out_channels=100, kernel_size=10, padding='same')
        self.relu2 = nn.ReLU()

        # Max pooling reduces temporal resolution by selecting the maximum in windows of size 3
        self.pool1 = nn.MaxPool1d(kernel_size=3)

        # Third convolutional layer
        self.conv3 = nn.Conv1d(in_channels=100, out_channels=160, kernel_size=10, padding='same')
        self.relu3 = nn.ReLU()

        # Fourth convolutional layer
        self.conv4 = nn.Conv1d(in_channels=160, out_channels=160, kernel_size=10, padding='same')
        self.relu4 = nn.ReLU()

        # Global average pooling reduces each feature map to a single value
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)

        # Flattens the pooled output for the fully connected layer
        self.flatten = nn.Flatten()

        # Dropout for regularization (active only in training mode)
        self.dropout = nn.Dropout(0.5)

        # Final fully connected layer that outputs logits for classification
        self.fc_out = nn.Linear(160, num_activity_classes)

    def forward(self, x_input):
        """
        Compute class logits for a batch of windows.

        Args:
            x_input: Tensor laid out as (batch, num_input_features, time).
        Returns:
            Tensor of shape (batch, num_activity_classes) holding unnormalized
            logits (no softmax is applied).
        """
        x = self.relu1(self.conv1(x_input))
        x = self.relu2(self.conv2(x))
        x = self.pool1(x)
        x = self.relu3(self.conv3(x))
        x = self.relu4(self.conv4(x))
        x = self.global_avg_pool(x)
        x = self.flatten(x)
        x = self.dropout(x)
        output_logits = self.fc_out(x)
        return output_logits

In [None]:
    # NOTE(review): every line of this cell is indented one level although no
    # enclosing block is visible; plain Python rejects that as an unexpected
    # indent. Presumably the notebook front end tolerates it - consider
    # dedenting the cell.
    # Instantiate the CNN with the number of input features and activity
    # classes, then move it to the selected device
    model = CNN_HAR_Model(num_input_features=NUM_FEATURES, num_activity_classes=NUM_CLASSES).to(device)
    # Confirmation output
    print("[INFO] -- CNN Model instantiated:")
    # Number of features (channels for the 1D CNN)
    print(f"[INFO] -- Input features (channels for 1D CNN): {NUM_FEATURES}")
    # Print the layer structure of the model
    print(model)

    # Loss function: cross-entropy, which takes the raw logits the model outputs
    loss_criterion = nn.CrossEntropyLoss()
    # Optimizer: Adam over all model parameters with a learning rate of 0.001
    optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Number of full passes over the training data
NUM_EPOCHS = 50
print(f"[INFO] -- Starting training for {NUM_EPOCHS} epochs...")

# Per-epoch history: mean training loss and test accuracy (in percent)
train_losses_history = []
test_accuracies_history = []

for epoch in range(NUM_EPOCHS):
    # ---- Training phase ----
    # Training mode enables dropout
    model.train()
    # Sum of the per-batch losses seen in this epoch
    total_train_loss = 0

    for batch_idx, (data_batch, labels_batch) in enumerate(train_loader):
        # Inputs must live on the same device as the model
        data_batch = data_batch.to(device)
        labels_batch = labels_batch.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()
        # Forward pass and loss
        outputs_logits = model(data_batch)
        loss = loss_criterion(outputs_logits, labels_batch)
        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()

        # Progress report every 50 batches
        if (batch_idx + 1) % 50 == 0:
            print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Batch [{batch_idx+1}/{len(train_loader)}], Batch Loss: {loss.item():.4f}")

    # Mean of the batch losses for this epoch
    avg_epoch_train_loss = total_train_loss / len(train_loader)
    train_losses_history.append(avg_epoch_train_loss)

    # ---- Evaluation phase (test set) ----
    # Evaluation mode disables dropout
    model.eval()
    # Number of correctly predicted test samples
    total_test_correct = 0
    # Number of test samples evaluated
    total_test_samples = 0

    # No gradients are needed while evaluating
    with torch.no_grad():
        for data_batch, labels_batch in test_loader:
            data_batch = data_batch.to(device)
            labels_batch = labels_batch.to(device)

            outputs_logits = model(data_batch)
            # Index of the largest logit is the predicted class
            predicted_labels = torch.max(outputs_logits, dim=1)[1]

            total_test_samples += len(labels_batch)
            total_test_correct += (predicted_labels == labels_batch).sum().item()

    # Test accuracy for this epoch, as a percentage
    epoch_test_accuracy = 100 * total_test_correct / total_test_samples
    test_accuracies_history.append(epoch_test_accuracy)

    # Epoch summary
    print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Train Loss: {avg_epoch_train_loss:.4f}, Test Acc: {epoch_test_accuracy:.2f}%")

# Training complete
print("[INFO] -- Training Finished.")

In [None]:
# Plot the recorded training loss and test accuracy side by side.
# The x-axes are derived from the history lengths rather than NUM_EPOCHS, so
# the curves still plot if training was interrupted early or NUM_EPOCHS was
# changed after training; after a complete run the axes are identical.
loss_epochs = range(1, len(train_losses_history) + 1)
accuracy_epochs = range(1, len(test_accuracies_history) + 1)

plt.figure(figsize=(12, 5))

# Left panel: mean training loss per epoch
plt.subplot(1, 2, 1)
plt.plot(loss_epochs, train_losses_history, label='Training Loss', marker='o')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.legend()
plt.grid(True)

# Right panel: test accuracy (percent) per epoch
plt.subplot(1, 2, 2)
plt.plot(accuracy_epochs, test_accuracies_history, label='Test Accuracy', color='orange', marker='o')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.title('Test Accuracy Over Epochs')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Final evaluation of the trained model on the test set.
# Evaluation mode disables dropout
model.eval()
# Predicted and true labels collected across all test batches
all_predicted_outputs_test = []
all_true_labels_test = []
# No gradients are needed for inference
with torch.no_grad():
    for data_batch, labels_batch in test_loader:
        # Inputs must live on the same device as the model
        data_batch = data_batch.to(device)
        # Forward pass
        outputs_logits = model(data_batch)
        # Index of the largest logit is the predicted class
        _, predicted_batch_labels = torch.max(outputs_logits, 1)
        # Store predictions and the corresponding true labels as NumPy values
        all_predicted_outputs_test.extend(predicted_batch_labels.cpu().numpy())
        all_true_labels_test.extend(labels_batch.cpu().numpy())

# Pin the full set of class indices so the report rows and the confusion
# matrix always line up with ACT_LABELS, even if some class never occurs in
# the true or predicted labels (otherwise classification_report raises on the
# target_names length mismatch and the heatmap tick labels are misaligned).
class_indices = list(range(len(ACT_LABELS)))

# Overall test accuracy (fraction in [0, 1])
final_test_accuracy = accuracy_score(all_true_labels_test, all_predicted_outputs_test)
# Confusion matrix: rows are true classes, columns are predicted classes
test_confusion_mat = confusion_matrix(
    all_true_labels_test, all_predicted_outputs_test, labels=class_indices
)
# Per-class precision, recall and F1-score
test_classification_rep = classification_report(
    all_true_labels_test, all_predicted_outputs_test,
    labels=class_indices, target_names=ACT_LABELS, zero_division=0
)

# Print the evaluation results
print("\n[INFO] -- CNN Model Evaluation on FINAL TEST Set:")
print(f"Overall Test Accuracy: {final_test_accuracy*100:.2f}%")
print("\nTest Set Classification Report:")
print(test_classification_rep)
print("\nTest Set Confusion Matrix:")

# Plot the confusion matrix as an annotated heatmap
plt.figure(figsize=(10, 8))
sns.heatmap(test_confusion_mat, annot=True, fmt='d', cmap='BuPu',
            xticklabels=ACT_LABELS, yticklabels=ACT_LABELS)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('CNN Confusion Matrix - Final Test Set')
plt.show()