### Importing Libraries

In [25]:
#loading libraries
import numpy as np
import matplotlib.pyplot as plt
import random, string, argparse, os
from pathlib import Path
import pandas as pd
import pickle
import math
import warnings
warnings.filterwarnings('error')
from sklearn.exceptions import InconsistentVersionWarning
import sklearn

import torch
import torchvision.transforms.functional as F
from torchvision.io.video import read_video
from torchvision.utils import draw_keypoints
from torchvision.models.detection import keypointrcnn_resnet50_fpn, KeypointRCNN_ResNet50_FPN_Weights
from torchvision.utils import make_grid
from torchvision.utils import save_image
from torchvision.io import read_image
from itertools import combinations

### Defining important constants and functions


In [4]:
# Three-element NaN tensor; it is written over keypoint entries that could not
# be detected so they can later be dropped as missing values.
nan_values = torch.tensor([float('nan')] * 3)

# Keypoint names, in the order used to label the columns of the keypoint table.
coco_keypoints = [
    "nose",
    "left_eye", "right_eye",
    "left_ear", "right_ear",
    "left_shoulder", "right_shoulder",
    "left_elbow", "right_elbow",
    "left_wrist", "right_wrist",
    "left_hip", "right_hip",
    "left_knee", "right_knee",
    "left_ankle", "right_ankle",
]

# Column labels: "<keypoint>_x" followed by "<keypoint>_y" for every keypoint.
cols = [f"{kp}_{axis}" for kp in coco_keypoints for axis in ("x", "y")]

# Only the arm joints are used for feature engineering.
keypoints_of_interest = [
    "left_shoulder", "right_shoulder",
    "left_elbow", "right_elbow",
    "left_wrist", "right_wrist",
]

# Each arm segment is described by its [start joint, end joint].
connections_of_interest = {
    "left_upper_arm": ["left_shoulder", "left_elbow"],
    "left_lower_arm": ["left_elbow", "left_wrist"],
    "right_upper_arm": ["right_shoulder", "right_elbow"],
    "right_lower_arm": ["right_elbow", "right_wrist"],
}

# x/y column labels of the joints of interest, in the same joint order as above.
data_attr_of_interest = []
for kp in keypoints_of_interest:
    data_attr_of_interest.extend([kp + "_x", kp + "_y"])

# Every unordered pair of arm segments (used for pairwise angle differences).
connection_combinations = list(combinations(list(connections_of_interest), 2))

def dot(vA, vB):
    """Return the dot product of the first two components of ``vA`` and ``vB``."""
    x_term = vA[0] * vB[0]
    y_term = vA[1] * vB[1]
    return x_term + y_term

def make_short_form(name):
  """Abbreviate an underscore-separated name to its upper-cased initials."""
  initials = ""
  for word in name.split("_"):
    initials += word[0].upper()
  return initials

def to_ij_vector(coordinates):
  """Turn a (start_point, end_point) pair into the [dx, dy] vector from start to end.

  NOTE(review): dx is rounded to 4 decimals but dy only to 2. This looks
  unintentional, but it is kept as-is because any model trained on these
  features saw exactly this rounding -- confirm before changing.
  """
  start, end = coordinates[0], coordinates[1]
  delta_x = round(end[0] - start[0], 4)
  delta_y = round(end[1] - start[1], 2)
  return [delta_x, delta_y]

