# Dependencies and dataset

In [None]:
import os
import shutil
import re
import fnmatch
import pickle
import random
import pandas as pd
import numpy as np
import tensorflow as tf
import cv2
from google.colab.patches import cv2_imshow
from mpl_toolkits.mplot3d import Axes3D
!pip install simpleaudio
import simpleaudio as sa
!pip install mediapipe
import mediapipe as mp
import matplotlib.pyplot as plt
from IPython.display import clear_output
from sklearn.model_selection import train_test_split
from keras.preprocessing.image import load_img
from matplotlib import pyplot

In [None]:
# Write the list of packages installed in this runtime to requirements.txt.
pip freeze > requirements.txt

In [None]:
# Mount Google Drive at /content/drive; the dataset archive and the model
# weights used later in this notebook are read from /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Remove the /content/sample_data folder if it is present.
if os.path.isdir('/content/sample_data'):
  shutil.rmtree('/content/sample_data', ignore_errors=True)

# Extract the dataset archive stored on Drive into the current directory.
!unzip /content/drive/MyDrive/multiview_hand_pose_dataset_uploaded_v2.zip       #takes ~7-8 mins
# Drop the unzip output from the cell before printing the confirmation.
clear_output()
print('Dataset unzipped!')

In [None]:
# Delete any leftover 'augmented_samples' directory; errors (including the
# directory not existing) are ignored.
shutil.rmtree('augmented_samples', ignore_errors=True)

# Projected coordinates to webcam frames

In [None]:
def recursive_glob(rootdir='.', pattern='*'):
  """Collect every file below ``rootdir`` whose name matches ``pattern``.

  Args:
    rootdir: Directory at which the recursive walk starts.
    pattern: ``fnmatch``-style pattern tested against bare file names.

  Returns:
    List of matching paths, each joined to the directory it was found in,
    in ``os.walk`` traversal order.
  """
  return [
      os.path.join(folder, name)
      for folder, _subdirs, files in os.walk(rootdir)
      for name in fnmatch.filter(files, pattern)
  ]

def natural_sort(l):
    """Return the strings of ``l`` sorted in natural (human) order.

    Runs of digits compare by numeric value and all other text compares
    case-insensitively, so ``"f2"`` sorts before ``"f10"``.
    """
    def sort_key(entry):
        # The capturing group keeps the digit runs in the split result.
        pieces = re.split('([0-9]+)', entry)
        return [int(piece) if piece.isdigit() else piece.lower()
                for piece in pieces]

    return sorted(l, key=sort_key)

def readAnnotation3D(file):
  """Read an annotation file holding one labelled 3D point per line.

  Each non-blank line is split on whitespace; the first token (the label)
  is ignored and the next three tokens are parsed as x, y and z.

  Args:
    file: Path of the annotation text file.

  Returns:
    Float array with one ``(x, y, z)`` row per annotated line.

  Raises:
    IndexError: If a non-blank line has fewer than four tokens.
    ValueError: If a coordinate token is not a number.
  """
  points = []
  # The context manager closes the handle even when a line fails to parse.
  with open(file, "r") as handle:
    for line in handle:
      fields = line.split()
      if not fields:
        continue  # tolerate blank lines such as a trailing newline
      points.append((float(fields[1]), float(fields[2]), float(fields[3])))
  return np.array(points, dtype=float)

def readAnnotation2D(file):
  """Read an annotation file holding one labelled 2D point per line.

  Each non-blank line is split on whitespace; the first token (the label)
  is ignored and the next two tokens are parsed as x and y.

  Args:
    file: Path of the annotation text file.

  Returns:
    Float array with one ``(x, y)`` row per annotated line.

  Raises:
    IndexError: If a non-blank line has fewer than three tokens.
    ValueError: If a coordinate token is not a number.
  """
  points = []
  # The context manager closes the handle even when a line fails to parse.
  with open(file, "r") as handle:
    for line in handle:
      fields = line.split()
      if not fields:
        continue  # tolerate blank lines such as a trailing newline
      points.append((float(fields[1]), float(fields[2])))
  return np.array(points, dtype=float)

