In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import math
import numpy as np
import os
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
import torch.optim as optim


#### Transformer Encoder

In [38]:
class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)
    

class TransformerEncoder(nn.Module):
    def __init__(self, input_dim, d_model, heads, num_layers,dropout, max_len, output_classes):
        super().__init__()
        self.encoder_layer= nn.TransformerEncoderLayer(d_model=d_model, nhead=heads)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.positional_encoder = PositionalEncoding(d_model=d_model, dropout=dropout, max_len=max_len)
        self.linear = nn.Linear(in_features=d_model, out_features=output_classes)
        self.fc = nn.Linear(input_dim, d_model)
        
    def forward(self, x):
        x1 = self.fc(x)
        x1 = self.positional_encoder(x1)
        x2 = self.encoder(x1)
        out = self.linear(x2)
        
        return out
        

class TransformerDecoder(nn.Module):
    def __init__(self, input_dim, d_model, heads, num_layers,dropout, max_len, output_classes):
        super().__init__()
        self.decoder_layer= nn.TransformerDecoderLayer(d_model=d_model, nhead=heads)
        self.decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers)
        self.linear = nn.Linear(in_features=d_model, out_features=output_classes)
        self.fc = nn.Linear(input_dim, d_model)
        
    def forward(self, x,y):
        x1 = self.fc(x)
        y1 = self.fc(y)
        x2 = self.decoder(y1,x1)
        out = self.linear(x2)
        return out
    

class Transformer(nn.Module):
    def __init__(self, input_dim, d_model, heads, num_layers,dropout, max_len, output_classes, is_train):
        super().__init__()

        self.is_train = is_train
        self.encoder = TransformerEncoder(input_dim,d_model, heads,num_layers,dropout,max_len,output_classes)
        self.decoder = TransformerDecoder(output_classes,d_model, heads,num_layers,dropout,max_len,output_classes)
        
    def forward(self,x,y):
        if self.is_train:
            enc_out = self.encoder(x) #current_gestures
            dec_out = self.decoder(y,enc_out) #current_gesture, current_kinematicdata y --> pred
        else:
            enc_out = self.encoder(x)
            dec_out = self.decoder(enc_out,enc_out)
            
        return enc_out,dec_out
    

kinematic_features = 22 #input_dim
dmodel = 64 #dmodel
heads = 4
num_layers = 2
dropout = 0.1
max_len = 100
output_classes = 7

is_train = True
# src = torch.rand(10, 10, kinematic_features)

model = Transformer(kinematic_features,dmodel, heads,num_layers,dropout,max_len,output_classes,is_train)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model.to(device)


