# Hand Gesture Detector with MLP

In [None]:
# Mount Google Drive at /gdrive; the project folder used below lives on it.
from google.colab import drive
drive.mount('/gdrive')

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [None]:
# Project folder on the mounted Drive: the gesture CSVs are read from here and
# model checkpoints are written here.
root = '/gdrive/My Drive/MotionDetection'

## Import

In [None]:
import numpy as np
import csv
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
import pandas as pd
import random
import time
from sklearn.metrics import confusion_matrix

# Set manual seeds for every RNG the notebook draws from.
# The stdlib `random` module drives random.shuffle() in load_data(), so it has
# to be seeded as well or the sample order differs from run to run.
random.seed(470)
torch.manual_seed(470)
torch.cuda.manual_seed(470)

## Functions for Data Preprocessing

In [None]:
def normalize(tens):
  """Rescale the joints of every frame into a box that is 100 units tall.

  Both axes are divided by the frame's y-extent, so the aspect ratio of the
  hand is preserved. Each frame is shifted so that its smallest x and y map
  to `1 / height * 100` (the `+ 1` offset of the original formula is kept).

  Args:
    tens: tensor whose last two axes are (joints, channels); the pipeline
      passes batch_size * duration * joints * channels. Channel 0 is x and
      channel 1 is y. Any further channels (e.g. confidence) are NOT copied:
      they are zero in the result.

  Returns:
    A new tensor with the same shape and device as `tens`. Floating point
    inputs keep their dtype; other dtypes are promoted to float32.
  """
  out_dtype = tens.dtype if tens.is_floating_point() else torch.float32
  res = torch.zeros_like(tens, dtype=out_dtype)

  xs = tens[..., 0].to(out_dtype)
  ys = tens[..., 1].to(out_dtype)

  # Per-frame extrema over the joint axis; keepdim so they broadcast back.
  min_x = xs.min(dim=-1, keepdim=True).values
  min_y = ys.min(dim=-1, keepdim=True).values
  height = ys.max(dim=-1, keepdim=True).values - min_y

  # A frame whose joints all share one y value has zero height; dividing by
  # it would fill the frame with inf/NaN, so fall back to a unit height.
  height = torch.where(height == 0, torch.ones_like(height), height)

  res[..., 0] = (xs - min_x + 1) / height * 100
  res[..., 1] = (ys - min_y + 1) / height * 100
  return res

# import matplotlib.pyplot as plt

# test = torch.rand(1, 2, 21, 2)
# test[:, :, :, 0] = 12 * test[:, :, :, 0]
# test[:, :, :, 1] = 10 * test[:, :, :, 1]

# test1 = normalize(test)
# plt.scatter(test[:,0, :, 0], test[:, 0, :, 1])
# plt.show()
# plt.scatter(test1[:,0, :, 0], test1[:, 0, :, 1])
# plt.show()
# plt.scatter(test[:,1, :, 0], test[:, 1, :, 1])
# plt.show()
# plt.scatter(test1[:,1, :, 0], test1[:, 1, :, 1])
# plt.show()

In [None]:
def randn_ordered(tens, n):
  """Randomly pick n entries along the first axis, keeping their original order.

  Args:
    tens: tensor to sample from; must have more than n entries on axis 0.
    n: number of entries to draw (without replacement).

  Returns:
    A tensor holding the n chosen entries, sorted by their original index.
  """
  assert tens.size(0) > n
  # Sample indices with the stdlib RNG, then sort so temporal order survives.
  picked = sorted(random.sample(range(tens.shape[0]), n))
  return tens[picked]
  
# random.seed(a=123)
# test = torch.arange(3*4*5)
# test = test.reshape(5, 4, 3)
# 
# # print(test)
# # print(randn_ordered(test, 3))

In [None]:
def cut_n(tens, n):
  """Take n consecutive frames around the 60% mark of the sequence.

  Returns `tens` itself when it already holds exactly n frames, and None when
  the window around the anchor does not contain exactly n frames.
  """
  total = tens.shape[0]
  if total == n:
    return tens

  # Anchor frame sits at 60% of the sequence (zero-based, hence the -1).
  anchor = int(total * 0.6) - 1
  half = n // 2
  window = tens[anchor - half:anchor + (n - half)]
  return window if window.shape[0] == n else None