def saveAnnotation(jointCamPath, positions):
  """Write 21 hand keypoints to a text file as ``<JOINT_NAME> <x> <y>`` lines.

  Args:
    jointCamPath: Path of the file to create (overwritten if it exists).
    positions: Indexable of at least 21 rows, each with an x value at index
      0 and a y value at index 1. Row order must follow the joint order
      listed below.

  Raises:
    IndexError: If ``positions`` has fewer than 21 rows or a row has fewer
      than two values. Nothing is written in that case.
  """
  joint_names = (
      "F4_KNU1_A", "F4_KNU1_B", "F4_KNU2_A", "F4_KNU3_A",
      "F3_KNU1_A", "F3_KNU1_B", "F3_KNU2_A", "F3_KNU3_A",
      "F1_KNU1_A", "F1_KNU1_B", "F1_KNU2_A", "F1_KNU3_A",
      "F2_KNU1_A", "F2_KNU1_B", "F2_KNU2_A", "F2_KNU3_A",
      "TH_KNU1_A", "TH_KNU1_B", "TH_KNU2_A", "TH_KNU3_A",
      "PALM_POSITION",
  )
  # Format every line before opening the file so malformed input cannot
  # leave a partially written annotation behind.
  lines = [
      name + " " + str(positions[k][0]) + " " + str(positions[k][1]) + "\n"
      for k, name in enumerate(joint_names)
  ]
  # The context manager guarantees the handle is closed on any error.
  with open(jointCamPath, 'w') as fOut:
    fOut.writelines(lines)

def getCameraMatrix():
  """Return the 3x3 camera matrix passed to ``cv2.projectPoints``.

  The matrix has the form ``[[Fx, 0, Cx], [0, Fy, Cy], [0, 0, 1]]`` with
  hard-coded focal lengths (Fx, Fy) and principal point (Cx, Cy).
  """
  focal_x, focal_y = 614.878, 615.479
  center_x, center_y = 313.219, 231.288
  rows = [
      [focal_x, 0, center_x],
      [0, focal_y, center_y],
      [0, 0, 1],
  ]
  return np.array(rows)

def getDistCoeffs():
  """Return the five hard-coded lens distortion coefficients.

  The array is passed as ``distCoeffs`` to ``cv2.projectPoints``, whose
  five-element form is ordered (k1, k2, p1, p2, k3).
  """
  coefficients = (0.092701, -0.175877, -0.0035687, -0.00302299, 0)
  return np.array(coefficients)