Transformer(
  (encoder): TransformerEncoder(
    (encoder_layer): TransformerEncoderLayer(
      (self_attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
      )
      (linear1): Linear(in_features=64, out_features=2048, bias=True)
      (dropout): Dropout(p=0.1, inplace=False)
      (linear2): Linear(in_features=2048, out_features=64, bias=True)
      (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (dropout1): Dropout(p=0.1, inplace=False)
      (dropout2): Dropout(p=0.1, inplace=False)
    )
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-1): 2 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
          )
          (linear1): Linear(in_features=64, out_features=2048, bias=True)
    

#### Dataset Class

In [39]:
gesture_labels_directory = './new_dataset/peg_transfer/peg_transfer/labeled/gestures/'
window_size = 10 # 10 frame windows (kinematic data)
enite_dataset = []
counter = 0



class KinematicDataset(Dataset):
    
    def __init__(self,data_directory, window_size, pred_window):
        
        self.directory = data_directory
        self.window_size = window_size
        self.pred_window = pred_window
        self.le = preprocessing.LabelEncoder()
        self.enc = preprocessing.OneHotEncoder(sparse_output=False)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        (self.X, self.Y) = self._load_data()
        self.X = self.X.to(self.device)
        self.Y = self.Y.to(self.device)
        
        
    def _load_data(self):
        x = []
        y = []
        self.windows_per_file = []
        
        for filename in os.listdir(self.directory):
            f = os.path.join(self.directory, filename)
            # checking if it is a file
            if os.path.isfile(f):
                kinematics_data = pd.read_csv(f)
                kin_data = kinematics_data.iloc[:,:-1]
                kin_label = kinematics_data.iloc[:,-1]
                
                x.append(kin_data.values)
                y.append(kin_label.values)
                
                
                
                if len(self.windows_per_file) == 0:
                    self.windows_per_file = [len(kin_label) - self.window_size + 1]
                else:
                    self.windows_per_file.append(self.windows_per_file[-1] + len(kin_label) - self.window_size + 1)
        
        self.le.fit(y[0])
        y = [self.le.transform(yi) for yi in y]
        
        y = [yi.reshape(len(yi), 1) for yi in y]
        
        y = [self.enc.fit_transform(yi) for yi in y]
        
        x = np.concatenate(x)
        y = np.concatenate(y)
        
        x = torch.from_numpy(x)
        y = torch.from_numpy(y)
        
        x = x.to(torch.float32)
        y = y.to(torch.float32)
        return (x, y)
    
    def __len__(self):
        # this should return the size of the dataset
        return self.Y.shape[0]-self.window_size-self.pred_window
    
    def __getitem__(self, idx):
        # this should return one sample from the dataset
        features = self.X[idx : idx + self.window_size]
        target = self.Y[idx : idx + self.window_size]
        pred_target = self.Y[idx + self.window_size : idx + self.window_size + self.pred_window]
        
        return features, target, pred_target
                    
                    


In [48]:
# train_data, test_data = 

batch_size = 32
window_size = 10
pred_window = 10
dataset = KinematicDataset(gesture_labels_directory,window_size, pred_window)

# use 20% of data for validation
train_set_size = int(len(dataset) * 0.8)
valid_set_size = len(dataset) - train_set_size
print(train_set_size, valid_set_size)

seed = torch.Generator().manual_seed(42)
train_set, test_set = torch.utils.data.random_split(dataset, [train_set_size, valid_set_size],generator=seed)


trainloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size,
                                          shuffle=False, num_workers=0)

testloader = torch.utils.data.DataLoader(test_set, batch_size=batch_size,
                                          shuffle=False, num_workers=0)

print(train_set.dataset.X.shape, test_set.dataset.X.shape)
print(train_set.dataset.Y.shape, test_set.dataset.Y.shape)


119271 29818
torch.Size([149109, 22]) torch.Size([149109, 22])
torch.Size([149109, 7]) torch.Size([149109, 7])


In [45]:
decoder_layer = nn.TransformerDecoderLayer(d_model=512, nhead=8)
transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=6)
memory = torch.rand(10, 32, 512)
tgt = torch.rand(20, 32, 512)
out = transformer_decoder(tgt, memory)

print(out.shape)

torch.Size([20, 32, 512])


In [41]:
# loss function
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [51]:
epochs = 10

for epoch in range(epochs):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels, targets = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # enc_outputs = recognized current gestures (t + t_window)
        # dec_outputs = predicted future gestures (t_window + t_pred)
        enc_outputs, dec_outputs = model(inputs,labels)
        
        # print(inputs[0,:,:].shape)
        # print(outputs[0,:,:].shape)
        # print(labels[0,:,:].shape)
        
        # print("Input shapes")
        # print(inputs.shape)
        # print(labels.shape)
        
        # print("Output shapes")
        # print(enc_outputs.shape)
        # print(dec_outputs.shape)
        
        
        enc_outputs = (torch.flatten(enc_outputs, start_dim=0, end_dim=1))
        dec_outputs = (torch.flatten(dec_outputs, start_dim=0, end_dim=1))
        labels = (torch.flatten(labels, start_dim=0, end_dim=1))
        targets = (torch.flatten(targets, start_dim=0, end_dim=1))
        
        # outputs = torch.transpose(outputs,1,2)
        
        loss = criterion(enc_outputs, labels)
        loss += criterion(dec_outputs, targets)
        
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 50 == 49:    # print every 50 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0

print('Finished Training')

