In [None]:
import torch
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as O
import torch.nn.functional as F
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from sklearn.preprocessing import MinMaxScaler
import numpy as np
from scipy.signal import butter, lfilter, freqz
from copy import deepcopy

device = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Using {device.upper()}")

%matplotlib inline

In [None]:
rawData = pd.read_csv("dataset.csv")
rawData=rawData.drop(columns="Time in s")
rawData["Angle"] = rawData["Angle"]*2
whMapping = {"Person A":[50, 145], "Person G":[58, 168], "Person F":[78.6, 159], "Person H":[58, 168], "Person E": [75, 158], "Person B": [86, 166], "Person C": [65, 174], "Person I":[56, 160], "Person J":[65, 161], "Person D": [70, 161]}
print(whMapping)

In [None]:
rawData

In [None]:

# Action coding:
# 0 is idle state
# 1 is flexion
# -1 is extension

def butter_lowpass(cutoff, fs, order=5):
    return butter(order, cutoff, fs=fs, btype='low', analog=False)

def butter_lowpass_filter(data, cutoff, fs, order=5):
    b, a = butter_lowpass(cutoff, fs, order=order)
    y = lfilter(b, a, data)
    return y

order=1
fs=1
cutoff=0.1
sections = [[5,45, 1], [50,89, 0], [100,139, 1], [145,184, 0], [195,234, 1], [240,279, 0]]
maxAngle=np.max(rawData["Angle"])

trainData = {"weight":[], "height":[], "current":[]
             ,"action":[]
             , "angle":[]
            }

testData = {"weight":[], "height":[], "current":[]
             ,"action":[]
             , "angle":[]
            }
def get_cmap(n, name='hsv'):
    '''Returns a function that maps each index in 0, 1, ..., n-1 to a distinct 
    RGB color; the keyword argument name must be a standard mpl colormap name.'''
    return plt.get_cmap(name, n)

cmap = get_cmap(9)
for idx, i in enumerate(list(["Person A","Person B","Person G","Person D","Person E","Person F","Person H","Person I","Person J"])):
    iList = []
    actList = []
    angList = []
    for sec in sections:
        sig = butter_lowpass_filter(rawData[i][sec[0]:sec[1]], cutoff, fs, order)[5:]
        plt.plot(sig, color=cmap(idx))
        iList.extend(sig)
        actList.extend([sec[2]]*len(sig))
        angList.extend(rawData["Angle"][sec[1]-len(sig):sec[1]])
    ln = len(iList)
    trainData["current"].extend(list(iList))
    trainData["height"].extend([whMapping[i][1]]*ln)
    trainData["weight"].extend([whMapping[i][0]]*ln)
    trainData["action"].extend(list(actList))
    trainData["angle"].extend(list(angList))

for i in list(["Person C"]):
    iList = []
    actList = []
    angList = []
    for sec in sections:
        sig = butter_lowpass_filter(rawData[i][sec[0]:sec[1]], cutoff, fs, order)[5:]
        iList.extend(sig)
        actList.extend([sec[2]]*len(sig))
        angList.extend(rawData["Angle"][sec[1]-len(sig):sec[1]])
    ln = len(iList)
    testData["current"].extend(list(iList))
    testData["height"].extend([whMapping[i][1]]*ln)
    testData["weight"].extend([whMapping[i][0]]*ln)
    testData["action"].extend(list(actList))
    testData["angle"].extend(list(angList))



currentScaler = MinMaxScaler()
a=np.array([*trainData["current"], *testData["current"]]).reshape((-1,1))
print(a.shape)
currentScaler.fit(a)
trainData["current"] = currentScaler.transform(np.array(trainData["current"]).reshape((-1,1))).flatten()
testData["current"] = currentScaler.transform(np.array(testData["current"]).reshape((-1,1))).flatten()
# print(trainData)
# print(testData)
for i in trainData.keys():
    print(i, len(trainData[i]))
print()
for i in testData.keys():
    print(i, len(testData[i]))

In [None]:
pd.DataFrame.from_dict(trainData)
plt.plot(trainData["current"])

In [None]:
sns.heatmap(pd.DataFrame.from_dict(trainData).corr())

In [None]:
class AngleDataset(Dataset):
    def __init__(self, dataMap, device="cpu"):
        # super().__init__()
        self.x=torch.tensor([dataMap["current"],dataMap["height"],dataMap["weight"], dataMap["action"]])
        self.y=torch.tensor(dataMap["angle"])
        self.x = self.x.T
        assert self.x.shape[0] == len(self.y), f"X shape: {len(self.x)}, Y shape: {len(self.y)}"
        self.x=self.x.to(device)
        self.x = self.x.to(torch.float)
        self.y = self.y.to(device).to(torch.float)
        
    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

In [None]:
trainDataset=AngleDataset(trainData, device)
trainDL = DataLoader(trainDataset, shuffle=True, batch_size=16)
print(len(trainDataset))

testDataset = AngleDataset(testData, device)
testDL = DataLoader(testDataset, shuffle=False, batch_size=1)
print(len(testDataset))