In [None]:
def generate2Dcoordinates(show=True):
  """Project every frame's 3D joint annotation onto its webcam image.

  For each sequence ``data_1`` .. ``data_21`` this loads the rvec/tvec
  calibration of the four webcams, projects the first 21 annotated 3D points
  of every colour frame with ``cv2.projectPoints`` and writes the pixel
  coordinates to ``/content/2d_points/data_<i>/<frame>_webcam_<n>.txt``.
  Any existing ``/content/2d_points`` tree is deleted first.

  Args:
    show: When true, display up to three consecutive frames per sequence,
      starting after a random index, with the projected points drawn on them.

  Raises:
    ValueError: If a frame's file name refers to a webcam for which no
      calibration was loaded.
  """
  pathToDataset="/content/annotated_frames/"
  outputRoot = "/content/2d_points/"

  cameraMatrix = getCameraMatrix()
  distCoeffs = getDistCoeffs()

  # Recreate the output tree at the same absolute location the per-sequence
  # paths below use, so the function does not depend on the working directory.
  shutil.rmtree(outputRoot, ignore_errors=True)
  os.makedirs(outputRoot)

  # iterate sequences
  for i in range(1,22):
    seq = "data_"+str(i)
    subdir_path = outputRoot+seq+'/'
    os.mkdir(subdir_path)
    # read the color frames
    path = pathToDataset+seq+"/"
    colorFrames = natural_sort(recursive_glob(path, "*_webcam_[0-9]*"))
    print("There are "+str(len(colorFrames))+" color frames on the sequence data_"+str(i))
    # read the calibrations for each camera, keyed by zero-based webcam id
    print ("Loading calibration for ../calibrations/data_"+str(i))
    # NOTE: pickle can execute arbitrary code on load; only use calibration
    # files that come from a trusted copy of the dataset.
    calibrations = {}
    for cam in range(4):
      cam_dir = "/content/calibrations/"+seq+"/webcam_"+str(cam+1)+"/"
      with open(cam_dir+"rvec.pkl","rb") as f:
        cam_rvec = pickle.load(f, encoding='bytes')
      with open(cam_dir+"tvec.pkl","rb") as f:
        cam_tvec = pickle.load(f, encoding='bytes')
      calibrations[cam] = (cam_rvec, cam_tvec)

    # frames rand_idx+1 .. rand_idx+3 (when they exist) are previewed
    rand_idx = random.randint(0, len(colorFrames))

    for j, framePath in enumerate(colorFrames):
      print(framePath)
      # file names look like <frame>_webcam_<n>.<ext>; the joints file of a
      # frame lives next to its images
      frameDir, frameFile = os.path.split(framePath)
      toks2 = frameFile.split("_")
      jointPath = os.path.join(frameDir, toks2[0]+"_joints.txt")

      print(jointPath)
      points3d = readAnnotation3D(jointPath)[0:21] # the last point is the normal

      # project 3d LM points to the image plane
      webcam_tag = toks2[2].split(".")[0]
      webcam_id = int(webcam_tag)-1
      print ("Calibration for webcam id: "+str(webcam_id))
      if webcam_id not in calibrations:
        # fail loudly instead of silently reusing the previous frame's pose
        raise ValueError("No calibration loaded for webcam id "+str(webcam_id)+" (frame "+framePath+")")
      rvec, tvec = calibrations[webcam_id]

      points2d, _ = cv2.projectPoints(points3d, rvec, tvec, cameraMatrix, distCoeffs)

      pathToSave = subdir_path+toks2[0]+"_webcam_"+webcam_tag+".txt"
      print("Saving 2d projections"+pathToSave)
      saveAnnotation(pathToSave, np.array(points2d).reshape(21,2))

      # show a random sample of the sequence
      if show and rand_idx < j < rand_idx+4:
        img = cv2.imread(framePath)
        if img is not None:  # imread returns None for unreadable files
          for k in range(points2d.shape[0]):
            cv2.circle(img, (int(points2d[k][0][0]), int(points2d[k][0][1])), 3, (0,0,255))
          cv2_imshow(img)
          cv2.waitKey(100)
  clear_output()
  print('2D points generated!')