# test = torch.arange(10*5*3)
# test = test.reshape(10, 5, 3)
# 
# print(test)
# print(cut_n(test, 2))
# test2= torch.tensor([[5.8782611e+02, 5.1130432e+02, 3.5325505e-02],  [5.8304352e+02, 4.3956522e+02, 2.4411943e-02],  [4.8260870e+02, 5.2804346e+02, 2.0107470e-02]])
# print(cut_n(test2,1))

In [None]:
def cut_max2(tens):
  """Return the highest-confidence frame together with its better neighbour.

  Confidence of a frame is the sum of the last channel over all joints.

  Args:
    tens: tensor indexed as (frames, joints, channels); the last channel is
      read as the per-joint confidence.

  Returns:
    A 2-frame slice of `tens` (in temporal order) containing the frame with
    the highest total confidence and whichever adjacent frame scores higher
    (the previous one on ties). Returns None when the sequence has fewer
    than two frames, so callers can skip it instead of receiving a short
    tensor that cannot be stacked with the others.
  """
  num_frames = tens.shape[0]
  if num_frames < 2:
    return None

  frame_conf = torch.sum(tens[:, :, -1], dim=1)
  best = int(torch.argmax(frame_conf, dim=0))

  # At either end of the sequence there is only one neighbour to take.
  if best == 0:
    return tens[:2]
  if best == num_frames - 1:
    return tens[-2:]
  if frame_conf[best - 1] >= frame_conf[best + 1]:
    return tens[best - 1:best + 1]
  return tens[best:best + 2]

# test = torch.rand(5*21*3)
# test = test.reshape(5, 21, 3)


# print(test)
# print(cut_max2(test))
# test2= torch.tensor([[5.8782611e+02, 5.1130432e+02, 3.5325505e-02],  [5.8304352e+02, 4.3956522e+02, 2.4411943e-02],  [4.8260870e+02, 5.2804346e+02, 2.0107470e-02]])
# print(cut_max2(test2,1))

In [None]:
def cut_max3(tens):
  """Return the highest-confidence frame together with both of its neighbours.

  Confidence of a frame is the sum of the last channel over all joints.

  Args:
    tens: tensor indexed as (frames, joints, channels); the last channel is
      read as the per-joint confidence.

  Returns:
    A 3-frame slice of `tens` centred on the frame with the highest total
    confidence. When that frame is the first or last one, the first or last
    three frames are returned instead. Returns None when the sequence has
    fewer than three frames.
  """
  num_frames = tens.shape[0]
  if num_frames < 3:
    return None

  frame_conf = torch.sum(tens[:, :, -1], dim=1)
  best = int(torch.argmax(frame_conf, dim=0))

  if best == 0:
    return tens[:3]
  if best == num_frames - 1:
    return tens[-3:]
  return tens[best - 1:best + 2]
# test = torch.rand(5*21*3)
# test = test.reshape(5, 21, 3)


# print(test)
# print(cut_max3(test))
# test2= torch.tensor([[5.8782611e+02, 5.1130432e+02, 3.5325505e-02],  [5.8304352e+02, 4.3956522e+02, 2.4411943e-02],  [4.8260870e+02, 5.2804346e+02, 2.0107470e-02]])
# print(cut_max2(test2,1))

## Load Dataset

In [None]:
def _parse_hand(text):
  """Turn one hand's bracketed joint list into a (joints, 3) float tensor."""
  values = [
      float(token)
      for joint in text.split(',  ')
      for token in joint.strip('[]').split(' ')
      if token  # runs of spaces produce empty tokens
  ]
  return torch.tensor(values).reshape((-1, 3))


def parser(line):
  """Parse one data line of the csv into (video id, image id, hand joints).

  A line looks like `<vid>_<img>,"[[[x y c],  [x y c], ...]]","[[[x y c], ...]]"`
  where the first bracketed list is the left hand and the second the right
  hand; every joint is `x y confidence`.

  Args:
    line: one raw csv line (header excluded).

  Returns:
    (v_id, img_id, hand): the two id strings and a (joints, 3) tensor for the
    hand with the higher total confidence. The left hand wins ties and is
    mirrored (x negated) so that both hands share one orientation.
  """
  frame_id, left, right = line.split('[[[')
  frame_id = frame_id[:-2]  # drop the `,"` that precedes the left hand

  # Trim the separators that trail each hand; leftover brackets are removed
  # per joint inside _parse_hand.
  left = _parse_hand('[' + left[:-4])
  right = _parse_hand('[' + right[:-2])

  v_id, img_id = frame_id.split('_')

  # Both hands list the same number of joints, so comparing summed confidence
  # orders them exactly like comparing the averages.
  if left[:, 2].sum() >= right[:, 2].sum():
    left[:, 0] = -left[:, 0]
    return v_id, img_id, left
  return v_id, img_id, right
  #   return v_id, img_id, normalize(left[:, :2])
  # return v_id, img_id, normalize(right[:, :2])


