# **Project: Human Activity Recognition using accelometer and gyroscope data**

# **© Copyright: Utsab Saha and Sawradip Saha**

In [None]:
import pandas as pd
import numpy as np
import random  # Import the random module
import torch
import os
import matplotlib.pyplot as plt
import time
from sklearn.model_selection import train_test_split
import os
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
def seed_everything(seed=42):
    
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
seed_everything(seed=42)

In [None]:
# Activities are the class labels
# It is a 6 class classification
ACTIVITIES = {
    0: 'WALKING',
    1: 'WALKING_UPSTAIRS',
    2: 'WALKING_DOWNSTAIRS',
    3: 'SITTING',
    4: 'STANDING',
    5: 'LAYING',
}

# Utility function to print the confusion matrix
def confusion_matrix(Y_true, Y_pred):
    Y_true = pd.Series([ACTIVITIES[y] for y in np.argmax(Y_true, axis=1)])
    Y_pred = pd.Series([ACTIVITIES[y] for y in np.argmax(Y_pred, axis=1)])

    return pd.crosstab(Y_true, Y_pred, rownames=['True'], colnames=['Pred'])

In [None]:
# Data directory
DATADIR = '/kaggle/input/har-data-uci/UCI HAR Dataset'

In [None]:
SIGNALS = [
    "body_acc_x",
    "body_acc_y",
    "body_acc_z",
    "body_gyro_x",
    "body_gyro_y",
    "body_gyro_z",
    "total_acc_x",
    "total_acc_y",
    "total_acc_z"
]

In [None]:
# function to read the data from csv file
def _read_csv(filename):
    return pd.read_csv(filename, delim_whitespace=True, header=None)

def load_signals(subset):
    signals_data = []

    for signal in SIGNALS:
        filename = f'/kaggle/input/har-data-uci/UCI HAR Dataset/{subset}/Inertial Signals/{signal}_{subset}.txt'
        signals_data.append(
            _read_csv(filename).values
        ) 

    # Transpose is used to change the dimensionality of the output,
    # aggregating the signals by combination of sample/timestep.
    # Resultant shape is (7352 train/2947 test samples, 128 timesteps, 9 signals)
    return np.transpose(signals_data, (1, 2, 0))

def load_y(subset):
    filename = f'/kaggle/input/har-data-uci/UCI HAR Dataset/{subset}/y_{subset}.txt'
    y = _read_csv(filename)[0]

    return y
def load_data():
    X_train, X_test = load_signals('train'), load_signals('test')
    y_train, y_test = load_y('train'), load_y('test')

    return X_train, X_test, y_train, y_test

# function to count the number of classes
def _count_classes(y):
    return len(set([tuple(category) for category in y]))

In [None]:
# Loading the train and test data
X_train, X_test, y_train, y_test = load_data()
y_train  = y_train - 1
y_test  = y_test - 1

In [None]:
# Splitting X_train_static and y_train_static into training and test sets
test_size = 0.2
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=test_size, random_state=42)

print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)
print("X_val shape:", X_val.shape)
print("y_val shape:", y_val.shape)


In [None]:
class MyHARDataset(Dataset):
    def __init__(self, feat_matrix, y_feat):
        self.feat_matrix = feat_matrix
        self.y_feat = y_feat
        
    def __len__(self):
        return self.feat_matrix.shape[0]

    def __getitem__(self, idx):
        feat_x = self.feat_matrix[idx].transpose()
        feat_y = self.y_feat.values[idx]
        return torch.from_numpy(feat_x).float(), feat_y

In [None]:
training_data = MyHARDataset(X_train, y_train)
val_data = MyHARDataset(X_val , y_val)
train_dataloader = DataLoader(training_data, batch_size=10, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size=10, shuffle=True)

test_data = MyHARDataset(X_test,y_test)
test_dataloader = DataLoader(test_data,batch_size=10, shuffle=False)


In [None]:
for xd, yd in train_dataloader :
    print(yd)
    break

In [None]:
def weight_init(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight, gain=nn.init.calculate_gain('relu'))
        nn.init.zeros_(m.bias)