In [None]:
def generate3Dcoordinates():
  """Express every frame's 3D joint annotation in its webcam's coordinate frame.

  For each sequence ``data_1`` .. ``data_21`` this loads the rvec/tvec
  calibration of the four webcams, builds the 4x4 homogeneous transform
  ``[R | t]`` from them, applies it to the first 21 annotated 3D points of
  every colour frame and writes ``<JOINT_NAME> <x> <y> <z>`` lines to
  ``/content/3d_points/data_<i>/<frame>_webcam_<n>.txt``.
  Any existing ``/content/3d_points`` tree is deleted first.

  Raises:
    ValueError: If a frame's file name refers to a webcam for which no
      calibration was loaded.
  """
  pathToDataset="/content/annotated_frames/"
  outputRoot = "/content/3d_points/"
  names = ['F4_KNU1_A', 'F4_KNU1_B', 'F4_KNU2_A', 'F4_KNU3_A', 'F3_KNU1_A',
           'F3_KNU1_B', 'F3_KNU2_A', 'F3_KNU3_A', 'F1_KNU1_A', 'F1_KNU1_B',
           'F1_KNU2_A', 'F1_KNU3_A', 'F2_KNU1_A', 'F2_KNU1_B', 'F2_KNU2_A',
           'F2_KNU3_A', 'TH_KNU1_A', 'TH_KNU1_B', 'TH_KNU2_A', 'TH_KNU3_A',
           'PALM_POSITION']

  # Recreate the output tree at the same absolute location the per-sequence
  # paths below use, so the function does not depend on the working directory.
  shutil.rmtree(outputRoot, ignore_errors=True)
  os.makedirs(outputRoot)

  # iterate sequences
  for i in range(1,22):
    seq = "data_"+str(i)
    # make subdirs to save 3D coordinates
    subdir_path = outputRoot+seq+'/'
    os.mkdir(subdir_path)
    # read the color frames
    path = pathToDataset+seq+"/"
    colorFrames = natural_sort(recursive_glob(path, "*_webcam_[0-9]*"))
    print("There are "+str(len(colorFrames))+" color frames on the sequence data_"+str(i))
    # read the calibrations for each camera, keyed by zero-based webcam id
    print ("Loading calibration for ../calibrations/data_"+str(i))
    # NOTE: pickle can execute arbitrary code on load; only use calibration
    # files that come from a trusted copy of the dataset.
    calibrations = {}
    for cam in range(4):
      cam_dir = "/content/calibrations/"+seq+"/webcam_"+str(cam+1)+"/"
      with open(cam_dir+"rvec.pkl","rb") as f:
        cam_rvec = pickle.load(f, encoding='bytes')
      with open(cam_dir+"tvec.pkl","rb") as f:
        cam_tvec = pickle.load(f, encoding='bytes')
      calibrations[cam] = (cam_rvec, cam_tvec)

    for framePath in colorFrames:
      print(framePath)
      # file names look like <frame>_webcam_<n>.<ext>; the joints file of a
      # frame lives next to its images
      frameDir, frameFile = os.path.split(framePath)
      toks2 = frameFile.split("_")
      jointPath = os.path.join(frameDir, toks2[0]+"_joints.txt")
      print(jointPath)
      points3d = readAnnotation3D(jointPath)[0:21] # the last point is the normal

      # transform 3d LM points to the camera coordinate frame
      webcam_id = int(toks2[2].split(".")[0])-1
      print("Calibration for webcam id: "+str(webcam_id))
      if webcam_id not in calibrations:
        # fail loudly instead of silently reusing the previous frame's pose
        raise ValueError("No calibration loaded for webcam id "+str(webcam_id)+" (frame "+framePath+")")
      rvec, tvec = calibrations[webcam_id]

      # homogeneous transform: rotation in the top-left 3x3, translation in
      # the last column, bottom row [0, 0, 0, 1]
      R,_ = cv2.Rodrigues(rvec)
      T = np.eye(4)
      T[:3, :3] = R
      T[:3, 3] = np.asarray(tvec, dtype=float).reshape(3)

      file_path = subdir_path + frameFile.split(".")[0] + '.txt'
      print(file_path)

      lines = []
      for name, point in zip(names, points3d):
        # append w=1, transform, and keep only x, y, z of the result
        cam_point = np.matmul(T, np.append(point, 1))
        lines.append(name+' '+str(cam_point[0])+' '+str(cam_point[1])+' '+str(cam_point[2])+'\n')

      # the context manager closes the file even if a write fails
      with open(file_path, "w") as text_file:
        text_file.writelines(lines)
  clear_output()
  print('3D points generated!')

In [None]:
# Write the projected 2D keypoint files under /content/2d_points.
generate2Dcoordinates()                                           # runs for ~ 3-4 mins

In [None]:
# Write the camera-frame 3D keypoint files under /content/3d_points.
generate3Dcoordinates()                                           # runs for ~ 5-6 min

# Setup for training

