In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, TensorDataset
import torch.nn.functional as F

In [2]:
files = [
    "../../Data/empty1_timedomain.csv",
    "../../Data/jumpAyon_timedomain.csv",
    "../../Data/situpAyon_timedomain.csv",
    "../../Data/standAyon_timedomain.csv",
    "../../Data/walkAyon_timedomain.csv"
]

In [3]:
data = []
labels = []
counter = 0
for file_path in files:
    f = pd.read_csv(file_path, header=None, skiprows=1)
    for i in range(0, len(f), 50):
        data_chunk = f.iloc[i:i+50].to_numpy()
        if len(data_chunk) < 50:  # Check if the chunk is complete
            continue 
        if(np.isnan(data_chunk).any()):
            continue
        data.append(data_chunk)
        labels.append(counter)
    counter += 1
    

In [4]:
for i,chunk in enumerate(data):
    if(chunk.shape[0] != 50 or chunk.shape[1] != 56):
        print(f"inconsistency found for element: {i}")

In [5]:
data_array = np.array(data)
labels_array = np.array(labels)

In [6]:
X_train, X_test, y_train, y_test = train_test_split(data_array, labels_array, 
                                                    test_size=0.2, random_state=42)

In [7]:
import numpy as np

# Check for NaN or Inf in your dataset
print("NaN in data:", np.isnan(X_train).any())
print("Inf in data:", np.isinf(X_train).any())


NaN in data: False
Inf in data: False


In [63]:
class UT_HAR_MLP(nn.Module):
    def __init__(self):
        super(UT_HAR_MLP, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(50 * 56, 1024),
            nn.ReLU(),
            nn.Linear(1024, 128),
            nn.ReLU(),
            nn.Linear(128, 5)
        )

    def forward(self, x):
        x = x.view(-1, 50 * 56)
        x = self.fc(x)
        return x

In [51]:
class UT_HAR_LeNet(nn.Module):
    def __init__(self):
        super(UT_HAR_LeNet, self).__init__()
        self.encoder = nn.Sequential(
            # Input size: (1, 50, 56)
            nn.Conv2d(1, 32, kernel_size=5, stride=(3, 1)),  # Output: (32, 16, 52)
            nn.ReLU(True),
            nn.MaxPool2d(2),                                # Output: (32, 8, 26)
            nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1)),  # Output: (64, 4, 13)
            nn.ReLU(True),
            nn.MaxPool2d(2, padding=(0, 1)),                # Output: (64, 2, 7)
            nn.Conv2d(64, 96, kernel_size=(2, 2), stride=1, padding=(0, 0)),       # Output: (96, 1, 6)
            nn.ReLU(True)                       
        )
        self.fc = nn.Sequential(
            nn.Linear(96*1*6, 128),       
            nn.ReLU(),
            nn.Linear(128, 5)                      
        )

    def forward(self, x):
        x = self.encoder(x)
        # print(f"Shape after encoder: {x.shape}")  # Debugging shape
        x = x.view(x.size(0), -1)  # Flatten
        # print(f"Shape after flattening: {x.shape}")  # Debugging shape after flattening
        out = self.fc(x)
        return out

In [64]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model1 = UT_HAR_MLP().to(device)

In [53]:
model2 = UT_HAR_LeNet().to(device)

In [54]:
#setting up the hyperparameters
batch_size = 1
input_size = X_train.shape[1:]
hidden_size1 = 500
hidden_size2 = 300
hidden_size3 = 100
num_classes = 5
num_epochs = 50
learning_rate = 0.0001
n_samples = X_train.shape[0]

In [55]:
#transfering training, testing data and labels to device
X_train, y_train = torch.tensor(X_train, dtype=torch.float32).to(device), torch.tensor(y_train, dtype = torch.long).to(device)
X_test, y_test = torch.tensor(X_test, dtype=torch.float32).to(device), torch.tensor(y_test, dtype=torch.float32).to(device)

  X_train, y_train = torch.tensor(X_train, dtype=torch.float32).to(device), torch.tensor(y_train, dtype = torch.long).to(device)
  X_test, y_test = torch.tensor(X_test, dtype=torch.float32).to(device), torch.tensor(y_test, dtype=torch.float32).to(device)