[1,    50] loss: 0.038
[1,   100] loss: 0.041
[1,   150] loss: 0.040
[1,   200] loss: 0.041
[1,   250] loss: 0.039
[1,   300] loss: 0.040
[1,   350] loss: 0.041
[1,   400] loss: 0.038
[1,   450] loss: 0.040
[1,   500] loss: 0.038
[1,   550] loss: 0.039
[1,   600] loss: 0.041
[1,   650] loss: 0.038
[1,   700] loss: 0.039
[1,   750] loss: 0.037
[1,   800] loss: 0.037
[1,   850] loss: 0.039
[1,   900] loss: 0.038
[1,   950] loss: 0.038
[1,  1000] loss: 0.037
[1,  1050] loss: 0.036
[1,  1100] loss: 0.038
[1,  1150] loss: 0.039
[1,  1200] loss: 0.037
[1,  1250] loss: 0.035
[1,  1300] loss: 0.036
[1,  1350] loss: 0.035
[1,  1400] loss: 0.036
[1,  1450] loss: 0.035
[1,  1500] loss: 0.036
[1,  1550] loss: 0.035
[1,  1600] loss: 0.033
[1,  1650] loss: 0.033
[1,  1700] loss: 0.035
[1,  1750] loss: 0.034
[1,  1800] loss: 0.035
[1,  1850] loss: 0.035
[1,  1900] loss: 0.034
[1,  1950] loss: 0.034
[1,  2000] loss: 0.035
[1,  2050] loss: 0.032
[1,  2100] loss: 0.033
[1,  2150] loss: 0.035
[1,  2200] 

In [52]:
MODEL_NAME = "PRED_MODEL_V0.pth"
MODEL_SAVE_PATH = "./models/"+MODEL_NAME

torch.save(obj=model.state_dict(), f=MODEL_SAVE_PATH)

In [53]:
#Test

# y_pred = model(list(iter(test_set)))

for i,data in enumerate(testloader):
    x,y,y_future = data
    
    enc_out, dec_out = model(x,y)
    
    
    enc_out = (torch.flatten(enc_out, start_dim=0, end_dim=1))
    gt = (torch.flatten(y, start_dim=0, end_dim=1))
    
    dec_out = (torch.flatten(dec_out, start_dim=0, end_dim=1))
    gt_future = (torch.flatten(y_future, start_dim=0, end_dim=1))
    
    enc_out = enc_out.cpu()
    enc_out = enc_out.detach().numpy()

    dec_out = dec_out.cpu()
    dec_out = dec_out.detach().numpy()
        
        
    gt = gt.cpu()
    gt = gt.detach().numpy()
    
            
    gt_future = gt_future.cpu()
    gt_future = gt_future.detach().numpy()
    
    gt_f = np.argmax(gt,axis=1)
    enc_out_f = np.argmax(enc_out,axis=1)
    
    
    gt_future_f = np.argmax(gt_future,axis=1)
    dec_out_f = np.argmax(dec_out,axis=1)
    
    print("---- Accuracy ----")
    enc_accuracy = np.mean(gt_f == enc_out_f) * 100
    print("Encoder Accuracy: ", enc_accuracy)
    
    dec_accuracy = np.mean(gt_future_f == dec_out_f) * 100
    print("Decoder Accuracy: ", dec_accuracy)
    print()

---- Accuracy ----
Encoder Accuracy:  91.25
Decoder Accuracy:  90.9375

---- Accuracy ----
Encoder Accuracy:  95.9375
Decoder Accuracy:  94.375

---- Accuracy ----
Encoder Accuracy:  94.6875
Decoder Accuracy:  90.0

---- Accuracy ----
Encoder Accuracy:  86.5625
Decoder Accuracy:  81.875

---- Accuracy ----
Encoder Accuracy:  95.3125
Decoder Accuracy:  97.5

---- Accuracy ----
Encoder Accuracy:  86.875
Decoder Accuracy:  88.75

---- Accuracy ----
Encoder Accuracy:  90.625
Decoder Accuracy:  87.5

---- Accuracy ----
Encoder Accuracy:  90.3125
Decoder Accuracy:  90.3125

---- Accuracy ----
Encoder Accuracy:  92.5
Decoder Accuracy:  88.75

---- Accuracy ----
Encoder Accuracy:  88.125
Decoder Accuracy:  90.625

---- Accuracy ----
Encoder Accuracy:  90.0
Decoder Accuracy:  88.75

---- Accuracy ----
Encoder Accuracy:  93.75
Decoder Accuracy:  90.0

---- Accuracy ----
Encoder Accuracy:  91.875
Decoder Accuracy:  86.25

---- Accuracy ----
Encoder Accuracy:  92.5
Decoder Accuracy:  89.0625

----