In [None]:
def dataframe_gen(img_size, raw_size=(480, 640)):
  """Build the table of image paths and rescaled 2D keypoint targets.

  For every colour frame of sequences ``data_1`` .. ``data_21`` the matching
  file under ``/content/2d_points`` is read and its pixel coordinates are
  rescaled from the raw frame size to the network input size.

  Args:
    img_size: Target size as ``(height, width, channels)``.
    raw_size: Size of the raw frames as ``(height, width[, ...])``; only the
      first two entries are used.

  Returns:
    Tuple ``(y_col, dataf)``: ``y_col`` is the list of the 42 target column
    names (``<JOINT>_x``, ``<JOINT>_y``) and ``dataf`` is a DataFrame with an
    ``image_paths`` column followed by those target columns.
  """
  h, w, _ = img_size
  raw_h, raw_w = raw_size[:2]
  # per-axis factor applied to every (x, y) row
  scale = np.array([w/raw_w, h/raw_h])

  names = ['F4_KNU1_A', 'F4_KNU1_B', 'F4_KNU2_A', 'F4_KNU3_A', 'F3_KNU1_A',
           'F3_KNU1_B', 'F3_KNU2_A', 'F3_KNU3_A', 'F1_KNU1_A', 'F1_KNU1_B',
           'F1_KNU2_A', 'F1_KNU3_A', 'F2_KNU1_A', 'F2_KNU1_B', 'F2_KNU2_A',
           'F2_KNU3_A', 'TH_KNU1_A', 'TH_KNU1_B', 'TH_KNU2_A', 'TH_KNU3_A',
           'PALM_POSITION']
  y_col = [name+'_'+axis for name in names for axis in ('x', 'y')]

  rows = []
  for i in range(1,22):
    seq = 'data_'+str(i)
    path = '/content/annotated_frames/'+seq+'/'
    colorFrames = natural_sort(recursive_glob(path, "*_webcam_[0-9]*"))

    for framePath in colorFrames:
      # the 2D annotation shares the frame's file name, with a .txt extension
      frameName = os.path.basename(framePath).split(".")[0]
      jointPath = '/content/2d_points/'+seq+'/'+frameName+".txt"
      keypoints = readAnnotation2D(jointPath) * scale
      # flatten to x0, y0, x1, y1, ... to line up with y_col
      rows.append([framePath] + keypoints.ravel().tolist())

  dataf = pd.DataFrame(rows, columns = ['image_paths'] + y_col)
  return y_col, dataf


In [None]:
# Network input size as (height, width, channels); also the size the frames
# and keypoint coordinates are rescaled to.
input_img_size = (256,256,3)

In [None]:
# Build the table of image paths and keypoint targets; y_cols holds the names
# of the target columns.
y_cols, df_init = dataframe_gen(img_size=input_img_size)
# raw images are (480,640,3)

In [None]:
# Draw a fixed-seed random subset of 20000 rows to train and validate on.
# NOTE(review): DataFrame.sample raises if df_init has fewer than 20000 rows.
df = df_init.sample(n=20000,random_state=57)
df

In [None]:
from keras_preprocessing.image import ImageDataGenerator

# One generator configuration for both subsets: pixel values are scaled by
# 1/255 and 20% of the rows are reserved for validation.
datagen = ImageDataGenerator(rescale = 1./255,
                            validation_split=0.2)
# (height, width) every image is resized to on load
image_resize_tuple = (input_img_size[0],input_img_size[1])
# Training batches: images come from the 'image_paths' column (directory=None,
# so the stored paths are used as given) and targets from the y_cols columns.
train_gen = datagen.flow_from_dataframe(dataframe = df,
                                        directory = None,
                                        x_col='image_paths',
                                        y_col=y_cols,
                                        target_size = image_resize_tuple,
                                        batch_size = 40,
                                        class_mode = 'other',
                                        subset = 'training')
# Validation batches from the reserved split of the same dataframe.
val_gen = datagen.flow_from_dataframe(dataframe = df,
                                      directory = None,
                                      x_col='image_paths',
                                      y_col=y_cols,
                                      target_size = image_resize_tuple,
                                      batch_size = 10,
                                      class_mode = 'other',
                                      subset = 'validation')

In [None]:
from tensorflow import keras
from tensorflow.keras import layers

# Convolutional regressor: five stages of 3x3 'same' convolutions with ReLU
# (2x64, 3x128, 4x256, 5x512, 4x512 filters), each followed by 2x2 max
# pooling, then a dense head ending in 42 outputs.
model = keras.Sequential([

    layers.Conv2D(input_shape=input_img_size,filters=64,kernel_size=(3,3),padding="same", activation="relu"),
    layers.Conv2D(filters=64,kernel_size=(3,3),padding="same", activation="relu"),
    layers.MaxPool2D(pool_size=(2,2),strides=(2,2)),
    layers.Conv2D(filters=128, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=128, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=128, kernel_size=(3,3), padding="same", activation="relu"),
    layers.MaxPool2D(pool_size=(2,2),strides=(2,2)),
    layers.Conv2D(filters=256, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=256, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=256, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=256, kernel_size=(3,3), padding="same", activation="relu"),
    layers.MaxPool2D(pool_size=(2,2),strides=(2,2)),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.MaxPool2D(pool_size=(2,2),strides=(2,2)),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.Conv2D(filters=512, kernel_size=(3,3), padding="same", activation="relu"),
    layers.MaxPool2D(pool_size=(2,2),strides=(2,2)),

      # Regression head: 42 outputs = 21 keypoints x (x, y), matching y_cols
    layers.Flatten(),
    layers.Dense(units=4000, activation="relu"),
    layers.Dropout(.2),
    layers.Dense(units=1000, activation="relu"),
    layers.Dropout(.2),
    layers.Dense(units=500, activation="relu"),
    layers.Dense(units=100, activation="relu"),
    # NOTE(review): ReLU on the output clamps predictions to >= 0, so a
    # keypoint projected outside the image (negative coordinate) cannot be
    # predicted -- confirm this is intended rather than a linear output.
    layers.Dense(units=42, activation="relu")
])
# Train on mean squared error and report RMSE as the metric.
model.compile(optimizer="adam",
              loss=tf.keras.losses.MeanSquaredError(),
              metrics=[tf.keras.metrics.RootMeanSquaredError()])