# v_id, img_id, hand = parser('1_00001,"[[[5.8782611e+02 5.1130432e+02 3.5325505e-02],  [5.8304352e+02 4.3956522e+02 2.4411943e-02],  [4.8260870e+02 5.2804346e+02 2.0107470e-02],  [4.6108698e+02 5.3043475e+02 2.2252293e-02],  [4.7543481e+02 5.6391302e+02 2.0914197e-02],  [4.7543481e+02 5.2804346e+02 2.7845293e-02],  [4.6586960e+02 5.4000000e+02 2.5432626e-02],  [5.1847827e+02 5.4956519e+02 1.6933031e-02],  [4.8021741e+02 5.6391302e+02 1.2440545e-02],  [5.3282611e+02 4.5630432e+02 1.2662101e-02],  [5.0652176e+02 5.4000000e+02 2.5466656e-02],  [5.1130435e+02 5.2565216e+02 2.5591202e-02],  [5.0891306e+02 5.4956519e+02 1.0660242e-02],  [4.7065219e+02 5.2565216e+02 1.6639609e-02],  [5.2804352e+02 5.0173911e+02 2.2738149e-02],  [5.0413046e+02 5.3043475e+02 2.0388147e-02],  [4.8021741e+02 5.5434778e+02 1.0172251e-02],  [4.5152176e+02 4.3239130e+02 1.6042734e-02],  [4.8500003e+02 5.4956519e+02 1.4421927e-02],  [4.9217392e+02 5.6152173e+02 1.5104427e-02],  [4.8978262e+02 5.6152173e+02 1.0827912e-02]]]","[[[4.8782608e+02 4.5434781e+02 4.6337242e-03],  [4.8065216e+02 4.9021738e+02 8.7779146e-03],  [4.8782608e+02 5.2847821e+02 8.5121123e-03],  [4.7108694e+02 4.6869565e+02 8.6766174e-03],  [4.6869565e+02 4.6630432e+02 7.1836482e-03],  [5.4043475e+02 4.2565216e+02 8.7995632e-03],  [4.2804346e+02 4.8782608e+02 8.3582215e-03],  [3.4913043e+02 5.1173911e+02 1.0578417e-02],  [4.0891302e+02 4.8782608e+02 9.7930310e-03],  [5.2847821e+02 4.1130432e+02 5.6122812e-03],  [4.2804346e+02 4.8782608e+02 8.1787128e-03],  [4.3043475e+02 4.9499997e+02 1.0875644e-02],  [4.1608694e+02 4.9021738e+02 9.1631478e-03],  [4.1608694e+02 4.8782608e+02 8.3672330e-03],  [4.2804346e+02 4.8782608e+02 9.0226326e-03],  [4.2565216e+02 5.0217389e+02 9.3891509e-03],  [4.1847824e+02 5.0934781e+02 8.6548291e-03],  [5.3086957e+02 4.2565216e+02 1.0162307e-02],  [3.5391302e+02 5.2369562e+02 1.0465117e-02],  [3.5391302e+02 5.2369562e+02 1.5369176e-02],  [3.6347824e+02 5.2130432e+02 8.9091975e-03]]]"')

# print(v_id, img_id)
# print(hand)

In [None]:
import random
import datetime

def load_data():
  """Load the five gesture csv files and build shuffled train/test lists.

  Every csv holds one gesture class; its position in `csv_list` is the label.
  Lines are grouped by video id, each video is reduced to its two most
  confident frames with cut_max2, and the first 80% of the videos of each
  file go to the training list while the rest go to the test list.

  Returns:
    (train, test): two lists of (frames, label) tuples, each shuffled with
    the stdlib `random` module.
  """
  csv_list = ['result_Gesture1.csv','result_Gesture2.csv','result_Gesture3.csv','result_Gesture4.csv','result_Gesture5.csv']
  train = []
  test = []
  for i, csv_name in enumerate(csv_list):
    print("reading start for", i, "th csv!", datetime.datetime.now().time())
    result_dir = Path(root) / csv_name
    # Context manager so the file handle is released once the lines are read.
    with open(str(result_dir), 'r') as data_csv:
      data_lines = data_csv.readlines()[1:]  # skip the header row

    # video id -> list of per-image joint tensors, in file order.
    dataset = dict()
    for line in data_lines:
      v_id, img_id, hand = parser(line)
      dataset.setdefault(v_id, []).append(hand)

    tot = list(dataset.values())
    split_at = int(len(tot) * 0.8)

    # The split is by video, so frames of one video never straddle both sets.
    for target, videos in ((train, tot[:split_at]), (test, tot[split_at:])):
      for v in videos:
        x = cut_max2(torch.stack(v))
        if x is None:
          continue
        target.append((x, i))

  random.shuffle(train)
  random.shuffle(test)

  return train, test