In [56]:
train_dataset = TensorDataset(X_train, y_train)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataset = TensorDataset(X_test, y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [57]:
#criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model2.parameters(), lr = learning_rate)

In [58]:
training_losses = []
test_losses = []

In [65]:
for epoch in range(num_epochs):
    model1.train()
    running_loss = 0.0

    for back_idx, (data, label) in enumerate(train_dataloader):
        # print(f'data loader data shape: {data.shape}')
        #zero grad the optimizer
        optimizer.zero_grad()
        #take the output
        output = model1(data)
        # output = model2(data)
        #find the loss
        loss = criterion(output, label)
        #back propagation
        loss.backward()
        #optimizer step
        optimizer.step()

        running_loss += loss.item()
    avg_train_loss = running_loss/len(train_dataloader)
    training_losses.append(avg_train_loss)
    # break


    model1.eval()
    
    with torch.no_grad():
        val_loss = 0.0
        correct = 0
        total = 0
        for data, target in test_dataloader:
            data, target = data.to(device), target.to(device)
            # data = data.unsqueeze(1)
            output = model1(data)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
            val_loss += criterion(output, target.long()).item()
        avg_val_loss = val_loss / len(test_dataloader)
        test_losses.append(avg_val_loss)
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Accuracy: {100*correct/total:.2f}%')



Epoch 1/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 2/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 3/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 4/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 5/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 6/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 7/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 8/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 9/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 10/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 11/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 12/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 13/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 14/50, Train Loss: 3.7702, Val Loss: 3.9391, Accuracy: 19.15%
Epoch 15/50, Train Loss: 3.7702, Val Loss: 3.9391, Accura

In [61]:
for epoch in range(num_epochs):
    model2.train()
    running_loss = 0.0

    for back_idx, (data, label) in enumerate(train_dataloader):
        # print(f'data loader data shape: {data.shape}')
        #zero grad the optimizer
        optimizer.zero_grad()
        #take the output
        data = data.unsqueeze(1)
        output = model2(data)
        # output = model2(data)
        #find the loss
        loss = criterion(output, label)
        #back propagation
        loss.backward()
        #optimizer step
        optimizer.step()

        running_loss += loss.item()
    avg_train_loss = running_loss/len(train_dataloader)
    training_losses.append(avg_train_loss)
    # break


    model2.eval()
    
    with torch.no_grad():
        val_loss = 0.0
        correct = 0
        total = 0
        for data, target in test_dataloader:
            data, target = data.to(device), target.to(device)
            data = data.unsqueeze(1)
            output = model2(data)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
            val_loss += criterion(output, target.long()).item()
        avg_val_loss = val_loss / len(test_dataloader)
        test_losses.append(avg_val_loss)
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Accuracy: {100*correct/total:.2f}%')



Epoch 1/50, Train Loss: 1.5046, Val Loss: 1.3731, Accuracy: 35.26%
Epoch 2/50, Train Loss: 1.3702, Val Loss: 1.3118, Accuracy: 36.78%
Epoch 3/50, Train Loss: 1.3263, Val Loss: 1.2575, Accuracy: 58.66%
Epoch 4/50, Train Loss: 1.2588, Val Loss: 1.1403, Accuracy: 61.40%
Epoch 5/50, Train Loss: 1.2129, Val Loss: 1.1229, Accuracy: 58.66%
Epoch 6/50, Train Loss: 1.1532, Val Loss: 1.0410, Accuracy: 62.92%
Epoch 7/50, Train Loss: 1.0957, Val Loss: 1.0757, Accuracy: 59.57%
Epoch 8/50, Train Loss: 1.0905, Val Loss: 0.9613, Accuracy: 62.61%
Epoch 9/50, Train Loss: 1.0289, Val Loss: 1.0134, Accuracy: 58.97%
Epoch 10/50, Train Loss: 1.0037, Val Loss: 0.9478, Accuracy: 66.57%
Epoch 11/50, Train Loss: 0.9567, Val Loss: 1.0352, Accuracy: 61.70%
Epoch 12/50, Train Loss: 0.9909, Val Loss: 0.9485, Accuracy: 61.09%
Epoch 13/50, Train Loss: 0.9656, Val Loss: 0.9110, Accuracy: 62.61%
Epoch 14/50, Train Loss: 0.9341, Val Loss: 0.8548, Accuracy: 67.17%
Epoch 15/50, Train Loss: 0.9079, Val Loss: 0.8583, Accura

In [77]:
from einops.layers.torch import Rearrange, Reduce
from einops import rearrange, reduce, repeat
class PatchEmbedding(nn.Module):
    def __init__(self, in_channels=1, patch_size_w=50, patch_size_h=56, emb_size=50 * 56, img_size=50 * 56):
        self.patch_size_w = patch_size_w
        self.patch_size_h = patch_size_h
        super().__init__()
        self.projection = nn.Sequential(
            nn.Conv2d(in_channels, emb_size, kernel_size=(patch_size_w, patch_size_h),
                      stride=(patch_size_w, patch_size_h)),
            Rearrange('b e (h) (w) -> b (h w) e'),
        )
        self.cls_token = nn.Parameter(torch.randn(1, 1, emb_size))
        self.position = nn.Parameter(torch.randn((img_size // (patch_size_w * patch_size_h)) + 1, emb_size))

    def forward(self, x):
        x = x.unsqueeze(1)  # Add channel dimension
        b, _, _, _ = x.shape
        x = self.projection(x)
        cls_tokens = repeat(self.cls_token, '() n e -> b n e', b=b)
        x = torch.cat([cls_tokens, x], dim=1)
        x += self.position
        return x
    
class MultiHeadAttention(nn.Module):
    def __init__(self, emb_size=900, num_heads=5, dropout=0.0):
        super().__init__()
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.qkv = nn.Linear(emb_size, emb_size * 3)
        self.att_drop = nn.Dropout(dropout)
        self.projection = nn.Linear(emb_size, emb_size)

    def forward(self, x, mask=None):
        qkv = rearrange(self.qkv(x), "b n (h d qkv) -> (qkv) b h n d", h=self.num_heads, qkv=3)
        queries, keys, values = qkv[0], qkv[1], qkv[2]
        energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys)
        if mask is not None:
            fill_value = torch.finfo(torch.float32).min
            energy.mask_fill(~mask, fill_value)

        scaling = self.emb_size ** (1 / 2)
        att = F.softmax(energy, dim=-1) / scaling
        att = self.att_drop(att)
        # sum up over the third axis
        out = torch.einsum('bhal, bhlv -> bhav ', att, values)
        out = rearrange(out, "b h n d -> b n (h d)")
        out = self.projection(out)
        return out


class ResidualAdd(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, **kwargs):
        res = x
        x = self.fn(x, **kwargs)
        x += res
        return x


class FeedForwardBlock(nn.Sequential):
    def __init__(self, emb_size, expansion=4, drop_p=0.):
        super().__init__(
            nn.Linear(emb_size, expansion * emb_size),
            nn.GELU(),
            nn.Dropout(drop_p),
            nn.Linear(expansion * emb_size, emb_size),
        )


class TransformerEncoderBlock(nn.Sequential):
    def __init__(self,
                 emb_size=900,
                 drop_p=0.,
                 forward_expansion=4,
                 forward_drop_p=0.,
                 **kwargs):
        super().__init__(
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                MultiHeadAttention(emb_size, **kwargs),
                nn.Dropout(drop_p)
            )),
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                FeedForwardBlock(
                    emb_size, expansion=forward_expansion, drop_p=forward_drop_p),
                nn.Dropout(drop_p)
            )
            ))


