# Homework 4

## 2. Recurrent Neural Network (RNN) Classification

In [1]:
import os
import numpy as np
import torch
import random
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [2]:
!unzip "UCI HAR Dataset.zip"
# !unzip "/content/human+activity+recognition+using+smartphones.zip"

Archive:  UCI HAR Dataset.zip
replace UCI HAR Dataset/.DS_Store? [y]es, [n]o, [A]ll, [N]one, [r]ename: N


In [3]:
# Helper function to load the raw inertial signals
def load_ucihar(data_dir="UCI HAR Dataset", subset="train"):
    """Load the raw inertial signals and labels of one UCI HAR subset.

    Args:
        data_dir: Root folder of the extracted dataset.
        subset: Name of the subset folder, also used as the file-name suffix
            (e.g. "train" or "test").

    Returns:
        X: numpy array of shape (num_samples, seq_len, num_signals); the last
            axis follows the order of ``signal_types`` below.
        y: numpy integer array of shape (num_samples,) with 0-indexed labels.
    """
    # The nine signal types available in the dataset
    signal_types = [
        "body_acc_x", "body_acc_y", "body_acc_z",
        "body_gyro_x", "body_gyro_y", "body_gyro_z",
        "total_acc_x", "total_acc_y", "total_acc_z",
    ]

    # Each signal file is located in {data_dir}/{subset}/Inertial Signals/
    signal_dir = os.path.join(data_dir, subset, "Inertial Signals")

    # Every file holds one row per sample. ndmin=2 keeps that row axis even
    # when the file contains a single sample (loadtxt would otherwise return a
    # 1-D array and the stacking below would produce the wrong shape).
    signals = [
        np.loadtxt(os.path.join(signal_dir, f"{signal}_{subset}.txt"), ndmin=2)
        for signal in signal_types
    ]

    # Stack along a new last axis: (num_samples, seq_len, num_signals)
    X = np.stack(signals, axis=-1)

    # Labels in y_{subset}.txt are 1-indexed, so subtract 1. ndmin=1 keeps a
    # single label as a length-1 array instead of a 0-d scalar.
    y_path = os.path.join(data_dir, subset, f"y_{subset}.txt")
    y = np.loadtxt(y_path, ndmin=1).astype(int) - 1
    return X, y

In [4]:
# Define a PyTorch Dataset for UCI HAR
class UCIHARDataset(Dataset):
    """Map-style dataset over one subset of the UCI HAR inertial signals.

    The whole subset is loaded once with ``load_ucihar`` and kept as numpy
    arrays in ``self.X`` and ``self.y``; items are converted to tensors on
    access.
    """

    def __init__(self, data_dir="UCI HAR Dataset", subset="train"):
        signals, labels = load_ucihar(data_dir, subset)
        self.X = signals
        self.y = labels

    def __len__(self):
        # One item per entry along the first axis of the signal array.
        num_samples = self.X.shape[0]
        return num_samples

    def __getitem__(self, idx):
        # Returns (float32 signal tensor, int64 label tensor) for index idx.
        return (
            torch.tensor(self.X[idx], dtype=torch.float32),
            torch.tensor(self.y[idx], dtype=torch.long),
        )

In [5]:
# Seed every random number generator this notebook imports (PyTorch, NumPy
# and the standard library) with the same value.
SEED = 1
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
if torch.cuda.is_available():
    # Also seed the generators of all visible CUDA devices.
    torch.cuda.manual_seed_all(SEED)

# Default model dimensions: input features, hidden features, output classes.
default_input_dim, default_feature_dim, default_num_classes = 9, 16, 6

# define model
class RNNClassifier(nn.Module):
    """Sequence classifier: a (stacked) ``nn.RNN`` followed by a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the recurrent hidden state.
        num_layers: Number of stacked recurrent layers.
        num_classes: Number of output classes.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: Batch-first tensor of shape (batch, seq_len, input_size).

        Returns:
            Tuple ``(prob, logit)``, each of shape (batch, num_classes):
            the softmax probabilities and the raw scores they come from.
        """
        # Create the zero initial state directly with the dtype and device of
        # the input, so the model also works after .double()/.half() or on an
        # accelerator without an extra CPU allocation and copy.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_size,
            dtype=x.dtype, device=x.device,
        )
        out, _ = self.rnn(x, h0)
        # Only the output of the last time step feeds the classifier head.
        logit = self.fc(out[:, -1, :])
        prob = nn.functional.softmax(logit, dim=1)
        return prob, logit


In [6]:
# Training hyper-parameters reused by the experiment cells below.
num_epochs, batch_size, lr = 10, 16, 0.001

# Create the train and test datasets.
train_dataset = UCIHARDataset(subset="train")
test_dataset = UCIHARDataset(subset="test")

