In [1]:
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split, KFold

import brevitas.nn as nn

from config import *
from classic_models import *
from data_preprocessing import *
from feature_extraction import *
from helpers import *

from torch.utils.data import DataLoader

Dance move: 0, Name: dummy_dance_1
Dance move: 1, Name: dummy_dance_2
Dance move: 2, Name: dummy_dance_3
Dance move: 3, Name: dummy_dance_4
Dance move: 4, Name: dummy_dance_5
Dance move: 5, Name: dummy_dance_6
Dance move: 6, Name: dummy_dance_7
Dance move: 7, Name: dummy_dance_8
Dance move: 8, Name: move_left
Dance move: 9, Name: move_right


In [2]:
df = pd.read_csv('out.csv')
df.head()

Unnamed: 0,mean_x,mean_y,mean_z,max_x,max_y,max_z,min_x,min_y,min_z,median_x,...,iqr_x,iqr_y,iqr_z,zero_crossing_counts_x,zero_crossing_counts_y,zero_crossing_counts_z,dominant_frequency_x,dominant_frequency_y,dominant_frequency_z,tag
0,12.3208,19.61,-4.04,14.47,-0.1728,7.35601,13.0675,-0.555269,8.0,0.0,...,19.61,-19.61,-4.205,0.2321,11.014407,13.965,0.362901,28.0,0.28,1.0
1,12.409,19.61,-1.76,13.725,0.1785,6.944948,12.7025,-0.511257,6.0,0.0,...,19.61,-19.61,-4.205,-0.1642,10.65234,14.3025,0.273621,28.0,0.28,1.0
2,12.017,19.61,-1.76,13.065,-0.3281,6.670013,11.315,-0.482849,6.0,0.0,...,19.61,-19.61,-3.73,0.4712,10.484271,14.0375,0.245018,27.0,0.28,1.0
3,11.1898,19.61,-1.76,12.985,-0.1299,7.00291,14.2475,-0.335542,6.0,0.0,...,19.61,-19.61,-3.68,0.369,11.250628,14.52,0.251294,28.0,0.28,1.0
4,10.9784,19.61,-1.76,12.555,0.6584,6.741029,12.97,-0.347759,5.0,0.0,...,19.61,-19.61,-3.73,-0.7864,11.196545,14.52,0.365375,27.0,0.28,1.0


In [3]:
class FeatureDataset(torch.utils.data.Dataset):
    def __init__(self):
        df = pd.read_csv('out.csv')
        df['tag'] = df['tag'].apply(lambda x: x-1)
        if isinstance(df, pd.DataFrame):
            df = df.to_numpy()

        self.X = df[:,:-1]
        self.Y = df[:,-1]

    def __getitem__(self, idx):
        # get item by index
        return self.X[idx], self.Y[idx]

    def __len__(self):
        # returns length of data
        return len(self.X)

In [4]:
dataset = FeatureDataset()

train_size = int(len(dataset) * 0.75)
test_size = len(dataset) - train_size
train_data, test_data = torch.utils.data.random_split(dataset, (train_size, test_size))
print(len(train_data), len(test_data))

1345 449


In [5]:
train_loader = torch.utils.data.DataLoader(train_data, batch_size = 64, shuffle = True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size = 64, shuffle = False)

In [6]:
train_loader

<torch.utils.data.dataloader.DataLoader at 0x29d6bfe2548>

In [7]:
class SimpleMLP(torch.nn.Module):
    def __init__(self, input_size, output_size):
        super(SimpleMLP, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        
        self.input_fc = nn.QuantLinear(input_size, 256, bias=True, weight_bit_width=4)
        self.hidden_fc = nn.QuantLinear(256, 256, bias=True, weight_bit_width=4)
        self.hidden_fc_2 = nn.QuantLinear(256, 128, bias=True, weight_bit_width=4)
        self.output_fc = nn.QuantLinear(128, output_size, bias=False, weight_bit_width=4)
        
        self.relu = nn.QuantReLU(bit_width=2, max_val=4)

    def forward(self, x):
        x_1 = self.input_fc(x)
        h_1 = torch.nn.functional.relu(x_1)
        x_2 = self.hidden_fc(h_1)
        x_3 = self.hidden_fc_2(x_2)
        h_2 = torch.nn.functional.relu(x_3)
        y_pred = self.output_fc(h_2)
        
        return y_pred

In [8]:
model = SimpleMLP(df.shape[1]-1, len(dances))
criterion = torch.nn.CrossEntropyLoss()
# optimizer = torch.optim.SGD(model.parameters(), lr = 0.001)
learning_rate = 3e-5
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)

print(df.shape[1]-1, len(dances))

30 10


In [9]:
n_epochs = 200