In [None]:
# Parse all five gesture CSVs and build the shuffled (frames, label) lists.
train, test = load_data()

reading start for 0 th csv! 12:17:39.387453
reading start for 1 th csv! 12:19:01.386062
reading start for 2 th csv! 12:20:14.721937
reading start for 3 th csv! 12:21:31.964052
reading start for 4 th csv! 12:22:51.346956


In [None]:
# Sanity check: number of test samples, then number of training samples.
print(len(test), len(train))

4462 17838


In [None]:
class HandposeDataset(Dataset):
  """In-memory dataset of hand-pose samples and their gesture labels."""

  def __init__(self, dataset):
    # `dataset` is a sequence of (sample tensor, integer label) pairs.
    self.len = len(dataset)

    # Samples are kept whole, i.e. including the confidence channel.
    frames = [entry[0] for entry in dataset]
    print(len(frames), "len of x")
    self.x_data = torch.stack(frames)

    labels = [entry[1] for entry in dataset]
    print(len(labels), "len of y")
    self.y_data = torch.tensor(labels)

    for tag, data in (('x_data', self.x_data), ('y_data', self.y_data)):
      print(tag, data.shape)

  def __getitem__(self, index):
    sample = self.x_data[index]
    label = self.y_data[index]
    return sample, label

  def __len__(self):
    return self.len

# Wrap the parsed sample lists as torch datasets.
hand_train = HandposeDataset(train)
hand_test = HandposeDataset(test)

# Training batches are reshuffled by the loader; test batches keep a fixed order.
train_loader = DataLoader(hand_train, batch_size=32, shuffle=True, num_workers=1)
test_loader = DataLoader(hand_test, batch_size=32, shuffle=False, num_workers=1)

## Model