# Create the train and test loaders; only the training data is shuffled.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Create the baseline model, loss criterion and optimizer. The dimensions come
# from the default_* constants defined with the seed setup instead of being
# repeated here as magic numbers.
model = RNNClassifier(
    input_size=default_input_dim,
    hidden_size=default_feature_dim,
    num_layers=1,
    num_classes=default_num_classes,
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

In [7]:
def train_model_and_test(model, train_loader, test_loader, criterion, optimizer, num_epochs):
    """Train ``model`` for ``num_epochs`` epochs, then report its test accuracy.

    Args:
        model: Module whose forward pass returns a ``(prob, logit)`` pair.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` evaluation batches.
        criterion: Loss called as ``criterion(logit, labels)``.
        optimizer: Optimizer stepping the parameters of ``model``.
        num_epochs: Number of full passes over ``train_loader``.

    Returns:
        The fraction of correctly classified test samples (also printed), or
        ``nan`` when ``test_loader`` yields no samples.
    """
    # Training
    for _ in range(num_epochs):
        model.train()
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            # The loss is computed on the raw logits; the probabilities
            # returned alongside them are not needed here.
            _, activity_logit = model(inputs)
            loss = criterion(activity_logit, labels)
            loss.backward()
            optimizer.step()

    # Evaluate on test data
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            _, activity_logit = model(inputs)
            predicted = activity_logit.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Accuracy is undefined without test samples; report nan instead of
    # failing with ZeroDivisionError after the whole training run.
    test_acc = correct / total if total else float("nan")
    print(f"Test Accuracy: {test_acc:.4f}")
    return test_acc

In [8]:
# Train the baseline model created above, then print its accuracy on the test data
train_model_and_test(model, train_loader, test_loader, criterion, optimizer, num_epochs)

Test Accuracy: 0.6166


In [9]:
# Experiment: a single recurrent layer with a 64-dimensional hidden state.
feature_dim = 64

# Fresh model, loss criterion and optimizer for this run.
model = RNNClassifier(
    input_size=9,
    hidden_size=feature_dim,
    num_layers=1,
    num_classes=6,
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

train_model_and_test(
    model, train_loader, test_loader, criterion, optimizer, num_epochs
)

Test Accuracy: 0.6454


In [10]:
# Experiment: two stacked recurrent layers, 16-dimensional hidden state.
feature_dim, num_layers = 16, 2

# Fresh model, loss criterion and optimizer for this run.
model = RNNClassifier(
    input_size=9,
    hidden_size=feature_dim,
    num_layers=num_layers,
    num_classes=6,
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

train_model_and_test(
    model, train_loader, test_loader, criterion, optimizer, num_epochs
)

Test Accuracy: 0.6166


In [11]:
# Experiment: three stacked recurrent layers, 16-dimensional hidden state.
num_layers = 3

# Fresh model, loss criterion and optimizer for this run.
model = RNNClassifier(
    input_size=9,
    hidden_size=16,
    num_layers=num_layers,
    num_classes=6,
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

train_model_and_test(
    model, train_loader, test_loader, criterion, optimizer, num_epochs
)

Test Accuracy: 0.7615


In [12]:
# Experiment: four stacked recurrent layers, 16-dimensional hidden state.
num_layers = 4

# Fresh model, loss criterion and optimizer for this run.
model = RNNClassifier(
    input_size=9,
    hidden_size=16,
    num_layers=num_layers,
    num_classes=6,
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

train_model_and_test(
    model, train_loader, test_loader, criterion, optimizer, num_epochs
)

Test Accuracy: 0.2891


In [13]:
# Replace RNN with LSTM (feature_dim=16, num_layers=1)
class LSTMClassifier(nn.Module):
    """Sequence classifier: a (stacked) ``nn.LSTM`` followed by a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden and cell states.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output classes.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: Batch-first tensor of shape (batch, seq_len, input_size).

        Returns:
            Tuple ``(prob, logit)``, each of shape (batch, num_classes):
            the softmax probabilities and the raw scores they come from.
        """
        # Zero initial hidden and cell states, created directly with the dtype
        # and device of the input so they always match it.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        c0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        out, _ = self.lstm(x, (h0, c0))
        # Only the output of the last time step feeds the classifier head.
        logit = self.fc(out[:, -1, :])
        prob = nn.functional.softmax(logit, dim=1)
        return prob, logit

# Experiment: single-layer LSTM with a 16-dimensional hidden state.
feature_dim, num_layers = 16, 1
input_size, num_classes = 9, 6

# Fresh model, loss criterion and optimizer for this run.
model = LSTMClassifier(
    input_size=input_size,
    hidden_size=feature_dim,
    num_layers=num_layers,
    num_classes=num_classes,
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

train_model_and_test(
    model, train_loader, test_loader, criterion, optimizer, num_epochs
)

Test Accuracy: 0.6342


In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [15]:
class PatchEmbedding(nn.Module):
    """Turn an image batch into a sequence of embedded patches plus a class token."""

    def __init__(self, in_channels=3, patch_size=4, emb_size=128, img_size=32):
        super().__init__()
        self.patch_size = patch_size
        # A convolution whose kernel size and stride both equal the patch size
        # cuts the image into non-overlapping patches and embeds each of them
        # in a single step.
        self.proj = nn.Conv2d(
            in_channels, emb_size, kernel_size=patch_size, stride=patch_size
        )
        patches_per_side = img_size // patch_size
        token_count = patches_per_side * patches_per_side + 1  # patches + class token
        # Learnable class token and positional embedding, both starting at zero.
        self.cls_token = nn.Parameter(torch.zeros(1, 1, emb_size))
        self.pos_emb = nn.Parameter(torch.zeros(1, token_count, emb_size))

    def forward(self, x):
        """Embed a batch of images.

        x shape: (B, in_channels, H, W), e.g. (B, 3, 32, 32) with the defaults.
        Returns: (B, N+1, emb_size), where N is the number of patches.
        """
        batch = x.size(0)
        # Conv2d -> (B, emb_size, H', W'); flatten the grid -> (B, emb_size, N);
        # swap axes -> (B, N, emb_size)
        tokens = self.proj(x).flatten(2).transpose(1, 2)

        # Prepend one copy of the class token per image: (B, N+1, emb_size)
        cls = self.cls_token.expand(batch, -1, -1)
        tokens = torch.cat([cls, tokens], dim=1)

        # Add the positional embedding of the positions actually present.
        return tokens + self.pos_emb[:, : tokens.size(1), :]

In [16]:
class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over a token sequence.

    Args:
        emb_size: Embedding size of every token; must be divisible by
            ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``emb_size`` is not divisible by ``num_heads``.
    """

    def __init__(self, emb_size=128, num_heads=4, dropout=0.1):
        super().__init__()
        # Fail at construction time: with an uneven split the reshape in
        # forward() could never succeed and would only raise an opaque
        # shape error on the first call.
        if emb_size % num_heads != 0:
            raise ValueError(
                f"emb_size ({emb_size}) must be divisible by num_heads ({num_heads})"
            )
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.head_dim = emb_size // num_heads
        # One linear layer produces queries, keys and values together.
        self.qkv = nn.Linear(emb_size, 3 * emb_size)
        self.att_drop = nn.Dropout(dropout)
        self.projection = nn.Linear(emb_size, emb_size)

    def forward(self, x):
        """Apply self-attention.

        x shape: (B, N, emb_size). Returns a tensor of the same shape.
        """
        B, N, _ = x.shape
        qkv = self.qkv(x)  # (B, N, 3*emb_size)
        qkv = qkv.reshape(B, N, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)  # (3, B, num_heads, N, head_dim)
        q, k, v = qkv[0], qkv[1], qkv[2]  # each: (B, num_heads, N, head_dim)

        # Scaled dot-product attention; scores shape: (B, num_heads, N, N)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        att = torch.softmax(scores, dim=-1)
        att = self.att_drop(att)

        # Weighted sum of the values: (B, num_heads, N, head_dim)
        out = torch.matmul(att, v)

        # Combine heads
        out = out.transpose(1, 2)  # (B, N, num_heads, head_dim)
        out = out.flatten(2)  # (B, N, emb_size)
        return self.projection(out)

In [17]:
class TransformerEncoderBlock(nn.Module):
    """Pre-norm encoder block: self-attention and a feed-forward network,
    each applied to a LayerNorm-ed input and added back through a residual
    connection."""

    def __init__(self, emb_size=128, num_heads=4, expansion=4, dropout=0.1):
        super().__init__()
        hidden_size = expansion * emb_size

        self.norm1 = nn.LayerNorm(emb_size)
        self.attn = MultiHeadSelfAttention(emb_size, num_heads, dropout)
        self.norm2 = nn.LayerNorm(emb_size)

        # Feed-forward network: expand, GELU, dropout, project back.
        self.ffn = nn.Sequential(
            nn.Linear(emb_size, hidden_size),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, emb_size),
        )
        # The same dropout module is applied to both residual branches.
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        # Attention branch with residual connection
        x = x + self.drop(self.attn(self.norm1(x)))
        # Feed-forward branch with residual connection
        x = x + self.drop(self.ffn(self.norm2(x)))
        return x

In [18]:
class VisionTransformer(nn.Module):
    """Image classifier: patch embedding, a stack of transformer encoder
    blocks, a final LayerNorm and a linear head on the class token."""

    def __init__(self, in_channels=3, patch_size=4, emb_size=128, img_size=32, num_heads=4, num_layers=6, num_classes=10, dropout=0.1):
        super().__init__()
        self.patch_embed = PatchEmbedding(in_channels, patch_size, emb_size, img_size)

        # num_layers identical encoder blocks applied one after another.
        blocks = [
            TransformerEncoderBlock(
                emb_size=emb_size,
                num_heads=num_heads,
                expansion=4,
                dropout=dropout,
            )
            for _ in range(num_layers)
        ]
        self.encoder = nn.Sequential(*blocks)

        self.norm = nn.LayerNorm(emb_size)
        self.cls_head = nn.Linear(emb_size, num_classes)

    def forward(self, x):
        """Return class scores of shape (B, num_classes) for an image batch x."""
        # Each step keeps the shape (B, N+1, emb_size).
        tokens = self.patch_embed(x)
        tokens = self.encoder(tokens)
        tokens = self.norm(tokens)

        # The first token is the class token; classify from it alone.
        return self.cls_head(tokens[:, 0])