In [27]:
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np


In [28]:
# import data.tsv file and create a dataframe

df = pd.read_csv('data.tsv', sep='\t')

# open file data_info.txt and read the first line

sequence_length = None
with open('data_info.txt', 'r') as f:
    line = f.readline()
    split_line_on_collen = line.split(':')
    sequence_length = int(split_line_on_collen[1])
    


train_df = df.sample(frac=0.8, random_state=0)
test_df = df.drop(train_df.index)


# drop the collumn Y from the dataframe

train_x = train_df.drop(['Y'], axis=1).iloc[1:]
train_y = train_df['Y'].iloc[1:]

# make the tran_x data rows have the same length as the sequence_length by padding with [0,0,0] for each collumn

# train_x = train_x.reindex(range(sequence_length), fill_value=[0,0,0])

test_x = test_df.drop(['Y'], axis=1)


test_y = test_df['Y']



In [29]:
def convert_rows_to_nupy_array(df):
    numpy_array = []
    for index, row in df.iterrows():
        row_array = []
        for col in row.iteritems():
            col_float = []
            for item in col[1].split(','):
                col_float.append(float(item.replace('[', '').replace(']', '')))
            row_array.append(col_float)
        numpy_array.append(row_array)
    return np.array(numpy_array)

train_x_numpy = convert_rows_to_nupy_array(train_x)
test_x_numpy = convert_rows_to_nupy_array(test_x)

print(train_x_numpy.shape)
print(test_x_numpy[1][5])
            

(2057, 749, 3)
[ 0.51171875 -0.10406494 -0.10406494]


##### Remove Padding from numpy

In [30]:
train_y_no_pad = train_y
test_y_no_pad = test_y

# remove the numpy array at the axis 3 that has values of a numpy array with all zeros

train_x_numpy_no_pad = np.delete(train_x_numpy, np.where(~train_x_numpy.any(axis=(0,1,2))), axis=0)
test_x_numpy_no_pad = np.delete(test_x_numpy, np.where(~test_x_numpy.any(axis=(0,1,2))), axis=0)
print(train_x_numpy_no_pad.shape)

(2057, 749, 3)


## Train

In [31]:
# Dummy Classifer

from sklearn.dummy import DummyClassifier
from sklearn.metrics import accuracy_score

dummy_clf = DummyClassifier(strategy="most_frequent")
print(train_x_numpy.shape, train_y.shape)
dummy_clf.fit(train_x_numpy, train_y)
y_pred = dummy_clf.predict(test_x_numpy)
print(accuracy_score(test_y, y_pred))

(2057, 749, 3) (2057,)
0.7704280155642024


In [34]:

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, input,target, seq_len):
        self.input = input
        self.target = target
        self.seq_len = seq_len
    def __getitem__(self, item):
        # print("input item", self.input[item])
        return self.input[item], self.target.iloc[item]

        # return input[item:item+self.seq_len], input[item+self.seq_len]
    def __len__(self):
        return self.input.shape[0]

# test_input = np.arange(1,8).reshape(-1,1)
# input = torch.tensor(test_input, dtype=torch.float)

# dataset = MyDataset(input, 3)

# dl = torch.utils.data.DataLoader(dataset, batch_size=1)
# for inp, label in dl:
#     print("inp shape",inp.shape)
#     print("inp",inp.numpy())


device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


input_size = 3
num_classes = 2
hidden_size = 256
num_layers = 2
learning_rate = 0.001

class SitUpDetector(nn.Module):
    def __init__(self,input_size, num_classes, hidden_size, num_layers):
        super(SitUpDetector, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out
    

model = SitUpDetector(input_size, num_classes, hidden_size, num_layers).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


myDataset = MyDataset(train_x_numpy,train_y, sequence_length)
train_loader = torch.utils.data.DataLoader(dataset=myDataset, batch_size=1)


num_epochs = 100
total_steps = len(train_loader)
for epoch in range(num_epochs):
    for i, (features, target) in enumerate(train_loader):

        # convert Double tensor to Float tensor
        features = features.float()

        outputs = model(features)
        
        loss = criterion(outputs, target)
        optimizer.zero_grad()
        
        loss.backward()

        optimizer.step()
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_steps, loss.item()))

# Check accuracy
test_dataset = MyDataset(test_x_numpy, test_y, sequence_length)
test_dataloader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=1)
n_correct = 0
n_samples = 0
model.eval()

with torch.no_grad():
    for x, y in test_dataloader:
        x = x.to(device=device).squeeze(1)
        y = y.to(device=device)

        x = x.float()

        scores = model(x)

        _, predictions = scores.max(1)

        n_correct += (predictions == y).sum()
        n_samples += predictions.size(0)

    print(f'Got {n_correct} / {n_samples} with accuracy {float(n_correct)/float(n_samples)*100:.2f}')

# save the model

torch.save(model.state_dict(), 'sit_up_detector.pth')






Epoch [1/1], Step [100/2057], Loss: 2.0248
Epoch [1/1], Step [200/2057], Loss: 1.3374
Epoch [1/1], Step [300/2057], Loss: 0.1863
Epoch [1/1], Step [400/2057], Loss: 0.3374
Epoch [1/1], Step [500/2057], Loss: 1.4976
Epoch [1/1], Step [600/2057], Loss: 0.2362
Epoch [1/1], Step [700/2057], Loss: 1.4465
Epoch [1/1], Step [800/2057], Loss: 1.2325
Epoch [1/1], Step [900/2057], Loss: 1.3849
Epoch [1/1], Step [1000/2057], Loss: 1.3731
Epoch [1/1], Step [1100/2057], Loss: 1.3837
Epoch [1/1], Step [1200/2057], Loss: 0.3613
Epoch [1/1], Step [1300/2057], Loss: 0.3642
Epoch [1/1], Step [1400/2057], Loss: 1.2773
Epoch [1/1], Step [1500/2057], Loss: 0.3934
Epoch [1/1], Step [1600/2057], Loss: 1.5072
Epoch [1/1], Step [1700/2057], Loss: 0.3032
Epoch [1/1], Step [1800/2057], Loss: 1.4454
Epoch [1/1], Step [1900/2057], Loss: 1.1267
Epoch [1/1], Step [2000/2057], Loss: 0.3620
Got 396 / 514 with accuracy 77.04


# Train with no Padding

In [None]:
# Make a new Pytorch LSTM model that takes a variable input

class SitUpDetectorVariable(nn.Module):
    def __init__(self,input_size, num_classes, hidden_size, num_layers):
        super(SitUpDetectorVariable, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

# train the model

model = SitUpDetectorVariable(input_size, num_classes, hidden_size, num_layers).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

myDataset = MyDataset(train_x_numpy_no_pad,train_y, sequence_length)
train_loader = torch.utils.data.DataLoader(dataset=myDataset, batch_size=1)

num_epochs = 5
total_steps = len(train_loader)

for epoch in range(num_epochs):
    for i, (features, target) in enumerate(train_loader):

        # convert Double tensor to Float tensor
        features = features.float()

        outputs = model(features)

        loss = criterion(outputs, target)
        optimizer.zero_grad()
        
        loss.backward()

        optimizer.step()
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_steps, loss.item()))
                   