class ResNetBlock(nn.Module):
    def __init__(self, in_channels, out_channels, num_conv_layers, batch_norm=True):
        super(ResNetBlock, self).__init__()

        # The first convolutional layer
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1)
        if batch_norm:
            self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        # Additional convolutional layers
        layers = []
        for _ in range(num_conv_layers - 1):
            layers.append(nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1))
            if batch_norm:
                layers.append(nn.BatchNorm1d(out_channels))
            layers.append(nn.ReLU(inplace=True))
        
        # The final convolutional layer
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1)
        if batch_norm:
            self.bn2 = nn.BatchNorm1d(out_channels)

        self.maxpool = nn.MaxPool1d(kernel_size=2, stride=2)
        
        # Adjust the dimensions of the skip connection
        self.identity_conv = nn.Conv1d(in_channels, out_channels, kernel_size=1)
        
    def forward(self, x):
        identity = self.identity_conv(x)  # Adjusted skip connection
        out = self.conv1(x)
        if hasattr(self, 'bn1'):
            out = self.bn1(out)
        out = self.relu(out)
        
        out = self.conv2(out)
        if hasattr(self, 'bn2'):
            out = self.bn2(out)
        
        # Apply skip connection
        out += identity
        out = self.relu(out)
        
        out = self.maxpool(out)
        return out

class ResNet(nn.Module):
    def __init__(self):
        super(ResNet, self).__init__()

        self.block1 = ResNetBlock(9, 64, num_conv_layers=2)
        self.block2 = ResNetBlock(64, 128, num_conv_layers=2)
        self.block3 = ResNetBlock(128, 256, num_conv_layers=3)
        self.block4 = ResNetBlock(256, 512, num_conv_layers=3)
        self.block5 = ResNetBlock(512, 512, num_conv_layers=3)

        self.avgpool = nn.AdaptiveAvgPool1d(1)  # Global average pooling
        self.fc1 = nn.Linear(512, 3)  # Adjusted input size based on the layers
        self.softmax = nn.Softmax(dim=1)
    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        x= self.softmax(x)
#         print(x.shape)
        return x

In [None]:
class FusionActNet(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super(FusionActNet, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, in_channels, kernel_size=3, stride=stride, padding=1, groups=in_channels, bias=False)
        self.bn1 = nn.BatchNorm1d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        
        self.conv2 = nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn2 = nn.BatchNorm1d(out_channels)
        
        self.identityconv = nn.Conv1d(in_channels,out_channels,kernel_size=1)
    def forward(self, x):
        identity = x
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)

#         if identity.size(1) != x.size(1):
#             identity = nn.Conv1d(identity.size(1), x.size(1), kernel_size=1).to(x.device)(identity)
        identity = self.identityconv(identity)    
        x += identity
        
        return x