class TransformerEncoder(nn.Sequential):
    def __init__(self, depth=1, **kwargs):
        super().__init__(*[TransformerEncoderBlock(**kwargs) for _ in range(depth)])


class ClassificationHead(nn.Sequential):
    def __init__(self, emb_size=900, n_classes=7):
        super().__init__(
            Reduce('b n e -> b e', reduction='mean'),
            nn.LayerNorm(emb_size),
            nn.Linear(emb_size, n_classes))

class UT_HAR_ViT(nn.Sequential):
    def __init__(self,
                 in_channels=1,
                 patch_size_w=50,
                 patch_size_h=56,
                 emb_size=50 * 56,
                 img_size=50 * 56,
                 depth=1,
                 n_classes=7,
                 **kwargs):
        super().__init__(
            PatchEmbedding(in_channels, patch_size_w, patch_size_h, emb_size, img_size),
            TransformerEncoder(depth, emb_size=emb_size, **kwargs),
            ClassificationHead(emb_size, n_classes)
        )


In [78]:
model3 = UT_HAR_ViT(patch_size_w=50, patch_size_h=56, emb_size=50 * 56, img_size=50 * 56).to(device)

In [80]:
for epoch in range(num_epochs):
    model3.train()
    running_loss = 0.0

    for back_idx, (data, label) in enumerate(train_dataloader):
        # print(f'data loader data shape: {data.shape}')
        #zero grad the optimizer
        optimizer.zero_grad()
        #take the output
        # data = data.unsqueeze(1)
        output = model3(data)
        # output = model2(data)
        #find the loss
        loss = criterion(output, label)
        #back propagation
        loss.backward()
        #optimizer step
        optimizer.step()

        running_loss += loss.item()
    avg_train_loss = running_loss/len(train_dataloader)
    training_losses.append(avg_train_loss)
    # break


    model3.eval()
    
    with torch.no_grad():
        val_loss = 0.0
        correct = 0
        total = 0
        for data, target in test_dataloader:
            data, target = data.to(device), target.to(device)
            # data = data.unsqueeze(1)
            output = model3(data)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
            val_loss += criterion(output, target.long()).item()
        avg_val_loss = val_loss / len(test_dataloader)
        test_losses.append(avg_val_loss)
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Accuracy: {100*correct/total:.2f}%')

Epoch 1/50, Train Loss: 2.1193, Val Loss: 2.1800, Accuracy: 18.54%


KeyboardInterrupt: 