def ang(vA, vB):
    """Return the unsigned angle, in degrees within [0, 180], between two 2-D vectors.

    Parameters
    ----------
    vA, vB : indexable with at least two numeric components (x, y).

    Returns
    -------
    float
        The angle in degrees. ``0`` is returned (and a message printed) when the
        angle is undefined: either vector has zero length, or the components
        cannot be read as numbers. NaN components yield NaN.
    """
    try:
        ax, ay = float(vA[0]), float(vA[1])
        bx, by = float(vB[0]), float(vB[1])
    except (TypeError, ValueError, IndexError) as e:
        # Best-effort contract: report the malformed input and fall back to 0.
        print(e, ":", vA, vB)
        return 0

    # Get magnitudes
    magA = (ax * ax + ay * ay) ** 0.5
    magB = (bx * bx + by * by) ** 0.5
    if magA == 0 or magB == 0:
        # The angle to a zero-length vector is undefined; keep the fallback of 0.
        print("zero-length vector, angle undefined", ":", vA, vB)
        return 0

    # Get cosine value
    cos_ = (ax * bx + ay * by) / magA / magB

    # Floating-point rounding can push the cosine of (anti-)parallel vectors just
    # outside [-1, 1], which makes math.acos raise "math domain error".
    # Explicit comparisons (rather than min/max) let NaN pass through untouched.
    if cos_ > 1:
        cos_ = 1.0
    elif cos_ < -1:
        cos_ = -1.0

    # acos yields [0, pi], so the result is already the smaller of the two
    # angles between the vectors; no further wrap-around is needed.
    return math.degrees(math.acos(cos_))

# Turn the per-frame keypoint table of one video into a single feature vector
def get_features(df: pd.DataFrame) -> pd.Series:
  """Summarise the arm keypoints of a video into one row of engineered features.

  Steps performed:
    1. Keep only the ``data_attr_of_interest`` columns and drop every frame
       (row) that has a NaN in any of them.
    2. Build an (x, y) tuple per joint, pair the joints into arm segments
       (``connections_of_interest``) and convert each segment to a vector
       with ``to_ij_vector``.
    3. Compute per-frame features: the angle of each segment to the reference
       vector ``[0, -1]``, the left/right elbow-angle ratio, the wrist-height
       ratio, the absolute angle difference for every pair of segments, and
       the wrist-to-shoulder vertical-offset ratio.
    4. Reduce every per-frame feature column to its min, max, mean, median
       and variance over the frames.

  Parameters:
    df: frame-by-frame keypoint table; it must contain the columns listed in
        ``data_attr_of_interest``.

  Returns:
    A Series indexed by ``"<feature>_<statistic>"``, sorted by index label.

  NOTE(review): the three ratio features divide by the larger of two values;
  if that value is 0 the division fails (or warns, depending on the numeric
  type) -- confirm how such frames should be handled.
  """
  data = df[data_attr_of_interest].dropna()

  # One (x, y) tuple column per joint
  for val in keypoints_of_interest:
    data[val] = list(zip(data[f"{val}_x"], data[f"{val}_y"]))

  # One (start_point, end_point) column per arm segment
  for connection, (keypoint1, keypoint2) in connections_of_interest.items():
    data[connection] = list(zip(data[keypoint1], data[keypoint2]))

  # Converting to i j vectors
  for connection in connections_of_interest:
    data[connection] = data[connection].apply(to_ij_vector)

  # Connection vector angles, measured against a fixed reference direction.
  # NOTE(review): the original comment calls [0, -1] "downward"; in image
  # coordinates y commonly grows downward, in which case this points up -- confirm.
  rest_pos_vector = [0,-1] # -j unit vector

  for connection in connections_of_interest:
    data[connection + "_angle"] = data[connection].apply(ang, vB=rest_pos_vector)

  # Feature - ratio (in percent) of the smaller to the larger of the two elbow angles,
  # where an elbow angle is the angle between the upper-arm and lower-arm vectors
  ratios = []

  for idx, row in data.iterrows():
    left_arm_angle = ang(row["left_upper_arm"], row["left_lower_arm"])
    right_arm_angle = ang(row["right_upper_arm"], row["right_lower_arm"])
    ratios.append(round(min([left_arm_angle, right_arm_angle])*100/max([left_arm_angle, right_arm_angle]),4))

  data["arm_angle_ratio"] = ratios

  # Ratio (in percent) of the smaller to the larger wrist y coordinate
  ratios = []

  for idx, row in data.iterrows():
    min_y = min([row["left_wrist_y"], row["right_wrist_y"]])
    max_y = max([row["left_wrist_y"], row["right_wrist_y"]])
    ratios.append(round(min_y*100/max_y,4))

  data["wrist_y_ratio"] = ratios

  # Absolute difference of angle between each connection combination;
  # the column is named from the initials of both segments, e.g. "LUA_LLA"
  for connection1, connection2 in connection_combinations:
    data[f"{make_short_form(connection1)}_{make_short_form(connection2)}"] = (data[connection1 + "_angle"] - data[connection2 + "_angle"]).abs()

  # Ratio (in percent) of the smaller to the larger wrist-to-shoulder vertical offset
  ratios = []

  for idx, row in data.iterrows():
    left = abs(row["left_wrist_y"] - row["left_shoulder_y"])
    right = abs(row["right_wrist_y"] - row["right_shoulder_y"])
    ratios.append(round(min([left,right])*100/max([left,right]),4))

  data["wrist_shoulder_y_ratio"] = ratios

  # Start summarizing: only the columns from "left_upper_arm_angle" onwards are
  # summarised, so this relies on the order in which columns were added above
  starting_col_idx = data.columns.tolist().index("left_upper_arm_angle")

  features_video = {}

  # .tolist() presumably converts the numpy scalar results to plain Python numbers
  for col in data.columns[starting_col_idx:]:
    features_video[f"{col}_min"] = data[col].min().tolist()
    features_video[f"{col}_max"] = data[col].max().tolist()
    features_video[f"{col}_mean"] = data[col].mean().tolist()
    features_video[f"{col}_median"] = data[col].median().tolist()
    features_video[f"{col}_var"] = data[col].var().tolist()

  # Sorting by label makes the feature order independent of insertion order
  return pd.Series(features_video).sort_index()