class MyDeeperFusionActNet(nn.Module):
    def __init__(self):
        super(MyDeeperFusionActNet, self).__init__()

        # Load pretrained static and dynamic models
        self.model_static = ResNet()
        self.model_static.load_state_dict(torch.load('/kaggle/input/har-uci-weights/new_static.pth'))
        self.model_static.train(False)
        
        self.model_dynamic = ResNet()
        self.model_dynamic.load_state_dict(torch.load('/kaggle/input/har-uci-weights/new_dynamic.pth'))
        self.model_dynamic.train(False)
        
        # MobileNetV1-style layers
        self.conv1 = nn.Conv1d(9, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(32)
        self.relu = nn.ReLU(inplace=True)
        self.block1 = self.make_block(32, 64, 2)  # Depthwise: 2, Pointwise: 2
        self.block2 = self.make_block(64, 128, 3)  # Depthwise: 3, Pointwise: 3
        self.block3 = self.make_block(128, 256, 3)  # Depthwise: 3, Pointwise: 3
        
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(256, 1)
        self.sigmoid = nn.Sigmoid()
#         self.linear = nn.Linear(6,6)
#         self.softmax = nn.Softmax(dim=1)

    def make_block(self, in_channels, out_channels, num_blocks):
        layers = []
        for _ in range(num_blocks):
            layers.append(FusionActNet(in_channels, out_channels, stride=1))
            in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        x_static = self.model_static(x)
        x_dynamic = self.model_dynamic(x)
#         print(x_static.shape)
#         print(x_dynamic.shape)
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        
        x = self.fc(x)
        x_weight = self.sigmoid(x)
#         print(x_weight.shape)
        x_almost_out = torch.cat((x_static * x_weight, x_dynamic * (1 - x_weight)), -1)
        return x_almost_out

In [None]:
model = MyDeeperFusionActNet().cuda()
model.apply(weight_init)
model(torch.zeros(10, 9, 128).cuda())

In [None]:
import torch.optim as optim

# Define loss function

criterion = nn.CrossEntropyLoss()

# Define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.000001)

# Training loop
def train(model, train_loader, optimizer, criterion):
    model.train()
    total_loss = 0.0
    correct = 0
    total_samples = 0
    for batch_idx, (x, y) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(x.cuda())
        loss = criterion(outputs, y.cuda())
        loss.backward()
        optimizer.step()
        with torch.no_grad():
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y.cuda()).sum().item()
            total_samples += y.size(0)
    train_loss = total_loss / len(train_loader)
    train_accuracy = correct / total_samples
    return train_loss ,train_accuracy

# Validation loop
def validate(model, val_loader, criterion):
    model.eval()
    total_loss = 0.0
    correct = 0
    total_samples = 0
    with torch.no_grad():
        for batch_idx, (x, y) in enumerate(val_loader):
            outputs = model(x.cuda())
            loss = criterion(outputs, y.cuda())
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y.cuda()).sum().item()
            total_samples += y.size(0)
    
    val_loss = total_loss / len(val_loader)
    val_accuracy = correct / total_samples
    return val_loss, val_accuracy

# Training and validation loop
num_epochs = 60
for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_dataloader, optimizer, criterion)
    val_loss, val_accuracy = validate(model, test_dataloader, criterion)
    
    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {train_loss:.4f}, "
          f"Train Accuracy: {train_accuracy:.4f}, "
          f"Val Loss: {val_loss:.4f}, "
          f"Val Accuracy: {val_accuracy:.4f}")
    
    # Save the model weights if validation accuracy improves
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_model_weights = model.state_dict()
        torch.save(best_model_weights, 'best_combined_model_resnet_weights.pth')

# Load the best model weights
model.load_state_dict(best_model_weights)

In [None]:
# Load the best model weights
# model.load_state_dict(torch.load('/kaggle/working/best_combined_model_resnet_weights.pth'))
model.eval()

# Testing loop
test_correct = 0
total_test_samples = 0

with torch.no_grad():
    for batch_idx, (x_test, y_test) in enumerate(test_dataloader):
        outputs = model(x_test.cuda())
        _, predicted = torch.max(outputs, 1)
        test_correct += (predicted == y_test.cuda()).sum().item()
        total_test_samples += y_test.size(0)

test_accuracy = test_correct / total_test_samples
print(f"Test Accuracy: {test_accuracy:.4f}")

In [None]:
from sklearn.metrics import confusion_matrix, classification_report

# Load the best model weights
model.load_state_dict(torch.load('/kaggle/input/har-uci-weights/new_combined.pth'))
model.eval()

# Initialize lists to store true labels and predicted labels
true_labels = []
predicted_labels = []

with torch.no_grad():
    for batch_idx, (x_test, y_test) in enumerate(test_dataloader):
        outputs = model(x_test.cuda())
        _, predicted = torch.max(outputs, 1)
        
        true_labels.extend(y_test.cpu().numpy())
        predicted_labels.extend(predicted.cpu().numpy())

# Generate confusion matrix
conf_matrix = confusion_matrix(true_labels, predicted_labels)

# Calculate precision, recall, and F1 score
report = classification_report(true_labels, predicted_labels, target_names=["class_0", "class_1", "class_2","class_3","class_4","class_5"])

print("Confusion Matrix:")
print(conf_matrix)
print("\nClassification Report:")
print(report)