In [1]:
import torch
import random
import numpy as np
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import pandas as pd

random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
torch.cuda.manual_seed(0)
torch.backends.cudnn.deterministic = True

In [2]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

LABELS = [    
    "JUMPING",
    "JUMPING_JACKS",
    "BOXING",
    "WAVING_2HANDS",
    "WAVING_1HAND",
    "CLAPPING_HANDS"

] 
DATASET_PATH = "poses_dataset/"

X_train_path = DATASET_PATH + "X_train.txt"
X_test_path = DATASET_PATH + "X_test.txt"

y_train_path = DATASET_PATH + "Y_train.txt"
y_test_path = DATASET_PATH + "Y_test.txt"

# Hyper-parameters 
# input_size = 784 # 28x28
num_classes = 6
num_epochs = 2
batch_size = 100
learning_rate = 0.001

input_size = 36
sequence_length = 32
hidden_size = 128
num_layers = 2

In [3]:
column_names = [  "j0_x",  "j0_y", "j1_x", "j1_y" , "j2_x", "j2_y", "j3_x", "j3_y", "j4_x", "j4_y", "j5_x", "j5_y", "j6_x", "j6_y", "j7_x", "j7_y", "j8_x", "j8_y", "j9_x", "j9_y", "j10_x", "j10_y", "j11_x", "j11_y", "j12_x", "j12_y", "j13_x", "j13_y", 'j14_x', "j14_y", "j15_x", "j15_y", "j16_x", "j16_y", "j17_x", "j17_y" ]
x_train_data = pd.read_csv(X_train_path, sep=",", names=column_names, header=None)
x_test_data = pd.read_csv(X_test_path, sep=",", names=column_names, header=None)
y_train_data = pd.read_csv(y_train_path, names=["labels"])
y_test_data = pd.read_csv(y_test_path, names=["labels"])

In [6]:
x_train_data.shape

(724000, 36)

In [7]:
y_train_data.shape[0]

22625

In [11]:
# Fully connected neural network with one hidden layer
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTM, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # -> x needs to be: (batch_size, seq, input_size)
        
        # or:
        #self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        #self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        
    def forward(self, x):
        # Set initial hidden states (and cell states for LSTM)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) 
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) 
        
        # x: (n, 28, 28), h0: (2, n, 128)
        
        # Forward propagate RNN
        # out, _ = self.rnn(x, h0)  
        # or:
        out, _ = self.lstm(x, (h0,c0))  
        
        # out: tensor of shape (batch_size, seq_length, hidden_size)
        # out: (n, 28, 128)
        
        # Decode the hidden state of the last time step
        out = out[:, -1, :]
        # out: (n, 128)
         
        out = self.fc(out)
        # out: (n, 10)
        return out

In [12]:
model = LSTM(input_size, hidden_size, num_layers, num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  

In [90]:
def chunker(x_seq, y_seq, seq_size, batch_size):
    for batch_pos in range(0, len(y_seq), batch_size):
        x_batch = list()
        y_batch = list()
        for pos in range(batch_size):
            # print(x_seq.iloc[pos*seq_size:pos*seq_size + seq_size])
            x_batch.append(x_seq.iloc[pos*seq_size:pos*seq_size + seq_size].values)
            y_batch.append(y_seq.iloc[pos*seq_size:pos*seq_size + seq_size].values)
        
        yield torch.tensor(x_batch), torch.tensor(y_batch)
        

In [92]:
for i, (x_batch, y_batch) in enumerate(chunker(x_test_data, y_test_data, sequence_length, batch_size)):
    if i == 1:
        break
    print(x_batch.shape)

torch.Size([100, 32, 36])


In [9]:
# Train the model
n_total_steps = y_train_data.shape[0]
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):  
        # origin shape: [N, 1, 28, 28]
        # resized: [N, 28, 28]
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i+1) % 100 == 0:
            print (f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{n_total_steps}], Loss: {loss.item():.4f}')


Epoch [1/2], Step [100/600], Loss: 0.7935
Epoch [1/2], Step [200/600], Loss: 0.7263
Epoch [1/2], Step [300/600], Loss: 0.4443
Epoch [1/2], Step [400/600], Loss: 0.4806
Epoch [1/2], Step [500/600], Loss: 0.4036
Epoch [1/2], Step [600/600], Loss: 0.4303
Epoch [2/2], Step [100/600], Loss: 0.3552
Epoch [2/2], Step [200/600], Loss: 0.2609
Epoch [2/2], Step [300/600], Loss: 0.2514
Epoch [2/2], Step [400/600], Loss: 0.2695
Epoch [2/2], Step [500/600], Loss: 0.2530
Epoch [2/2], Step [600/600], Loss: 0.1610


In [10]:
# Test the model
# In test phase, we don't need to compute gradients (for memory efficiency)
with torch.no_grad():
    n_correct = 0
    n_samples = 0
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        # max returns (value ,index)
        _, predicted = torch.max(outputs.data, 1)
        n_samples += labels.size(0)
        n_correct += (predicted == labels).sum().item()

    acc = 100.0 * n_correct / n_samples
    print(f'Accuracy of the network on the 10000 test images: {acc} %')

Accuracy of the network on the 10000 test images: 94.92 %