### Loading the Keypoint Detection model

In [5]:
# Load the pretrained Keypoint R-CNN detector together with the preprocessing
# transform that belongs to the same set of weights.
print("===> Loading the model")
weights = KeypointRCNN_ResNet50_FPN_Weights.DEFAULT
transforms = weights.transforms()

# eval() switches the network to inference mode and returns the module itself,
# so it can be chained directly onto the constructor call.
model = keypointrcnn_resnet50_fpn(weights=weights, progress=False).eval()
print("===> The keypoint detector model loaded!")

===> Loading the model
===> The keypoint detector model loaded!


### Loading the video

***** Please provide the path to the video *****

In [6]:
# Path of the video to classify.
# NOTE(review): this is an absolute path on one machine; point it at your own copy of the video.
path_to_video = "/Users/amankumar/Work/ECE_699/dataset_new/train/hand_waving/Wave_20231211_part_1.mp4"

# Sampling interval: every `step_between_frames`-th decoded frame is kept for
# keypoint detection (it is used as the step of the frame-index range).
step_between_frames = 3

In [7]:
print("===> Loading the video")
# pts_unit="sec" is the unit torchvision recommends; the default ("pts") emits a
# UserWarning, which the global warnings.filterwarnings('error') set in the
# imports cell would turn into an exception on a fresh Restart & Run All.
# The whole clip is read either way, so the decoded frames are unchanged.
# Returns (video frames as T x C x H x W, audio frames, metadata dict).
frames, t1, t2 = read_video(path_to_video, pts_unit="sec", output_format="TCHW")
print("Video metadata: ", t2)
print("Total number of frames extracted: ", len(frames))

# Fail early with a clear message instead of deep inside feature extraction
if len(frames) == 0:
    raise ValueError("No frames could be decoded from the video: {}".format(path_to_video))

# Fetching selected frames: keep every `step_between_frames`-th frame
gesture_int = [frames[x] for x in range(0, len(frames), step_between_frames)]
print("Total number of frames selected: ", len(gesture_int))

===> Loading the video




Video metadata:  {'video_fps': 30.0, 'audio_fps': 44100}
Total number of frames extracted:  150
Total number of frames selected:  50


### Detecting keypoints and saving the keypoints into a dataframe

In [8]:
# Minimum detection score for a detected instance to be kept
detect_threshold = 0.75
print("===> Preprocessing the frames for the model")
gestures_float = [transforms(img) for img in gesture_int]