In [None]:
class MLPBlock(nn.Module):
    """Three Linear -> BatchNorm1d -> ReLU stages: in_channels -> 512 -> 256 -> out_channels.

    Args:
        in_channels: size of each flattened input sample.
        out_channels: size of the feature vector produced by the last stage.
            This used to be ignored and fixed at 64; the notebook passes 64,
            so existing checkpoints keep loading.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.fc1 = nn.Linear(in_channels, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, out_channels)
        self.bn3 = nn.BatchNorm1d(out_channels)
        self.act = nn.ReLU()

    def forward(self, x):
        """Map a (batch, in_channels) tensor to (batch, out_channels) features."""
        output = self.act(self.bn1(self.fc1(x)))
        output = self.act(self.bn2(self.fc2(output)))
        output = self.act(self.bn3(self.fc3(output)))
        return output

In [None]:
class GestureDetector(nn.Module):
    """MLP classifier over a flattened hand-pose sample.

    Args:
        nf: width of the feature vector handed from the MLP block to the
            final classification layer.
        in_features: flattened sample size; defaults to 2 frames * 21 joints
            * 3 channels (x, y, confidence).
        num_classes: number of gesture classes; the dataset has 5.
    """

    def __init__(self, nf, in_features=2 * 21 * 3, num_classes=5):
        super().__init__()
        self.mlp = MLPBlock(in_features, nf)
        self.fc = nn.Linear(nf, num_classes)

    def forward(self, x):
        """Return unnormalised class scores of shape (batch, num_classes)."""
        # NOTE: forward() used to call normalize(x) and throw the result away,
        # so the network has always been fed raw coordinates. The dead call is
        # dropped: it ran a Python loop over every frame of every batch
        # without affecting the output.
        output = self.mlp(x.view(x.size(0), -1))
        return self.fc(output)

In [None]:
def weight_init(m):
    """Initialise one module in place; intended for `network.apply(weight_init)`.

    Linear and Conv2d layers get Kaiming-normal weights and zero biases.
    BatchNorm1d and BatchNorm2d layers get unit scale and zero shift.
    BatchNorm1d is the variant this notebook's MLP uses and was previously
    not matched. Other module types are left untouched.
    """
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        torch.nn.init.kaiming_normal_(m.weight)
        if m.bias is not None:
            torch.nn.init.constant_(m.bias, 0)
    elif isinstance(m, (nn.BatchNorm1d, nn.BatchNorm2d)):
        # Batch-norm layers built with affine=False have no weight/bias.
        if m.weight is not None:
            torch.nn.init.constant_(m.weight, 1)
        if m.bias is not None:
            torch.nn.init.constant_(m.bias, 0)

## Training

In [None]:
def train_net(net, optimizer, scheduler, model_name, epoch):
    """Train `net`, evaluate after every epoch and checkpoint the best model.

    Reads the module-level `train_loader`, `test_loader`, `device` and `root`.

    Args:
        net: the model to train (already moved to `device`).
        optimizer: optimizer over `net`'s parameters.
        scheduler: learning-rate scheduler, stepped once per epoch.
        model_name: checkpoint file stem; weights go to `<root>/<model_name>.pt`.
        epoch: number of epochs to run.

    Returns:
        (best_confusion, best_accuracy): the confusion matrix (rows = true
        class, columns = predicted class) and test accuracy of the best epoch.
        If no epoch ever beats an accuracy of 0, an all-zero matrix and 0 are
        returned instead of raising on an unbound variable.
    """
    nb_classes = 5
    best_accuracy = 0
    best_confusion = torch.zeros(nb_classes, nb_classes)
    ckpt_dir = Path(root)
    criterion = nn.CrossEntropyLoss()

    # `epoch_idx` keeps the loop counter from shadowing the `epoch` argument.
    for epoch_idx in range(epoch):
        net.train()
        for x, y in train_loader:
            x = x.to(device=device)
            y = y.to(device=device)

            loss = criterion(net(x), y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # test loop.
        net.eval()
        confusion = torch.zeros(nb_classes, nb_classes)
        test_loss = 0.
        test_accuracy = 0.
        test_num_data = 0
        with torch.no_grad():
            for x, y in test_loader:
                x = x.to(device=device)
                y = y.to(device=device)

                logit = net(x)
                pred = logit.argmax(dim=1)
                batch_size = x.shape[0]

                # Weight per-batch means by batch size so a smaller last batch
                # does not skew the averages.
                test_loss += criterion(logit, y).item() * batch_size
                test_accuracy += (pred == y).float().mean().item() * batch_size
                test_num_data += batch_size

                # Encode every (true, predicted) pair as one index and count
                # them in a single call instead of looping per sample.
                pair_index = (y.view(-1) * nb_classes + pred.view(-1)).cpu()
                counts = torch.bincount(pair_index, minlength=nb_classes * nb_classes)
                confusion += counts.view(nb_classes, nb_classes).float()

        if test_num_data:
            test_loss /= test_num_data
            test_accuracy /= test_num_data
        print('epoch:', epoch_idx, 'test loss:', test_loss, 'test_accuracy:', test_accuracy)
        if test_accuracy > best_accuracy:
            best_accuracy = test_accuracy
            best_confusion = confusion
            torch.save(net.state_dict(), f'{ckpt_dir}/{model_name}.pt')
        scheduler.step()
    return best_confusion, best_accuracy

In [None]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Build the classifier with a 64-wide feature layer, move it to the chosen
# device and apply the custom weight initialisation to every sub-module.
network = GestureDetector(64)
network = network.to(device)
network.apply(weight_init)

In [None]:
final_accs = {}

# Define optimizer
optimizer = optim.SGD(network.parameters(), lr=0.0005, momentum=0.9, weight_decay=0.0001)
# Alternative schedule that was tried:
# scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[50,80], gamma=0.5)
# Halve the learning rate every 10 epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size= 10, gamma = 0.5)
epoch = 100

global_step = 0
best_accuracy = 0.0

# Used both as the checkpoint file stem and in the summary line below. The
# summary used to reference an undefined `block_type`, which raised NameError
# only after the whole training run had finished.
model_name = 'max2_4l'

t1 = time.time()
confusion, accuracy = train_net(network, optimizer, scheduler, model_name, epoch)
t = time.time()-t1
print(f'Best test accuracy of {model_name} network : {accuracy:.3f} took {t:.3f} secs')
print(confusion)