In [1]:
# Use PyTorch to implement Recurrent Neural Network (RNN) for time series prediction

# Dataset: Human Activity Recognition using Smartphones
    # Contains timeseries data from smartphone sensors (accelerometer and gyroscope) while performing daily activities
# Goal: Classify human activity from smartphone sensor data
    
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.autograd import Variable


In [7]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
# Helper function to load the raw inertial signals
def load_ucihar(data_dir='UCI HAR Dataset', subset='train'):
    """
    Loads the UCI HAR data from the Inertial Signals folder.
    Returns:
    X: numpy array of shape (num_samples, seq_len, num_signals)
    y: numpy array of labels (0-indexed)
    """
    # The nine signal types available in the dataset
    signal_types = [
    "body_acc_x", "body_acc_y", "body_acc_z",
    "body_gyro_x", "body_gyro_y", "body_gyro_z",
    "total_acc_x", "total_acc_y", "total_acc_z"
    ]
    signals = []
    # Each signal file is located in {data_dir}/{subset}/Inertial Signals/
    for signal in signal_types:
        filename = os.path.join(data_dir, subset, "Inertial Signals", f"{signal}_{
        subset}.txt")
        # Each file has shape (num_samples, 128)
        data = np.loadtxt(filename)
        # Add a new axis so that we can later stack to shape (num_samples, 128, num_signals)
        signals.append(data[..., np.newaxis])

    # Stack along the last dimension to form (num_samples, 128, 9)
    X = np.concatenate(signals, axis=2)

    # Load labels from y_{subset}.txt; labels in the dataset are 1-indexed, so subtract 1.
    y_path = os.path.join(data_dir, subset, f"y_{subset}.txt")
    y = np.loadtxt(y_path).astype(int) - 1
    return X, y
# Define a PyTorch Dataset for UCI HAR
class UCIHARDataset(Dataset):

    def __init__(self, data_dir='UCI HAR Dataset', subset='train'):
        self.X, self.y = load_ucihar(data_dir, subset)

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        sample = torch.tensor(self.X[idx], dtype=torch.float32)
        label = torch.tensor(self.y[idx], dtype=torch.long)
        return sample, label


# load ucihar
x, y = load_ucihar()    

In [8]:
# Verify:
print(f"x shape: {x.shape}, y shape: {y.shape}")


x shape: (7352, 128, 9), y shape: (7352,)


In [None]:
# 1. Implement RNN using PyTorch
    # a. Build a 1-layer RNN that reads the inputs (input_dim=9) and produce hidden features (use feature_dim=16)
    # b. Build a linear classifier that classifies the activity labels (number of activity = 6) from hidden features
    # c. Train the model using CrossEntropyLoss and Adam optimizer
    # d. Train for 10 epochs with batch_size=16, learning_rate = 0.001 and report the test accuracy
    # e. Provided pseudo-code for training. MAKE SURE TO FIX RANDOM SEED FOR CONSISTENT RESULTS

class RNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(RNN, self).__init__()
        self.rnn = nn.RNN(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        out, _ = self.rnn(x)
        out = self.fc(out[:, -1, :])  # Use the last time step's output
        return out

In [12]:
import random 
# fix random seed.
SEED = 1
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# define model
class RNNClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNNClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        logit = self.fc(out[:, -1, :])
        prob = nn.functional.softmax(logit, dim=1)
        return prob, logit
    
num_epochs, batch_size, lr = 10, 64, 0.001
# create train, test dataset
train_dataset, test_dataset = UCIHARDataset('UCI HAR Dataset', 'train'), UCIHARDataset('UCI HAR Dataset', 'test')
# create train, test loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=
True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=
False)
# create model, loss criterion, optimizer
model = RNNClassifier(input_size=9, hidden_size=16, num_layers=1, num_classes=6)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# start training
for epoch in range(num_epochs):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        activity_prob, activity_logit = model(inputs)
        loss = criterion(activity_logit, labels)
        loss.backward()
        optimizer.step()