model.summary()

# Training

In [None]:
# Restore previously saved weights from Drive into `model`.
model.load_weights('/content/drive/MyDrive/AI_model/')


In [None]:
# Running per-epoch history, kept across calls to runner().
hist = {metric: [] for metric in ('loss',
                                  'root_mean_squared_error',
                                  'val_loss',
                                  'val_root_mean_squared_error')}

def runner(num_epoch):
  """Train ``model`` one epoch at a time, ``num_epoch`` times.

  After each epoch the loss and RMSE values for the training and validation
  sets are appended to the module-level ``hist`` dict, the cell output is
  cleared and the accumulated history is printed.
  """
  tracked = ('loss',
             'root_mean_squared_error',
             'val_loss',
             'val_root_mean_squared_error')
  for _ in range(num_epoch):
    history = model.fit(train_gen, validation_data=val_gen, epochs = 1, verbose=1)
    for metric in tracked:
      hist[metric].extend(history.history[metric])
    clear_output()
    print(hist)

In [None]:
# Train for one epoch and record its metrics in hist.
runner(1)

In [None]:
# Plot the training and validation loss (MSE) recorded in hist, one point
# per epoch.
plt.figure(figsize=(8,4))
plt.title('mean_squared_error')
plt.plot(hist['loss'], color='blue', label='train')
plt.plot(hist['val_loss'], color='red', label='val')
#plt.xlim(0, 20)
# Fix the y-axis range to [0, 10000].
plt.ylim(0, 10000)
plt.legend()

In [None]:
# Plot the training and validation RMSE recorded in hist, one point per epoch.
plt.figure(figsize=(8,4))
# This figure shows the RMSE metric, not the MSE loss, so title it accordingly.
plt.title('root_mean_squared_error')
plt.plot(hist['root_mean_squared_error'], color='blue', label='train')
plt.plot(hist['val_root_mean_squared_error'], color='red', label='val')
#plt.xlim(0, 20)
# Fix the y-axis range to [0, 100].
plt.ylim(0, 100)
plt.legend()

In [None]:
# Save the current weights of `model` to Drive, at the same path the
# restore cell loads from.
model.save_weights('/content/drive/MyDrive/AI_model/')

In [None]:
# Print index, name and output shape of every layer whose name contains 'conv'.
for i, layer in enumerate(model.layers):
    if 'conv' in layer.name:
        print(i, layer.name, layer.output.shape)