# Detecting the keypoints in the frames
print("===> Processing frames to detect keypoints")
# Inference only: no_grad() stops autograd from recording a graph for every frame.
with torch.no_grad():
    # Frames are passed to the model one at a time, as a single-image batch
    outputs = [model([gesture])[0] for gesture in gestures_float]
print("===> The video is processed")

# Keeping the keypoints of those objects from the video whose confidence score was above threshold
for output in outputs:
    confident = output['scores'] > detect_threshold
    if not confident.any():
        # Covers both "nothing above the threshold" and "no detections at all"
        # (an empty tensor cannot be indexed with [0]). The frame gets a single
        # all-NaN instance so that it becomes an all-NaN row of the table.
        print("====> No person detected. Skip and Go ahead! <====")
        output['keypoints'] = output['keypoints'].new_full(
            (1, len(coco_keypoints), 3), float('nan')
        )
    else:
        output['keypoints'] = output['keypoints'][confident]
        output['keypoints_scores'] = output['keypoints_scores'][confident]
        # Also, filling nan values for keypoints whose confidence score is negative (=> couldn't find that joint)
        missing_joints = output['keypoints_scores'][0] < 0
        output['keypoints'][0][missing_joints] = nan_values

# formatting the data so that it can be saved as DataFrame csv:
# one row per frame, holding the (x, y) of every keypoint of the first kept instance
data_csv = [output['keypoints'][0][:, 0:2].detach().numpy().flatten() for output in outputs]

keypoints = pd.DataFrame(data=data_csv, columns=cols)

===> Preprocessing the frames for the model
===> Processing frames to detect keypoints
===> The video is processed


### Performing feature engineering on the keypoints

In [12]:
# Collapse the per-frame keypoint table into one feature vector (a pandas Series) for this video
test_x = get_features(keypoints)

### Loading the machine learning model

***** Please provide the path to the model *****

In [28]:
# Location of model
model_file_path = "/Users/amankumar/Work/ECE_699/gesture-recognition/trained_models/feature_engineering_model.sklearn-1-2-2.pickle"

# Load the saved model pipeline.
# SECURITY: pickle.load can execute arbitrary code -- only load model files you trust.
#
# Version-mismatch warnings are recorded instead of raised: if the warning were
# raised as an error, pickle.load would be aborted and `loaded_pipeline` would
# never be defined, which breaks the prediction cell on a fresh kernel.
with warnings.catch_warnings(record=True) as load_warnings:
  warnings.simplefilter("always", category=InconsistentVersionWarning)
  with open(model_file_path, 'rb') as pickle_file:
    loaded_pipeline = pickle.load(pickle_file)

version_mismatch = False
for caught in load_warnings:
  if issubclass(caught.category, InconsistentVersionWarning):
    version_mismatch = True
  else:
    # Unrelated warnings were only captured as a side effect; hand them back
    # to the normal warning machinery.
    warnings.warn_explicit(caught.message, caught.category, caught.filename, caught.lineno)

if version_mismatch:
  print("The model was pickled in different scikit-learn version than your current scikit-learn version of {}. \nDue to difference in versions, there might be some inconsistencies while unpickling the model. \nTo make sure that the model behaves as expected, try using the compatible version scikit-learn version mentioned in requirements.txt".format(sklearn.__version__))

The model was pickled in different scikit-learn version than your current scikit-learn version of 1.3.2. 
Due to difference in versions, there might be some inconsistencies while unpickling the model. 
To make sure that the model behaves as expected, try using the compatible version scikit-learn version mentioned in requirements.txt


### Getting the prediction

In [15]:
# Predict the gesture for this video; the single feature vector is wrapped in a
# list so that the pipeline receives a one-sample batch
prediction = loaded_pipeline.predict([test_x])

In [16]:
# `prediction` holds one entry per input sample, so a single video prints as a one-element array
print("Predicted Gesture: ", prediction)

Predicted Gesture:  ['hand_waving']