for epoch in range(n_epochs):
    for i, (x, y) in enumerate(train_loader):
        x, y = x.float(), y.long()
        outputs = model(x)

        loss = criterion(outputs, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print("Epoch: {}, Loss: {:.5f}".format(epoch + 1, loss.item()))

Epoch: 10, Loss: 0.37456
Epoch: 20, Loss: 0.04680
Epoch: 30, Loss: 0.20995
Epoch: 40, Loss: 0.28342
Epoch: 50, Loss: 0.00149
Epoch: 60, Loss: 0.10483
Epoch: 70, Loss: 0.00439
Epoch: 80, Loss: 0.00140
Epoch: 90, Loss: 0.00027
Epoch: 100, Loss: 0.00577
Epoch: 110, Loss: 0.00063
Epoch: 120, Loss: 0.13665
Epoch: 130, Loss: 0.00012
Epoch: 140, Loss: 0.01153
Epoch: 150, Loss: 0.00207
Epoch: 160, Loss: 0.04092
Epoch: 170, Loss: 0.00170
Epoch: 180, Loss: 0.00022
Epoch: 190, Loss: 0.04362
Epoch: 200, Loss: 3.26396


In [10]:
model.eval()

SimpleMLP(
  (input_fc): QuantLinear(
    in_features=30, out_features=256, bias=True
    (input_quant): IdentityQuantProxyFromInjector(
      (_zero_hw_sentinel): StatelessBuffer()
    )
    (output_quant): IdentityQuantProxyFromInjector(
      (_zero_hw_sentinel): StatelessBuffer()
    )
    (weight_quant): WeightQuantProxyFromInjector(
      (_zero_hw_sentinel): StatelessBuffer()
      (tensor_quant): RescalingIntQuant(
        (int_quant): IntQuant(
          (float_to_int_impl): RoundSte()
          (tensor_clamp_impl): TensorClampSte()
          (delay_wrapper): DelayWrapper(
            (delay_impl): _NoDelay()
          )
        )
        (scaling_impl): StatsFromParameterScaling(
          (parameter_list_stats): _ParameterListStats(
            (first_tracked_param): _ViewParameterWrapper()
            (stats): _Stats(
              (stats_impl): AbsMax()
            )
          )
          (stats_scaling_impl): _StatsScaling(
            (affine_rescaling): Identity()
     

In [11]:
y_true, y_pred, y_prob  = [], [], []
with torch.no_grad():
    for x, y in test_loader:
        y = list(y.numpy())
        y_true += y

        x = x.float()
        outputs = model(x)

        # predicted label
        _, predicted = torch.max(outputs.data, 1)
        predicted = list(predicted.cpu().numpy())
        y_pred += predicted

        # probability for each label
        prob = list(outputs.cpu().numpy())
        y_prob += prob

In [12]:
# calculating overall accuracy
num_correct = 0

for i in range(len(y_true)):
    if y_true[i] == y_pred[i]:
        num_correct += 1

print("Accuracy: ", num_correct/len(y_true))

Accuracy:  0.9665924276169265


## Bidirectional LSTM

In [13]:
class BiDirectional_LSTM(torch.nn.Module):
    def __init__(self, input_size, output_size):
        super(SimpleMLP, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        
        self.input_fc = nn.QuantLinear(input_size, 256, bias=True)
        self.hidden_fc = nn.QuantLinear(256, 128, bias=True)
        self.output_fc = nn.QuantLinear(128, output_size, bias=True)
        self.lstm = eval('nn.' + rnn_type)(
            self.input_dim, self.hidden_dim, self.num_layers, batch_first=True, bidirectional=True
        )        
        self.relu = nn.QuantReLU()

    def init_hidden(self):
        # This is what we'll initialise our hidden state as
        return (torch.zeros(self.num_layers, self.batch_size, self.hidden_dim),
                torch.zeros(self.num_layers, self.batch_size, self.hidden_dim))

    def forward(self, x):
        x_1 = self.input_fc(x)
        h_1 = torch.nn.functional.relu(self.input_fc(x))
        x_2 = self.hidden_fc(h_1)
        h_2 = torch.nn.functional.relu(x_2)
        # Forward pass through LSTM layer
        # shape of lstm_out: [batch_size, input_size ,hidden_dim]
        # shape of self.hidden: (a, b), where a and b both
        # have shape (batch_size, num_layers, hidden_dim).
        lstm_out, self.hidden = self.lstm(h_2)

        # Can pass on the entirety of lstm_out to the next layer if it is a seq2seq prediction
        y_pred = self.output_fc(lstm_out)
        return y_pred

In [14]:
# kfold = KFold(n_splits=5)

In [15]:
# for train_index, test_index in kfold.split(X_train, y_train):  
#     x_train_fold = X_train[train_index] 
#     y_train_fold = y_train[train_index] 
#     x_test_fold = X_train[test_index] 
#     y_test_fold = y_train[test_index] 

#     print(x_train_fold.shape, y_train_fold.shape) 
#     print(x_test_fold.shape, y_test_fold.shape) 
#     break 