# evaluate on test data
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        activity_prob, activity_logit = model(inputs)
        _, predicted = torch.max(activity_prob.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
# print accuracy
print(f"Test Accuracy: {100 * correct / total:.2f}%")




Test Accuracy: 57.41%


In [13]:
# 2. Train and eval with different architectures, report test accuracy for each model

# a. change feature dimension to 64
# define model
class RNNClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNNClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        logit = self.fc(out[:, -1, :])
        prob = nn.functional.softmax(logit, dim=1)
        return prob, logit
    
num_epochs, batch_size, lr = 10, 64, 0.001
# create train, test dataset
train_dataset, test_dataset = UCIHARDataset('UCI HAR Dataset', 'train'), UCIHARDataset('UCI HAR Dataset', 'test')
# create train, test loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=
True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=
False)
# create model, loss criterion, optimizer
model = RNNClassifier(input_size=9, hidden_size=64, num_layers=1, num_classes=6)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# start training
for epoch in range(num_epochs):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        activity_prob, activity_logit = model(inputs)
        loss = criterion(activity_logit, labels)
        loss.backward()
        optimizer.step()
# evaluate on test data
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        activity_prob, activity_logit = model(inputs)
        _, predicted = torch.max(activity_prob.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
# print accuracy
print(f"Test Accuracy: {100 * correct / total:.2f}%")





Test Accuracy: 56.43%


In [14]:
# b. Change number of RNN layers to 2,3,4 (keep feature dimension 16)
# define model
class RNNClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNNClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        logit = self.fc(out[:, -1, :])
        prob = nn.functional.softmax(logit, dim=1)
        return prob, logit
    
num_epochs, batch_size, lr = 10, 64, 0.001
# create train, test dataset
train_dataset, test_dataset = UCIHARDataset('UCI HAR Dataset', 'train'), UCIHARDataset('UCI HAR Dataset', 'test')
# create train, test loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=
True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=
False)

### NUM_LAYERS = 2 ###
# create model, loss criterion, optimizer
model = RNNClassifier(input_size=9, hidden_size=16, num_layers=2, num_classes=6)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# start training
for epoch in range(num_epochs):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        activity_prob, activity_logit = model(inputs)
        loss = criterion(activity_logit, labels)
        loss.backward()
        optimizer.step()
# evaluate on test data
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        activity_prob, activity_logit = model(inputs)
        _, predicted = torch.max(activity_prob.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
# print accuracy
print(f"Test Accuracy: {100 * correct / total:.2f}%")



Test Accuracy: 60.74%


In [15]:
### NUM_LAYERS = 3 ###
# create model, loss criterion, optimizer
model = RNNClassifier(input_size=9, hidden_size=16, num_layers=3, num_classes=6)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# start training
for epoch in range(num_epochs):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        activity_prob, activity_logit = model(inputs)
        loss = criterion(activity_logit, labels)
        loss.backward()
        optimizer.step()
# evaluate on test data
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        activity_prob, activity_logit = model(inputs)
        _, predicted = torch.max(activity_prob.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
# print accuracy
print(f"Test Accuracy: {100 * correct / total:.2f}%")


Test Accuracy: 62.13%


In [16]:
### NUM_LAYERS = 4 ###
# create model, loss criterion, optimizer
model = RNNClassifier(input_size=9, hidden_size=16, num_layers=4, num_classes=6)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# start training
for epoch in range(num_epochs):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        activity_prob, activity_logit = model(inputs)
        loss = criterion(activity_logit, labels)
        loss.backward()
        optimizer.step()
# evaluate on test data
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        activity_prob, activity_logit = model(inputs)
        _, predicted = torch.max(activity_prob.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
# print accuracy
print(f"Test Accuracy: {100 * correct / total:.2f}%")


Test Accuracy: 64.03%


In [18]:
# c. Change RNN to LSTM (keep feature dimension 16, number of layers = 1)
# b. Change number of RNN layers to 2,3,4 (keep feature dimension 16)
# define model
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        logit = self.fc(out[:, -1, :])
        prob = nn.functional.softmax(logit, dim=1)
        return prob, logit
    
num_epochs, batch_size, lr = 10, 64, 0.001
# create train, test dataset
train_dataset, test_dataset = UCIHARDataset('UCI HAR Dataset', 'train'), UCIHARDataset('UCI HAR Dataset', 'test')
# create train, test loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=
True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=
False)

### NUM_LAYERS = 2 ###
# create model, loss criterion, optimizer
model = LSTMClassifier(input_size=9, hidden_size=16, num_layers=2, num_classes=6)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# start training
for epoch in range(num_epochs):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        activity_prob, activity_logit = model(inputs)
        loss = criterion(activity_logit, labels)
        loss.backward()
        optimizer.step()
# evaluate on test data
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        activity_prob, activity_logit = model(inputs)
        _, predicted = torch.max(activity_prob.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
# print accuracy
print(f"Test Accuracy: {100 * correct / total:.2f}%")



Test Accuracy: 75.47%