In [None]:
class Model(nn.Module):
    def __init__(self, inputSize, hidden, outputSize):
        super().__init__()
        self.l1=nn.Linear(inputSize, hidden, bias=True)
        self.l2=nn.Linear(hidden, hidden, bias=True)
        self.l4=nn.Linear(hidden, outputSize, bias=True)

        self.a1 = nn.ReLU()
        self.a2 = nn.ReLU()

    def forward(self, x):
        o1 = self.a1(self.l1(x))
        o2 = self.a2(self.l2(o1))
        return self.l4(o2)

In [None]:
lss = []
testLss = []

In [None]:
epochs = 300
lr=0.001

In [None]:
testModel = Model(4, 192, 1).to(device)
opt = O.Adam(testModel.parameters(), lr=lr)
sch = O.lr_scheduler.StepLR(opt, step_size=10, gamma=0.5)
lossFn = torch.nn.MSELoss()

In [None]:
minTestError = 100
minTrainError = 100
epoch = 0
bestModelParams = None
for e in tqdm(range(epochs)):
    epochLoss=0
    testModel.train()
    for x, y in trainDL:
        opt.zero_grad()
        yHat = testModel(x).flatten()
        l = lossFn(yHat, y)
        l.backward()
        opt.step()
        epochLoss += l.detach().cpu().item()
    lss.append(epochLoss/len(trainDL.dataset))
    testModel.eval()
    testError = 0
    for xTest, yTest in testDL:
        yTestHat = testModel(xTest).flatten()
        testError += torch.abs(yTestHat - yTest).sum()
    testError = testError.detach().cpu().item()/len(testDataset)
    testLss.append(testError)
    if testError < minTestError:
        minTestError = testError
        minTrainError = lss[-1]
        epoch=e
        bestModelParams = deepcopy(testModel)

In [None]:
plt.plot(lss[0:], label="Train Loss")
plt.plot(testLss[0:], label="Test Error")
plt.legend()
plt.show()

print(f"Best epoch at {epoch}")
print(f"Least Train Error: {minTrainError:.3f}")
print(f"Least Test Error: {minTestError:.3f}")
print()

print(f"Train Error %: {minTrainError*100/maxAngle:.3f}")
print(f"Test  Error %: {minTestError*100/maxAngle:.3f}")


In [None]:
testError = 0
predData = []
properData = []
inpData = []
for xTest, yTest in testDL:
    yTestHat = bestModelParams(xTest).flatten()
    predData.append(yTestHat.detach().cpu().item())
    properData.append(yTest.detach().cpu().item())
    inpData.append(xTest[0][0].detach().cpu().item())
    testError += torch.abs(yTestHat - yTest).sum()
testError = testError / len(testDataset)
print(f"Average Drift: {testError}")
print(f"Error: {testError*100/maxAngle}")

In [None]:
paperData = [10860, 10870, 10880, 10938, 10852, 13944, 13628, 13940, 13998, 14278, 13880, 13524, 15342, 15240, 15480, 14486, 14654, 16584, 16066, 15286, 15726, 15968, 15562, 16102, 16018, 15988, 18776, 17688, 16440, 16128, 16722, 17270, 19036, 18220, 17224, 17822, 18668, 20788, 19418, 21644, 22008, 19586, 21194, 23806, 22608, 11440, 11508, 11394, 11334, 11354, 18718, 17524, 17614, 17804, 16712, 16344, 16154, 15166, 17034, 19100, 15414, 14716, 14844, 14068, 12858, 12702, 15384, 15258, 15106, 14306, 13972, 13510, 12632, 12422, 14316, 14162, 14758, 13798, 12656, 13364, 14536, 14162, 13206, 12180, 13744, 13658, 13098, 12268, 13662, 13710, 11000, 10988, 10912, 10918, 10918, 10896, 10896, 10888, 10916, 10880]
weight = 65
height = 174
zeroAngleRange = range(0,5)
ones = range(5, 43)
topAngleRange = range(43, 52)
zeros = range(52, 90)
paperData = currentScaler.transform(np.array(paperData).reshape((-1, 1)))
finalAngles=[]
bestModelParams = bestModelParams.cpu()
bestModelParams.eval()
for i in range(len(paperData)):
    if i in zeroAngleRange:
        finalAngles.append(0)
    elif i in ones:
        pred = bestModelParams(torch.tensor(np.array([paperData[i][0], height, weight, 1])).float())
        finalAngles.append(pred.item())
    elif i in topAngleRange:
        finalAngles.append(48.9)
    elif i in zeros:
        pred = bestModelParams(torch.tensor(np.array([paperData[i][0], height, weight, 0])).float())
        finalAngles.append(pred.item())
    else:
        finalAngles.append(0)
finalAngles = butter_lowpass_filter(finalAngles, fs=2, order=1, cutoff=0.1)
plt.plot(finalAngles)
print(finalAngles.tolist())

In [None]:
plt.plot(predData)
plt.plot(properData)


## K Fold Validation

In [None]:
results = []