In [None]:
def plotfeaturemaps(layernum,imgpath):
  """Plot the activations of one layer of ``model`` for a single image.

  The image is loaded at 256x256, scaled by 1/255 and passed through a
  sub-model that ends at ``model.layers[layernum]``. With
  ``square = int(sqrt(channel_count))``, the first ``square * square``
  channels are drawn in a ``square`` x ``square`` grid; any remaining
  channels are not shown. Each map is min-max normalised on its own.

  Args:
    layernum: Index into ``model.layers``. The layer's output is indexed as
      ``[sample, row, column, channel]``, so it must be 4-dimensional.
    imgpath: Path of the image file to run through the network.
  """
  _model_ = tf.keras.Model(inputs=model.inputs, outputs=model.layers[layernum].output)
  # NOTE: the size is hard-coded and has to match the model's input size.
  img = load_img(imgpath, target_size=(256, 256))

  # convert the image to an array with values scaled by 1/255
  img = tf.keras.utils.img_to_array(img)/255
  # expand dimensions so that it represents a single 'sample'
  img = tf.expand_dims(img, axis=0)

  feature_maps = _model_.predict(img)

  # largest square grid that fits into the number of channels
  square = int(np.shape(feature_maps)[3]**0.5)
  plt.figure(figsize = (2*square, 2*square))
  for channel in range(square*square):
    # specify subplot (1-based position) and turn off axis ticks
    ax = plt.subplot(square, square, channel+1)
    ax.set_xticks([])
    ax.set_yticks([])

    fm = feature_maps[0, :, :, channel]
    lowest = np.min(fm)
    highest = np.max(fm)
    if lowest != highest:
      plt.imshow((fm-lowest)/(highest-lowest))
    else:
      # constant map: avoid dividing by zero, show it as all zeros
      plt.imshow(fm-lowest)
  # show the figure
  plt.show()

In [None]:
# Plot the feature maps of model.layers[0] .. model.layers[20] for one frame;
# the printed layer number is 1-based.
for i in range(0,21):
    print('Layer '+str(i+1))
    plotfeaturemaps(i,'/content/annotated_frames/data_15/0_webcam_3.jpg')

# Live testing


In [None]:
# Live demo: track hands in the default camera stream with MediaPipe Hands,
# draw the detected landmarks and play a tone whose frequencies follow the
# position of landmark 0 (the wrist in MediaPipe's hand model).
# The CNN trained above is not used here; MediaPipe runs its own model.
drawing = mp.solutions.drawing_utils
styles = mp.solutions.drawing_styles
# mp.solutions.hands is a module, not a callable. Keep it under its own name
# so the Hands() instance created below does not shadow it.
mp_hands = mp.solutions.hands
seconds = 1
fs = 44100
# The sample times of a tone never change, so build them once.
t = np.linspace(0, seconds, seconds * fs, False)
play_obj = None  # most recently started playback, if any
cap = cv2.VideoCapture(0)
try:
    with mp_hands.Hands() as hands:
        while cap.isOpened():
            success, image = cap.read()
            if not success:
                print("Ignoring empty camera frame.")
                # If loading a video, use 'break' instead of 'continue'.
                continue

            # To improve performance, optionally mark the image as not
            # writeable to pass by reference.
            image.flags.writeable = False
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            results = hands.process(image)

            # Draw the hand annotations on the image.
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            if results.multi_hand_landmarks:
                for hand_landmarks in results.multi_hand_landmarks:
                    drawing.draw_landmarks(
                        image,
                        hand_landmarks,
                        mp_hands.HAND_CONNECTIONS,
                        styles.get_default_hand_landmarks_style(),
                        styles.get_default_hand_connections_style())
                    # Landmark coordinates are squared and scaled into four
                    # frequencies, two from x and two from y.
                    frequency1 = (hand_landmarks.landmark[0].x**2) *2000
                    frequency2 = (hand_landmarks.landmark[0].x**2) *4000
                    frequency3 = (hand_landmarks.landmark[0].y**2) *2000
                    frequency4 = (hand_landmarks.landmark[0].y**2) *4000
                    note1 = np.sin(frequency1 * t * np.pi)
                    note2 = np.cos(frequency2 * t * np.pi)
                    note3 = np.sin(frequency3 * t * np.pi)
                    note4 = np.cos(frequency4 * t * np.pi)
                    note = note1 + note2 + note3 + note4
                    # Normalise to the full signed 16-bit range.
                    audio = note * (2**15 - 1) / np.max(np.abs(note))
                    audio = audio.astype(np.int16)
                    # 1 channel, 2 bytes per sample; playback does not block.
                    play_obj = sa.play_buffer(audio, 1, 2, fs)

            # Flip the image horizontally for a selfie-view display.
            # cv2_imshow takes only the image, no window title.
            cv2_imshow(cv2.flip(image, 1))
            if cv2.waitKey(5) & 0xFF == ord('q'):
                # Only wait if a tone was actually started.
                if play_obj is not None:
                    play_obj.wait_done()
                break
finally:
    # Release the camera even if processing raised.
    cap.release()