In [1]:
import unittest
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.autograd import Variable
import random

### Data

In [2]:
df1 = pd.read_csv('torque_baseline.csv')
df2 = pd.read_csv('torque_perturb.csv')

In [3]:
# min_max normalization
n_df1 = (df1-df1.min())/(df1.max()-df1.min())
n_df2 = (df2-df2.min())/(df2.max()-df2.min())

In [4]:
def shuffle_data(n_df1, n_df2):
    labels = []
    l = len(n_df1) + len(n_df2)
    concat_df = n_df1.append(n_df2, ignore_index=True)
    shuffled_df = pd.DataFrame()
    random_list  = list(range(l))
    random.shuffle(random_list)
    for num in random_list:
        chunk_size = random.randint(5,21)
        if num + chunk_size < l:
            if num < len(n_df1):
                labels.extend([0]*chunk_size)
            else:
                labels.extend([1]*chunk_size)
            shuffled_df = shuffled_df.append(concat_df.loc[num:num+chunk_size-1], ignore_index=True)
    return shuffled_df, labels

In [5]:
shuffled_df, labels = shuffle_data(n_df1, n_df2)

  concat_df = n_df1.append(n_df2, ignore_index=True)
  shuffled_df = shuffled_df.append(concat_df.loc[num:num+chunk_size-1], ignore_index=True)


In [6]:
X_train = shuffled_df.loc[0:int(len(shuffled_df)*0.9)-1].to_numpy()
X_test = shuffled_df.loc[int(len(shuffled_df)*0.9):].to_numpy()

In [7]:
y_train = np.array(labels[0:int(len(shuffled_df)*0.9)])
y_test = np.array(labels[int(len(shuffled_df)*0.9):])

In [8]:
X_train_tensors = Variable(torch.Tensor(X_train))
X_test_tensors = Variable(torch.Tensor(X_test))

y_train_tensors = Variable(torch.Tensor(y_train))
y_test_tensors = Variable(torch.Tensor(y_test)) 

# y_train_tensors = Variable(torch.Tensor(y_train).unsqueeze(1))
# y_test_tensors = Variable(torch.Tensor(y_test).unsqueeze(1)) 

In [9]:
# X_train_tensors = torch.reshape(X_train_tensors, (X_train_tensors.shape[0], 1, X_train_tensors.shape[1]))
# X_test_tensors = torch.reshape(X_test_tensors, (X_test_tensors.shape[0], 1, X_test_tensors.shape[1]))

### Model

In [10]:
class LSTM1(nn.Module):
    def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length):
        super(LSTM1, self).__init__()
        self.num_classes = num_classes #number of classes
        self.num_layers = num_layers #number of layers
        self.input_size = input_size #input size
        self.hidden_size = hidden_size #hidden state
        self.seq_length = seq_length #sequence length

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                          num_layers=num_layers, batch_first=True) #lstm
        self.fc_1 =  nn.Linear(hidden_size, 128) #fully connected 1
        self.fc = nn.Linear(128, num_classes) #fully connected last layer

        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
    
    def forward(self,x):
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)) #hidden state
        c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)) #internal state
        # Propagate input through LSTM
        output, (hn, cn) = self.lstm(x, (h_0, c_0)) #lstm with input, hidden, and internal state
        hn = hn.view(-1, self.hidden_size) #reshaping the data for Dense layer next
        out = self.relu(hn)
        out = self.fc_1(out) #first Dense
        out = self.relu(out) #relu
        out = self.fc(out) #Final Output
        out = self.sigmoid(out)
        return out

### Training

In [11]:
num_epochs = 30 #1000 epochs
learning_rate = 1e-4 #0.001 lr

input_size = 12 #number of features
hidden_size = 2 #number of features in hidden state
num_layers = 1 #number of stacked lstm layers

num_classes = 1 #number of output classes 

lstm = LSTM1(num_classes, input_size, hidden_size, num_layers, X_train_tensors.shape[1])

In [12]:
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(lstm.parameters(), lr=learning_rate, weight_decay=1e-5)

In [13]:
for epoch in range(num_epochs):

    for i in range(len(X_test_tensors)):
        input = torch.reshape(X_train_tensors[i], (1, 1, X_train_tensors[i].shape[0]))
        outputs = lstm.forward(input).flatten() #forward pass
        optimizer.zero_grad() #caluclate the gradient, manually setting to 0
        target = y_train_tensors[i].unsqueeze(0)
        # obtain the loss function
        loss = criterion(outputs, target)
        
        loss.backward() #calculates the loss of the loss function
        
        optimizer.step() #improve from loss, i.e backprop

    if epoch % 1 == 0:
        print("Epoch: %d, loss: %1.5f" % (epoch, loss.item())) 

