In [52]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

from sklearn.model_selection import train_test_split
import warnings

warnings.filterwarnings("ignore")

import dataset

In [68]:
class positionalEncoder(nn.Module):

  def __init__(self, frame_length, encoding_length):
    super().__init__()

    embedding = nn.Embedding(frame_length, encoding_length)
    
    self.pe = embedding(torch.tensor([i for i in range(frame_length)]))

  def forward(self, x):

    if len(x.shape) == 3:
      if len(self.pe.shape) != 3:
        self.pe = self.pe.unsqueeze(0).repeat(x.shape[0], 1, 1)
      
      x = torch.cat((x, self.pe[:x.shape[0]]), 2)
    else:
      x = torch.cat((x, self.pe[:x.shape[0]]), 1)

    return x

In [69]:
class classifierTransformer(nn.Module):

  def __init__(self, inFeatCount, num_T_layers, num_frames, device, pos_encode_size = 5, n_heads = 4, n_hidden = 2048, dropout = 0.3, outFeatCount = 2):
    super().__init__()

    self.posEncoder = positionalEncoder(num_frames, pos_encode_size)

    heads = n_heads
    num_features = inFeatCount + pos_encode_size

    if (num_features % heads) != 0:
      heads += heads - (num_features % heads)

    print(f'features = {num_features}, heads = {heads}')

    n_hidden = max(n_hidden, 2*num_features)

    encoder_layer = nn.TransformerEncoderLayer(inFeatCount + pos_encode_size, heads, n_hidden, dropout)
    self.encoder = nn.TransformerEncoder(encoder_layer, num_T_layers)
    
    many_to_one_feat = num_frames * num_features
    mid = (many_to_one_feat - outFeatCount) // 2 + outFeatCount

    self.fc1 = nn.Linear(many_to_one_feat, mid)
    self.fc2 = nn.Linear(mid, 2)

    self.device = device

    self.init_weights()

  def init_weights(self):
      initrange = 0.1
      self.fc1.bias.data.zero_()
      self.fc1.weight.data.uniform_(-initrange, initrange)

      self.fc2.bias.data.zero_()
      self.fc2.weight.data.uniform_(-initrange, initrange)

  def forward(self, x):
    
    #x.shape = [num_frames, feat_count]
    encoded = self.posEncoder(x)

    #encoded.shape = [num_frames, feat_count + pos_encoding_count]
    data = self.encoder(encoded)

    #data.shape = [num_frames, feat_count + pos_encoding_count]
    if len(data.shape) == 3:
      data = torch.reshape(data, (data.shape[0], data.shape[1] * data.shape[2]))
    else:
      data = torch.reshape(data, (1,-1))

    #data.shape = [1, num_frames * (feat_count + pos_encoding_count)] 

    data = self.fc1(data)
    data = self.fc2(data)
    ##data = nn.functional.softmax(data, dim = 1).to(self.device) 

    return data.float()

In [47]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

lie_trial_path = './data/trial/lie/' #60 entries
truth_trial_path = './data/trial/truth/' #61 entries

lie_MU3D_path = './data/MU3D/lie/' 
truth_MU3D_path = './data/MU3D/truth/'

lie_BOL_path = './data/BOL/lie/' 
truth_BOL_path = './data/BOL/truth/' 

# no split by person
numOfFrames = 10

X, Y = dataset.preprocessing(truth_trial_path, lie_trial_path, numOfFrames=numOfFrames)

TEST_RATIO = 0.2

xTrain, xTest = train_test_split(X, test_size=TEST_RATIO, shuffle=False)
yTrain, yTest = train_test_split(Y, test_size=TEST_RATIO, shuffle=False)

yTrain_temp, yTest_temp = [], []

for i in range(yTrain.shape[0]):
    yTrain_temp.append([1,0]) if yTrain[i] == 0 else yTrain_temp.append([0,1])

for i in range(yTest.shape[0]):
    yTest_temp.append([1,0]) if yTest[i] == 0 else yTest_temp.append([0,1])

y_Train = torch.tensor(yTrain_temp).to(device)
y_Test = torch.tensor(yTest_temp).to(device)

x_Train = torch.tensor(xTrain, dtype=torch.float32).to(device)
x_Test = torch.tensor(xTest, dtype=torch.float32).to(device)

In [70]:
#model prep
featCount = 10
num_frames = 10
encoder_layers = 2

Transformer = classifierTransformer(featCount, encoder_layers, num_frames, device)

# training
def train(model, xTrain, yTrain, xTest, yTest, epochs = 100, lr = 0.005, batch_size = 10):
    """ Train a model on a dataset """
    
    # create a data loader to handle batching
    xTrain_loader = DataLoader(xTrain, batch_size=batch_size, shuffle=False)
    xTest_loader = DataLoader(xTest, batch_size=batch_size, shuffle=False)

    # create a loss function and optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # train the model
    for epoch in range(epochs):

        idx = 0
        model.train()

        tot_loss = 0
        tot_acc = 0
        
        for batch in xTrain_loader:

            optimizer.zero_grad()

            # get data
            x_train = batch.to(device).float()
            y_train = torch.tensor(yTrain[idx:min(idx+batch_size,len(yTrain))]).float().clone().detach().to(device)

            # forward pass
            y_pred = model(x_train)

            actual_batch = torch.argmax(y_train, dim=1).long()
            my_pred_batch = torch.argmax(y_pred, dim=1).long()
            tot_acc += ((actual_batch == my_pred_batch).sum().item() / len(actual_batch))
            #print("actual for batch ", idx, " is ", torch.argmax(y_train, dim=1).long())
            #print("my prediction for batch ", idx, " is ", torch.argmax(y_pred, dim=1).long())

            # compute loss
            loss = loss_fn(y_pred,torch.argmax(y_train, dim=1).long())

            tot_loss += loss.item()
            
            # backward pass
            loss.backward(retain_graph=True)

            # update weights
            optimizer.step()

            idx += batch_size
            
        total_loss = tot_loss / len(xTrain_loader)
        total_acc = tot_acc / len(xTrain_loader)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss:.4f}, Accuracy: {total_acc:.4f}')

        # evaluate
        model.eval()

        if epoch % 10 == 0:

            with torch.no_grad():
            
                idx_test = 0
                test_acc = 0
                for batch in xTest_loader:
                    xTest = batch.to(device).float()
                    y_test = torch.tensor(yTest[idx_test:min(idx_test+batch_size,len(yTest))]).float().clone().detach().to(device)
                    y_pred = model(xTest)

                    actual_batch = torch.argmax(y_test, dim=1).long()
                    my_pred_batch = torch.argmax(y_pred, dim=1).long()

                    #compute test accuracy
                    test_acc += (actual_batch == my_pred_batch).float().mean().item()
                    idx_test += batch_size

                test_acc /= len(xTest_loader)
                print(f'Test Accuracy: {test_acc:.4f}')  

features = 15, heads = 5


In [71]:

train(Transformer, xTrain, y_Train, xTest, y_Test)

Epoch 1/100, Loss: 0.5801, Accuracy: 0.6901
Epoch 1/100, Test Accuracy: 0.7312
Epoch 2/100, Loss: 0.5420, Accuracy: 0.7263
Epoch 3/100, Loss: 0.5336, Accuracy: 0.7310
Epoch 4/100, Loss: 0.5306, Accuracy: 0.7336


KeyboardInterrupt: 