<a href="https://colab.research.google.com/github/talhaahussain/grappling-pose-identification/blob/main/src/Pose_Estimation_for_Grappling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### // **Briefing**

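This is a notebook for identifying grappling positions through computer vision and binary classification: human pose estimation (YOLO-NAS-Pose) extracts the keypoints of two athletes, and a small neural network classifies whether the position is a pin.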

This notebook does not perform any real-time capture or playback. Instead, it reads its input from files and writes its output to files, which is more reliable and less system-intensive. Obtaining images and video and loading them into the environment is **your** responsibility.

### // **Setup**

Run the following cell to install `super-gradients`, the computer vision library that provides the pose estimation model.

In [None]:
%%capture
!pip install super-gradients

**After running the above, please restart your runtime environment.**

This is necessary to avoid an issue with PIL in which `Image.open()` (used by the model to load images) fails and raises `PIL.UnidentifiedImageError`.

To restart your runtime environment, open the "Runtime" menu at the top of the page and click "Restart Session".

The CSV dataset is provided, but the corresponding images are not. The images can be obtained from https://vicos.si/resources/jiujitsu/.
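For reference, the classifier cells below read `pin_dataset.csv`, which has a `pin` label column plus `pose1` and `pose2` columns, each storing one person's pose as a Python-literal list (they are parsed with `ast.literal_eval`). The sketch below shows how one row becomes the 102-value feature vector, assuming each pose is 17 `[x, y, confidence]` triples, which matches the MLP's default `n_features=3*17*2`. The sample row and the helper name `row_to_features` are made up for illustration.

```python
import ast

import numpy as np

N_KEYPOINTS = 17  # keypoints per person (assumed COCO-style layout)
VALUES_PER_KEYPOINT = 3  # x, y, confidence (assumed)


def row_to_features(pose1_str: str, pose2_str: str) -> np.ndarray:
    """Parse two pose strings and flatten them into one feature vector."""
    pose1 = ast.literal_eval(pose1_str)  # list of [x, y, conf]
    pose2 = ast.literal_eval(pose2_str)
    # List concatenation stacks the two people: shape (34, 3) before flattening
    return np.array(pose1 + pose2, dtype=float).flatten()


# Hypothetical row: every keypoint of person 1 at (10, 20), person 2 at (30, 40)
pose1_str = str([[10.0, 20.0, 0.9]] * N_KEYPOINTS)
pose2_str = str([[30.0, 40.0, 0.8]] * N_KEYPOINTS)

features = row_to_features(pose1_str, pose2_str)
print(features.shape)  # (102,)
print(features[:3], features[51:54])  # first keypoint of each person
```

Person 1 occupies indices 0 to 50 and person 2 indices 51 to 101, so the order of `pose1` and `pose2` in the CSV determines which half of the input each athlete fills.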

### // **Imports**

Please run the following cell in order to import all necessary libraries and modules.

In [None]:
import os
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import pathlib
import re
from imutils import paths
import numpy as np
import cv2
from super_gradients.training import models
from super_gradients.common.object_names import Models
import json
import pandas as pd
import ast
from sklearn.preprocessing import normalize
from imblearn.over_sampling import RandomOverSampler
from sklearn.model_selection import train_test_split
from sklearn import metrics
import matplotlib.pyplot as plt



### // **Load Device**

Run the following cell to choose the device used for pose estimation: a CUDA GPU if available, then Apple's MPS backend, otherwise the CPU.

To enable a GPU runtime, click "Connect" in the top right-hand corner and choose "Change Runtime Type".

In [None]:
# If GPU available, use it
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

### // **Setting up Classifier**

Run all of the following cells, in order, to define the MLP class and its associated functions.

In [None]:
def init_data(file="pin_dataset.csv", test_size=0.20, seed=50):
  df = pd.read_csv(file)

  # Labels: 1 = pin, 0 = no pin
  y = df["pin"].to_numpy()
  pose1 = df["pose1"].to_numpy()
  pose2 = df["pose2"].to_numpy()

  # Each pose column stores a list of keypoints as a string: parse both people,
  # concatenate them and flatten into one feature vector per row
  X = []
  for i in range(len(pose1)):
    X.append((np.array(ast.literal_eval(pose1[i]) + ast.literal_eval(pose2[i]))).flatten())

  X = np.array(X)

  # Scale each feature (column) by its maximum absolute value (max-abs scaling)
  X = normalize(X, axis=0, norm='max')

  # Split into train and test (holdout) datasets
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=seed)

  # Balance the classes by randomly oversampling the minority class.
  # Only the training set is resampled, so the test set keeps its real class distribution
  oversample = RandomOverSampler(sampling_strategy='minority', random_state=seed)
  X_train, y_train = oversample.fit_resample(X_train, y_train)

  return X_train, X_test, y_train, y_test
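`normalize(X, axis=0, norm='max')` divides each feature column by its largest absolute value (for non-negative keypoint values, simply the column maximum). That is max-abs scaling rather than min-max scaling: values are not shifted so that each column's minimum becomes 0. The NumPy sketch below contrasts the two, and also mimics what `RandomOverSampler(sampling_strategy='minority')` does by duplicating randomly chosen minority-class rows until both classes are the same size. It is an illustration, not imbalanced-learn's implementation, and the helper names are hypothetical. Balancing is normally applied only to the training split so that the held-out set reflects the real class distribution.

```python
import numpy as np


def max_abs_scale(X: np.ndarray) -> np.ndarray:
    """Divide each column by its max absolute value (like normalize(axis=0, norm='max'))."""
    scale = np.abs(X).max(axis=0)
    scale[scale == 0] = 1.0  # leave all-zero columns unchanged
    return X / scale


def min_max_scale(X: np.ndarray) -> np.ndarray:
    """Shift and scale each column into [0, 1] (true min-max scaling)."""
    lo, hi = X.min(axis=0), X.max(axis=0)
    span = np.where(hi > lo, hi - lo, 1.0)
    return (X - lo) / span


def oversample_minority(X: np.ndarray, y: np.ndarray, seed: int = 50):
    """Duplicate random minority rows (with replacement) until the classes balance."""
    rng = np.random.default_rng(seed)
    classes, counts = np.unique(y, return_counts=True)
    minority = classes[np.argmin(counts)]
    deficit = counts.max() - counts.min()
    minority_idx = np.flatnonzero(y == minority)
    extra = rng.choice(minority_idx, size=deficit, replace=True)
    keep = np.concatenate([np.arange(len(y)), extra])
    return X[keep], y[keep]


X = np.array([[2.0, 10.0], [4.0, 20.0], [8.0, 40.0]])
print(max_abs_scale(X))  # column 0 becomes 0.25, 0.5, 1.0
print(min_max_scale(X))  # column 0 becomes 0.0, 0.333..., 1.0

y = np.array([0, 0, 0, 1])
X4 = np.arange(8, dtype=float).reshape(4, 2)
X_bal, y_bal = oversample_minority(X4, y)
print(np.bincount(y_bal))  # three samples of each class
```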

In [None]:
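class MLP(nn.Module):
  # Input: 2 people x 17 keypoints x (x, y, confidence) = 102 features
  # Output: a single probability that the position is a pin
  def __init__(self, n_features=3*17*2, n_hidden=34, n_classes=1):
    super().__init__()
    self.fc1 = nn.Linear(n_features, n_hidden)
    self.fc2 = nn.Linear(n_hidden, n_hidden)
    self.fc3 = nn.Linear(n_hidden, n_classes)

  def forward(self, x):
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    # Sigmoid squashes the output into (0, 1), as required by BCELoss
    x = torch.sigmoid(self.fc3(x))

    return x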
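With the defaults, the network maps 102 inputs (2 people × 17 keypoints × 3 values) through two hidden layers of 34 units to a single sigmoid output. Each `nn.Linear(n_in, n_out)` holds `n_in × n_out` weights plus `n_out` biases, so the model is very small. The plain-Python sketch below does the arithmetic; the helper `linear_params` is hypothetical, and `sum(p.numel() for p in mlp.parameters())` on the real model should give the same total.

```python
def linear_params(n_in: int, n_out: int) -> int:
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out


n_features, n_hidden, n_classes = 3 * 17 * 2, 34, 1
layers = [(n_features, n_hidden), (n_hidden, n_hidden), (n_hidden, n_classes)]
per_layer = [linear_params(n_in, n_out) for n_in, n_out in layers]
print(per_layer)       # [3502, 1190, 35]
print(sum(per_layer))  # 4727
```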

In [None]:
def train_model(X_train, y_train, mlp, epochs=100, seed=50):
  # Seed PyTorch's RNG (call torch.manual_seed before MLP() as well if you
  # want the weight initialisation to be reproducible)
  torch.manual_seed(seed)

  X_train = torch.FloatTensor(X_train)
  y_train = torch.LongTensor(y_train)

  # Use Binary Cross-Entropy as loss function (expects probabilities and float targets)
  criterion = nn.BCELoss()

  # Use Adam as optimizer
  optimizer = torch.optim.Adam(mlp.parameters(), lr=0.01)
  losses = []
  for i in range(epochs):
    # Forward pass over the whole training set (full-batch training)
    y_pred = mlp(X_train).squeeze(-1)

    # Evaluate loss
    loss = criterion(y_pred, y_train.float())
    losses.append(loss.item())

    if i % 10 == 0:
      print(f"Epoch: {i}, loss: {loss.item():.4f}")

    # Backpropagate and update the weights
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  return epochs, losses
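`nn.BCELoss` expects probabilities (hence the sigmoid at the end of `forward`) and float targets (hence `.float()` on the labels). For predicted probabilities p and labels y it computes the mean of -[y·log(p) + (1 - y)·log(1 - p)]. The NumPy sketch below reproduces that formula on made-up values; PyTorch clamps its log terms to be at least -100, and the `eps` clip here plays a similar role. As an alternative, `nn.BCEWithLogitsLoss` combines the sigmoid and the loss in a more numerically stable way, but it expects raw logits, so `forward` would then have to drop its final sigmoid.

```python
import numpy as np


def bce(p: np.ndarray, y: np.ndarray, eps: float = 1e-12) -> float:
    """Mean binary cross-entropy between probabilities p and 0/1 labels y."""
    p = np.clip(p, eps, 1.0 - eps)  # avoid log(0)
    return float(-np.mean(y * np.log(p) + (1 - y) * np.log(1 - p)))


y = np.array([1.0, 0.0, 1.0, 0.0])
good = np.array([0.9, 0.1, 0.8, 0.2])  # confident and right
bad = np.array([0.1, 0.9, 0.2, 0.8])   # confident and wrong
print(bce(good, y))  # about 0.164
print(bce(bad, y))   # about 1.956
print(bce(np.full(4, 0.5), y))  # log(2), about 0.693: an uninformative guess
```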

In [None]:
def test_model(X_test, y_test, mlp):
  X_test = torch.FloatTensor(X_test)
  y_test = torch.LongTensor(y_test)

  predicted = []
  predicted_class = []
  actual_class = []
  correct = 0

  # Disable gradient tracking: no training happens during evaluation
  with torch.no_grad():
    for i, data in enumerate(X_test):
      y_eval = mlp(data).item()

      # round() applies a 0.5 threshold to the predicted probability
      if round(y_eval) == y_test[i].item():
        correct += 1

      predicted.append(y_eval)
      predicted_class.append(round(y_eval))
      actual_class.append(y_test[i].item())

  # Return the number of test samples as the total
  return correct, len(X_test), predicted, predicted_class, actual_class
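`test_model` turns each sigmoid output into a class with `round()`, which amounts to a 0.5 threshold (Python's `round` sends exactly 0.5 to 0). The sketch below does the same thresholding in NumPy and builds the 2×2 matrix in the layout `sklearn.metrics.confusion_matrix` uses for labels [0, 1]: rows are true classes and columns are predicted classes, so the entries are [[TN, FP], [FN, TP]]. The probabilities are invented, and `confusion_2x2` is a hypothetical helper.

```python
import numpy as np


def confusion_2x2(actual: np.ndarray, predicted: np.ndarray) -> np.ndarray:
    """Rows = true class, columns = predicted class, for labels 0 and 1."""
    cm = np.zeros((2, 2), dtype=int)
    for a, p in zip(actual, predicted):
        cm[a, p] += 1
    return cm


probs = np.array([0.92, 0.40, 0.55, 0.10, 0.70, 0.30])
actual = np.array([1, 1, 0, 0, 1, 0])
predicted = (probs > 0.5).astype(int)  # 0.5 threshold; exactly 0.5 counts as 0

cm = confusion_2x2(actual, predicted)
accuracy = np.trace(cm) / cm.sum()  # correct predictions / number of test samples
print(cm)        # [[TN, FP], [FN, TP]]
print(accuracy)  # 4 correct out of 6
```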


In [None]:
def save_model(model, filename="mlp.pt"):
  torch.save(model.state_dict(), filename)
  print(f"Saved model to {filename}.")


In [None]:
def load_model(filename="mlp.pt"):
  mlp = MLP()
  mlp.load_state_dict(torch.load(filename))
  print(f"Loaded model from {filename}.")
  return mlp

Run the following cell to create an MLP model, train it, test it, display its results and save it.

In [None]:
### USE THIS CELL TO EXECUTE FUNCTIONS FOR THE CLASSIFIER!

X_train, X_test, y_train, y_test = init_data()
mlp = MLP()
epochs, losses = train_model(X_train, y_train, mlp, epochs=1000)

plt.plot(range(epochs), losses)
plt.ylabel("Loss")
plt.xlabel("Epoch")

correct, total, predicted, predicted_class, actual_class = test_model(X_test, y_test, mlp)
print(f"Correct: {correct} out of {total} on unseen data.")
confusion_matrix = metrics.confusion_matrix(actual_class, predicted_class)

cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = [0, 1])

cm_display.plot()
plt.show()

save_model(mlp)
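Accuracy alone can hide how the model performs on each class, so it can help to read per-class figures off the confusion matrix as well. A minimal sketch, assuming the `[[TN, FP], [FN, TP]]` layout that `metrics.confusion_matrix` returns for labels [0, 1]; the matrix values and the helper `pin_metrics` are made up. `metrics.classification_report(actual_class, predicted_class)` prints the same quantities directly.

```python
import numpy as np


def pin_metrics(cm: np.ndarray) -> dict:
    """Precision, recall and F1 for the positive (pin) class from a 2x2 matrix."""
    tn, fp, fn, tp = cm.ravel()
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# Hypothetical matrix: 40 TN, 10 FP, 5 FN, 45 TP
cm = np.array([[40, 10], [5, 45]])
print(pin_metrics(cm))  # precision 45/55, recall 45/50, F1 90/105
```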

### // **Keypoint Detection**

In [None]:
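def predict_position(mlp, poses):
  # The MLP expects exactly two people: 2 x 17 keypoints x 3 values = 102 features.
  # Any other number of detected people is reported as "no pin" (0.0)
  if len(poses) != 102:
    return 0.0
  poses = torch.FloatTensor(poses)
  with torch.no_grad():
    pred = mlp(poses)

  return pred.item()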
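`init_data` scales each feature column by its maximum absolute value, but `predict_position` passes the raw keypoints from `extract_poses` straight to the MLP. If the CSV stores coordinates on the same raw pixel scale as the YOLO-NAS-Pose output (an assumption worth checking against your data), the inference features should be divided by the same per-column factors. A minimal sketch with hypothetical helper names: compute the factors once from the unscaled feature matrix, keep them (for example with `np.save`), and apply them to every new pose vector before wrapping it in `torch.FloatTensor`.

```python
import numpy as np

N_FEATURES = 102  # 2 people x 17 keypoints x (x, y, confidence)


def fit_max_abs(X_raw: np.ndarray) -> np.ndarray:
    """Per-column scale factors, mirroring normalize(axis=0, norm='max')."""
    scale = np.abs(X_raw).max(axis=0)
    scale[scale == 0] = 1.0  # avoid dividing by zero for all-zero columns
    return scale


def prepare_features(landmarks: np.ndarray, scale: np.ndarray):
    """Return the scaled feature vector, or None if not exactly two people."""
    if landmarks.shape != (N_FEATURES,):
        return None
    return landmarks / scale


rng = np.random.default_rng(0)
X_raw = rng.uniform(0, 640, size=(20, N_FEATURES))  # fake unscaled features
scale = fit_max_abs(X_raw)

new_pose = X_raw[3]  # pretend this vector came from extract_poses
print(prepare_features(new_pose, scale).max() <= 1.0)  # True
print(prepare_features(np.zeros(51), scale))  # None: only one person detected
```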


In [None]:
def init_HPE_model(device):
  # Load the large YOLO-NAS-Pose model with COCO-pose pretrained weights onto the chosen device
  yolo_nas_pose = models.get("yolo_nas_pose_l", pretrained_weights="coco_pose").to(device)
  return yolo_nas_pose

In [None]:
def predict_image(HPE_model, input_path, confidence):
  result = HPE_model.predict(input_path, conf=confidence)
  return result

In [None]:
def extract_poses(result):
  # One (17, 3) array of (x, y, confidence) per detected person
  poses = result.prediction.poses
  landmarks = np.array([])
  for pose in poses:
    landmarks = np.concatenate((landmarks, pose.flatten()))

  # landmarks is already 1-D: 17 x 3 values per detected person
  return landmarks
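Because `result.prediction.poses` holds one keypoint array per detected person, the loop in `extract_poses` yields a flat vector of length 17 × 3 × (number of people), and that length is what `predict_position` compares against 102. Assuming `poses` is a NumPy array of shape `(people, 17, 3)`, a single `reshape(-1)` produces the same vector; the sketch below checks this on fake data. Note that the order of the two people follows the detector's output order, which need not match the `pose1`/`pose2` order used in the CSV.

```python
import numpy as np


def extract_poses_loop(poses: np.ndarray) -> np.ndarray:
    """Same logic as the notebook's extract_poses, minus the result wrapper."""
    landmarks = np.array([])
    for pose in poses:
        landmarks = np.concatenate((landmarks, pose.flatten()))
    return landmarks


def extract_poses_reshape(poses: np.ndarray) -> np.ndarray:
    """Vectorized equivalent: flatten (people, 17, 3) in row-major order."""
    return np.asarray(poses, dtype=float).reshape(-1)


fake_poses = np.arange(2 * 17 * 3, dtype=float).reshape(2, 17, 3)
a, b = extract_poses_loop(fake_poses), extract_poses_reshape(fake_poses)
print(a.shape, np.array_equal(a, b))  # (102,) True

empty = np.zeros((0, 17, 3))  # no people detected
print(extract_poses_loop(empty).shape, extract_poses_reshape(empty).shape)  # (0,) (0,)
```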


In [None]:
def draw_keypoints(result, color):
  # color is a one-element list, e.g. [(0, 255, 0)], repeated once per
  # keypoint (17) and once per skeleton edge (19)
  keypoint_colors = color * 17
  edge_colors = color * 19

  image = result.draw(
    edge_colors=edge_colors,
    joint_thickness=5,
    keypoint_colors=keypoint_colors,
    keypoint_radius=10,
    box_thickness=5,
    show_confidence=True,
  )
  return image

In [None]:
def annotate_image(image, text, confidence, color=(255, 0, 0)):
  font = cv2.FONT_HERSHEY_DUPLEX
  org1 = (25, 50)
  org2 = (25, 100)
  fontScale = 1
  thickness = 3
  image = cv2.putText(image, text, org1, font, fontScale,
                 color, thickness, cv2.LINE_AA, False)
  image = cv2.putText(image, str(confidence), org2, font, fontScale,
                 color, thickness, cv2.LINE_AA, False)

  return image

In [None]:
def save_image(filename, image):
  cv2.imwrite(filename, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
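The images returned by `result.draw` are treated here as RGB, while OpenCV's `imwrite` and `VideoWriter` expect BGR. That is why this cell converts with `cv2.COLOR_RGB2BGR`, and why `predict_on_video`, whose frames come from `cv2.VideoCapture` in BGR order and are written back unconverted, reverses its color tuple instead. For a 3-channel image the conversion simply reverses the last axis. The NumPy sketch below shows this without OpenCV; `rgb_to_bgr` is a hypothetical stand-in for `cv2.cvtColor(image, cv2.COLOR_RGB2BGR)`.

```python
import numpy as np


def rgb_to_bgr(image: np.ndarray) -> np.ndarray:
    """Swap the red and blue channels of an (H, W, 3) image by reversing the last axis."""
    # ascontiguousarray copies the reversed view into a normal C-ordered array,
    # which some OpenCV functions require
    return np.ascontiguousarray(image[..., ::-1])


red_rgb = np.zeros((2, 2, 3), dtype=np.uint8)
red_rgb[..., 0] = 255  # in RGB, red is channel 0
red_bgr = rgb_to_bgr(red_rgb)
print(red_bgr[0, 0].tolist())  # [0, 0, 255]: red is now channel 2, as OpenCV expects

# The same reversal applied to a color tuple, as predict_on_video does with color[::-1]
print((255, 0, 0)[::-1])  # (0, 0, 255)
```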

In [None]:
def split_video_into_frames(filename):
  # Read every frame of the video into memory; OpenCV returns frames in BGR order
  frames = []
  cap = cv2.VideoCapture(filename)
  fps = cap.get(cv2.CAP_PROP_FPS)

  success = True
  while success:
    success, frame = cap.read()
    if success:
      frames.append(frame)

  cap.release()
  return frames, fps

In [None]:
def concat_frames_into_video(filename, frames, fps):
  height, width, channels = frames[0].shape

  fourcc = cv2.VideoWriter_fourcc(*'mp4v')
  out = cv2.VideoWriter(filename, fourcc, fps, (width, height))

  for frame in frames:
    out.write(frame)

  out.release()
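Two details in these video helpers are easy to get wrong. NumPy frames have shape `(height, width, channels)`, whereas `cv2.VideoWriter` takes its frame size as `(width, height)`, and frames whose size does not match may be silently skipped. Also, since `predict_on_video` writes one output frame per input frame at the input's FPS, the output should last as long as the input (frames ÷ FPS). A small sketch of both points; the helpers `writer_size` and `duration_seconds` are hypothetical.

```python
import numpy as np


def writer_size(frame: np.ndarray) -> tuple:
    """cv2.VideoWriter expects (width, height); NumPy shape is (height, width, channels)."""
    height, width = frame.shape[:2]
    return (width, height)


def duration_seconds(n_frames: int, fps: float) -> float:
    """Playback length of n_frames written at the given frame rate."""
    return n_frames / fps


frame = np.zeros((720, 1280, 3), dtype=np.uint8)  # a 1280x720 frame
print(writer_size(frame))           # (1280, 720)
print(duration_seconds(300, 30.0))  # 10.0
```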

In [None]:
def prediction_text_color(prediction):
  # Probabilities above 0.5 are reported as a pin; anything else (including exactly 0.5) is not
  if prediction > 0.5:
    color = (0, 255, 0)
    text = "**PIN-DETECTED**"
  else:
    color = (255, 0, 0)
    text = "**NO-PIN-DETECTED**"
  return color, text

In [None]:
def predict_on_video(input_filename, output_filename, mlp, confidence):
  yolo_nas_pose = init_HPE_model(device)
  frames, fps = split_video_into_frames(input_filename)
  final_frames = []
  for frame in frames:
    result = predict_image(yolo_nas_pose, frame, confidence)
    landmarks = extract_poses(result)
    prediction = predict_position(mlp, landmarks)
    color, text = prediction_text_color(prediction)
    color = color[::-1] # Reverse color to correct for BGR-RGB
    image = draw_keypoints(result, [color])
    image = annotate_image(image, text=text, confidence=prediction, color=color)
    final_frames.append(image)

  concat_frames_into_video(output_filename, final_frames, fps)


In [None]:
def predict_on_image(input_filename, output_filename, mlp, confidence):
  yolo_nas_pose = init_HPE_model(device)
  result = predict_image(yolo_nas_pose, input_filename, confidence)
  landmarks = extract_poses(result)
  prediction = predict_position(mlp, landmarks)
  color, text = prediction_text_color(prediction)
  image = draw_keypoints(result, [color])
  image = annotate_image(image, text=text, confidence=prediction, color=color)
  save_image(output_filename, image)

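Edit the variables in the following cell to run the pipeline on your own video. To process a single image instead, call `predict_on_image` with the same arguments.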

In [None]:
### Modify these variables!
input_filename = "example3.mp4"
output_filename = "example3_out.mp4"

mlp = load_model()
# Minimum detection confidence passed to YOLO-NAS-Pose
confidence = 0.3

predict_on_video(input_filename, output_filename, mlp, confidence)

 It is your responsibility to determine whether you have permission to use the models for your use case.
 The model you have requested was pre-trained on the coco_pose dataset, published under the following terms: https://cocodataset.org/#termsofuse
[2024-05-01 09:44:40] INFO - checkpoint_utils.py - License Notification: YOLO-NAS-POSE pre-trained weights are subjected to the specific license terms and conditions detailed in 
https://github.com/Deci-AI/super-gradients/blob/master/LICENSE.YOLONAS-POSE.md
By downloading the pre-trained weight files you agree to comply with these terms.
[2024-05-01 09:44:40] INFO - checkpoint_utils.py - Successfully loaded pretrained weights for architecture yolo_nas_pose_l
[2024-05-01 09:44:40] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`