Epoch: 0, loss: 0.62999
Epoch: 1, loss: 0.62957
Epoch: 2, loss: 0.62911
Epoch: 3, loss: 0.63380
Epoch: 4, loss: 0.64372
Epoch: 5, loss: 0.66292
Epoch: 6, loss: 0.69143
Epoch: 7, loss: 0.71793
Epoch: 8, loss: 0.74193
Epoch: 9, loss: 0.75084
Epoch: 10, loss: 0.74541
Epoch: 11, loss: 0.72916
Epoch: 12, loss: 0.70890
Epoch: 13, loss: 0.68781
Epoch: 14, loss: 0.66461
Epoch: 15, loss: 0.64165
Epoch: 16, loss: 0.62040
Epoch: 17, loss: 0.60802
Epoch: 18, loss: 0.58782
Epoch: 19, loss: 0.57345
Epoch: 20, loss: 0.56507
Epoch: 21, loss: 0.55277
Epoch: 22, loss: 0.54180
Epoch: 23, loss: 0.52574
Epoch: 24, loss: 0.51140
Epoch: 25, loss: 0.49658
Epoch: 26, loss: 0.48043
Epoch: 27, loss: 0.46087
Epoch: 28, loss: 0.44089
Epoch: 29, loss: 0.41332


### Test

In [14]:
lstm.eval()

LSTM1(
  (lstm): LSTM(12, 2, batch_first=True)
  (fc_1): Linear(in_features=2, out_features=128, bias=True)
  (fc): Linear(in_features=128, out_features=1, bias=True)
  (relu): ReLU()
  (sigmoid): Sigmoid()
)

In [51]:
correct = 0
threshold = torch.tensor([0.5])
for i in range(len(X_test_tensors)):
    input = torch.reshape(X_test_tensors[i], (1, 1, X_test_tensors[i].shape[0]))
    output = lstm(input)
    result = ((output>threshold).float()*1).flatten()
    target = y_test_tensors[i].unsqueeze(0)
    if torch.equal(result, target):
        correct += 1

In [52]:
correct/len(X_test_tensors)

0.8308776425368354

### Unit testing

In [None]:
class TestStringMethods(unittest.TestCase):

    def setUp(self):
        self.df1 = pd.read_csv('torque_baseline.csv')
        self.df2 = pd.read_csv('torque_perturb.csv')
        self.n_df1 = (self.df1-self.df1.min())/(self.df1.max()-self.df1.min())
        self.n_df2 = (self.df2-self.df2.min())/(self.df2.max()-self.df2.min())
        self.np_ndf1 = self.n_df1.to_numpy()
        
    def test_min_max(self):
        zeros = []
        for i in range(len(self.n_df1.columns)):
            zeros.append(0)        
        ones = []
        for i in range(len(self.n_df1.columns)):
            ones.append(0)
        
        for val in self.n_df1.max():
            assert val == 1.0
        
        for val in self.n_df1.min():
            assert val == 0.0

        for val in self.n_df2.max():
            assert val == 1.0

        for val in self.n_df2.min():
            assert val == 0.0
    
    def test_randlist(self):
        random_list = list(range(len(self.df2)))
        random.shuffle(random_list)
        assert len(random_list) == len(self.df2)
    
    def test_concat(self):
        l = len(self.df2) + len(self.df1)
        concat_df = self.n_df1.append(self.n_df2, ignore_index=True)
        assert len(concat_df) == l
    
    def test_shuffler(self):
        label = []
        l = len(self.df2) + len(self.df1)
        concat_df = self.n_df1.append(self.n_df2, ignore_index=True)
        shuffled_df = pd.DataFrame()
        random_list  = list(range(l))
        random.shuffle(random_list)
        for num in random_list:
            chunk_size = random.randint(5,21)
            if num + chunk_size < l:
                if num < len(self.df1):
                    label.extend([0]*chunk_size)
                else:
                    label.extend([1]*chunk_size)
                shuffled_df = shuffled_df.append(concat_df.loc[num:num+chunk_size-1], ignore_index=True)
        assert len(shuffled_df) > l
        assert len(label) == len(shuffled_df)
    
    def test_TrainTest(self):
        shuffled_df, labels = shuffle_data(n_df1, n_df2)
        X_train = shuffled_df.loc[0:int(len(shuffled_df)*0.9)-1]
        X_test = shuffled_df.loc[int(len(shuffled_df)*0.9):]
        y_train = labels[0:int(len(shuffled_df)*0.9)]
        y_test = labels[int(len(shuffled_df)*0.9):]
        assert len(X_train) == len(y_train)
        assert len(X_test) == len(y_test)

In [None]:
if __name__ == '__main__':
    unittest.main(argv=[''], verbosity=2, exit=False)

test_TrainTest (__main__.TestStringMethods) ... ok
test_concat (__main__.TestStringMethods) ... ok
test_min_max (__main__.TestStringMethods) ... ok
test_randlist (__main__.TestStringMethods) ... ok
test_shuffler (__main__.TestStringMethods) ... ok

----------------------------------------------------------------------
Ran 5 tests in 2.373s

OK
