# GRU example: Human Activity Recognition Using Smartphones

We are going to play with the UCI's [Human Activity Recognition Using Smartphones](https://archive.ics.uci.edu/dataset/240/human+activity+recognition+using+smartphones) dataset.

The experiments have been carried out with a group of volunteers within an age bracket of 19-48 years. Each person performed six activities (WALKING, WALKING_UPSTAIRS, WALKING_DOWNSTAIRS, SITTING, STANDING, LAYING) wearing a smartphone (Samsung Galaxy S II) on the waist. Using its embedded accelerometer and gyroscope, we captured 3-axial linear acceleration and 3-axial angular velocity at a constant rate of 50Hz. The experiments have been video-recorded to label the data manually. The obtained dataset has been randomly partitioned into two sets, where 70% of the volunteers were selected for generating the training data and 30% for the test data. 

---

For the purpose of this course, we have sampled this dataset to keep only 10% of the records. 



Citation:
[A Public Domain Dataset for Human Activity Recognition using Smartphones](https://www.semanticscholar.org/paper/A-Public-Domain-Dataset-for-Human-Activity-using-Anguita-Ghio/83de43bc849ad3d9579ccf540e6fe566ef90a58e)
By D. Anguita, A. Ghio, L. Oneto, X. Parra, Jorge Luis Reyes-Ortiz. 2013
Published in The European Symposium on Artificial Neural Networks

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import torch
from torch import nn
import pytorch_model_summary as pms 

from torch.utils.data import TensorDataset, DataLoader

from pytorchtools import EarlyStopping

# Pick the compute backend in order of preference:
# CUDA GPU first, then Apple-silicon MPS, and finally plain CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")


For reference, here is the code we used to load and subset the original dataset:

In [None]:
# import os
# import kagglehub

# # Download latest version
# path = kagglehub.dataset_download("drsaeedmohsen/ucihar-dataset")

# print("Path to dataset files:", path)

# # 1. Load Inertial Signals
# def load_signals(folder, subset):
#     signals = []
#     signal_names = [
#         'body_acc_x_', 'body_acc_y_', 'body_acc_z_',
#         'body_gyro_x_', 'body_gyro_y_', 'body_gyro_z_',
#         'total_acc_x_', 'total_acc_y_', 'total_acc_z_'
#     ]
#     for name in signal_names:
#         filename = os.path.join(folder, subset, 'Inertial Signals', name + subset + '.txt')
#         data = np.loadtxt(filename)
#         signals.append(data)
#     # Stack signals: shape (samples, timesteps, features)
#     return np.transpose(np.array(signals), (1, 2, 0))

# def load_labels(folder, subset):
#     filename = os.path.join(folder, subset, 'y_' + subset + '.txt')
#     return np.loadtxt(filename).astype(int) - 1  # Classes start at 0

# data_folder = 'data/UCI_HAR/'
# X_train = load_signals(data_folder, 'train')
# y_train = load_labels(data_folder, 'train')
# X_test = load_signals(data_folder, 'test')
# y_test = load_labels(data_folder, 'test')


# # Assume X_train and y_train are NumPy arrays
# fraction = 0.1  # 10%
# subset_size = int(len(X_train) * fraction)

# # Randomly select indices without replacement
# indices = np.random.choice(len(X_train), subset_size, replace=False)

# # Create smaller subsets
# X_train_small = X_train[indices]
# y_train_small = y_train[indices]

# subset_size = int(len(X_test) * fraction)

# # Randomly select indices without replacement
# indices = np.random.choice(len(X_test), subset_size, replace=False)

# X_test_small = X_test[indices]
# y_test_small = y_test[indices]

# X_train = X_train_small
# y_train = y_train_small
# X_test = X_test_small
# y_test = y_test_small

# print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}")  # (7352, 128, 9)

# np.save("data/UCI_HAR_subset/x_train.npy" , X_train)
# np.save("data/UCI_HAR_subset/x_valid.npy" , X_test)
# np.save("data/UCI_HAR_subset/y_train.npy" , y_train)
# np.save("data/UCI_HAR_subset/y_valid.npy" , y_test)

In [None]:
# Load the pre-sampled subset from disk: feature arrays (x_*) and their
# matching label arrays (y_*), for the training and validation splits.
X_train, y_train = (
    np.load("data/UCI_HAR_subset/x_train.npy"),
    np.load("data/UCI_HAR_subset/y_train.npy"),
)
X_valid, y_valid = (
    np.load("data/UCI_HAR_subset/x_valid.npy"),
    np.load("data/UCI_HAR_subset/y_valid.npy"),
)

# Quick sanity check of the array dimensions.
print(f"Train shape: {X_train.shape}, Valid shape: {X_valid.shape}")

Each record consists of 128 time points measured for 9 features:
 - *body_acc_x/y/z*: The body acceleration signal obtained by subtracting the gravity from the total acceleration.
 - *body_gyro_x/y/z*: The angular velocity vector measured by the gyroscope for each window sample. The units are radians/second. 
 - *total_acc_x/y/z*: The acceleration signal from the smartphone accelerometer X,Y and Z axis in standard gravity units 'g'.


In [None]:
# Class distribution of the training labels.
# The labels are integer class ids, so use discrete bins: one bar per class,
# centred on its integer value, instead of automatically chosen continuous bins.
sns.histplot(y_train, discrete=True)

The target is:
 * 0 WALKING
 * 1 WALKING_UPSTAIRS
 * 2 WALKING_DOWNSTAIRS
 * 3 SITTING
 * 4 STANDING
 * 5 LAYING


In [None]:
# Building our loaders


def _as_dataset(features, labels):
    """Wrap feature/label arrays in a TensorDataset (float32 inputs, long targets)."""
    return TensorDataset(
        torch.tensor(features, dtype=torch.float32),
        torch.tensor(labels, dtype=torch.long),
    )


train_dataset = _as_dataset(X_train, y_train)
valid_dataset = _as_dataset(X_valid, y_valid)

# Only the training batches are shuffled; validation uses the default ordering.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=64)


In [None]:
# GRU Model
class HAR_GRU(nn.Module):
    """Sequence classifier: a stacked GRU followed by one linear layer.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the GRU hidden state.
        num_layers: number of stacked GRU layers.
        num_classes: number of output classes (size of the logit vector).
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # batch_first=True -> the GRU consumes tensors shaped (batch, time, features)
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

        # Classification head; this could be as complex as we need it to be.
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for input x of shape (batch, time, features)."""
        # NB: the GRU also accepts an optional initial hidden state as a second
        #     argument; it is not passed here, so the default (zeros) is used.
        # The call returns (per-step outputs of the last layer, final hidden states).
        sequence_output, _final_hidden = self.gru(x)

        # Keep only the output at the last time step of every sequence ...
        last_step = sequence_output[:, -1, :]

        # ... and feed it to the dense layer.
        return self.fc(last_step)

In [None]:
# Instantiate the network (9 input features, 6 activity classes) on the selected device.
model = HAR_GRU(input_size=9, hidden_size=32, num_layers=2, num_classes=6).to(device)

# Multi-class classification on integer targets -> cross-entropy on the raw logits.
criterion = nn.CrossEntropyLoss()
# `optim` is never imported in this notebook; reach Adam through the `torch` namespace instead.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# --- 1) Prepare metric containers ---
# One value is appended per epoch by the training loop.
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []

In [None]:
%%time
# Training Loop
# Each epoch: one optimisation pass over train_loader, then one gradient-free
# evaluation pass over valid_loader; per-epoch metrics are appended to the
# train_*/val_* lists so they can be plotted afterwards.
epochs = 100


for epoch in range(epochs):

    # --- Training pass ---
    model.train()
    train_loss, correct, total = 0.0, 0, 0
    for X, y in train_loader:
        X, y = X.to(device), y.to(device)
        outputs = model(X)
        loss = criterion(outputs, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        n_batch = y.size(0)
        # The criterion returns the *mean* loss of the batch: weight it by the
        # batch size so that dividing by the sample count below yields a true
        # per-sample average (the last batch may be smaller than the others).
        train_loss += loss.item() * n_batch
        _, predicted = torch.max(outputs, 1)  # index of the largest logit = predicted class
        correct += (predicted == y).sum().item()
        total += n_batch
    train_acc = correct / total

    # --- Validation pass (no gradient tracking, no weight update) ---
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for X, y in valid_loader:
            X, y = X.to(device), y.to(device)
            outputs = model(X)
            loss = criterion(outputs, y)
            n_batch = y.size(0)
            val_loss += loss.item() * n_batch
            _, predicted = torch.max(outputs, 1)
            val_correct += (predicted == y).sum().item()
            val_total += n_batch
    val_acc = val_correct / val_total

    # --- 2) Store metrics each epoch ---
    train_losses.append(train_loss / total)
    val_losses.append(val_loss / val_total)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)


In [None]:

# --- 3) Plot with Matplotlib ---
import matplotlib.pyplot as plt

# Epochs are numbered from 1 on the x axis.
epochs_range = range(1, len(train_losses) + 1)

# One (metric name, training curve, validation curve) entry per side-by-side panel.
panels = [
    ('Loss', train_losses, val_losses),
    ('Accuracy', train_accuracies, val_accuracies),
]

plt.figure(figsize=(10, 4))
for panel_index, (metric, train_curve, val_curve) in enumerate(panels, start=1):
    plt.subplot(1, 2, panel_index)
    plt.plot(epochs_range, train_curve, label=f'Train {metric}', marker='o')
    plt.plot(epochs_range, val_curve, label=f'Val {metric}', marker='o')
    plt.title(f'{metric} per Epoch')
    plt.xlabel('Epoch')
    plt.ylabel(metric)
    plt.legend()
    plt.grid(True)

plt.tight_layout()
plt.savefig('training_curves.png', dpi=150)  # saves both plots in one image
plt.show()