for person in whMapping:
    print("*"*50)
    print(f"Test Subject: {person}")
    personList = list(whMapping.keys())
    personList.remove(person)
        
    trainData = {"weight":[], "height":[], "current":[]
                 ,"action":[]
                 , "angle":[]
                }
    
    testData = {"weight":[], "height":[], "current":[]
                 ,"action":[]
                 , "angle":[]
                }
    
    for i in list(personList):
        iList = []
        actList = []
        angList = []
        for sec in sections:
            sig = butter_lowpass_filter(rawData[i][sec[0]:sec[1]], cutoff, fs, order)[5:]
            # plt.plot(sig)
            iList.extend(sig)
            actList.extend([sec[2]]*len(sig))
            angList.extend(rawData["Angle"][sec[1]-len(sig):sec[1]])
        ln = len(iList)
        trainData["current"].extend(list(iList))
        trainData["height"].extend([whMapping[i][1]]*ln)
        trainData["weight"].extend([whMapping[i][0]]*ln)
        trainData["action"].extend(list(actList))
        trainData["angle"].extend(list(angList))
    
    for i in list([person]):
        iList = []
        actList = []
        angList = []
        for sec in sections:
            sig = butter_lowpass_filter(rawData[i][sec[0]:sec[1]], cutoff, fs, order)[5:]
            iList.extend(sig)
            actList.extend([sec[2]]*len(sig))
            angList.extend(rawData["Angle"][sec[1]-len(sig):sec[1]])
        ln = len(iList)
        testData["current"].extend(list(iList))
        testData["height"].extend([whMapping[i][1]]*ln)
        testData["weight"].extend([whMapping[i][0]]*ln)
        testData["action"].extend(list(actList))
        testData["angle"].extend(list(angList))
    
    
    
    currentScaler = MinMaxScaler()
    a=np.array([*trainData["current"], *testData["current"]]).reshape((-1,1))
    currentScaler.fit(a)
    trainData["current"] = currentScaler.transform(np.array(trainData["current"]).reshape((-1,1))).flatten()
    testData["current"] = currentScaler.transform(np.array(testData["current"]).reshape((-1,1))).flatten()
    assert len(trainData["current"]) == len(trainData["height"]) == len(trainData["weight"]) == len(trainData["action"]) == len(trainData["angle"]), "train data not homogeneous"
    assert len(testData["current"]) == len(testData["height"]) == len(testData["weight"]) == len(testData["action"]) == len(testData["angle"]), "test data not homogeneous"

    print(f"Train Len: {len(trainData["current"])}, Test Len: {len(testData["current"])}")


    trainDataset=AngleDataset(trainData, device)
    trainDL = DataLoader(trainDataset, shuffle=True, batch_size=16)
    
    testDataset = AngleDataset(testData, device)
    testDL = DataLoader(testDataset, shuffle=True, batch_size=16)

    lss = []
    testLss = []
    testModel = Model(4, 192, 1).to(device)
    opt = O.Adam(testModel.parameters(), lr=lr)
    sch = O.lr_scheduler.StepLR(opt, step_size=10, gamma=0.5)
    lossFn = torch.nn.MSELoss()


    minTestError = 100
    minTrainError = 100
    epoch = 0
    bestModelParams = None
    for e in tqdm(range(epochs)):
        epochLoss=0
        testModel.train()
        for x, y in trainDL:
            opt.zero_grad()
            yHat = testModel(x).flatten()
            l = lossFn(yHat, y)
            l.backward()
            opt.step()
            epochLoss += l.detach().cpu().item()
        lss.append(epochLoss/len(trainDL.dataset))
        testModel.eval()
        testError = 0
        for xTest, yTest in testDL:
            yTestHat = testModel(xTest).flatten()
            testError += torch.abs(yTestHat - yTest).sum()
        testError = testError.detach().cpu().item()/len(testDataset)
        testLss.append(testError)
        if testError < minTestError:
            minTestError = testError
            minTrainError = lss[-1]
            epoch=e
            bestModelParams = deepcopy(testModel)


    plt.plot(lss[0:], label="Train Loss")
    plt.plot(testLss[0:], label="Test Error")
    plt.title(f"Test: {person}")
    plt.legend()
    plt.show()
    
    print(f"Best epoch at {epoch}")
    print(f"Least Train Error: {minTrainError:.3f}")
    print(f"Least Test Error: {minTestError:.3f}")
    print()
    
    print(f"Train Error %: {minTrainError*100/maxAngle:.3f}")
    print(f"Test  Error %: {minTestError*100/maxAngle:.3f}")

    testError = 0
    bestModelParams.eval()
    for xTest, yTest in testDL:
        yTestHat = bestModelParams(xTest).flatten()
        testError += torch.abs(yTestHat - yTest).sum()
    testError = testError / len(testDataset)
    print(f"Average Drift: {testError}")
    print(f"Error: {testError*100/maxAngle}")

    results.append([person, epoch, minTrainError, minTrainError*100/maxAngle, (testError).item(), (testError*100/maxAngle).item()])

In [None]:
rsPD  = pd.DataFrame(results)
rsPD.columns = columns=["Person", "BestEpoch", "trainError", "TrainError%", "testError", "testError%"]
print(rsPD)