## Detect Yoga Poses: Encoder using MoveNet

**[MoveNet](https://t.co/QpfnVL0YYI?amp=1)** is an ultra fast and accurate model that detects 17 keypoints of a body. The model is offered on [TF Hub](https://tfhub.dev/s?q=movenet) with two variants, known as Lightning and Thunder. Lightning is intended for latency-critical applications, while Thunder is intended for applications that require high accuracy. Both models run faster than real time (30+ FPS) on most modern desktops, laptops, and phones, which proves crucial for live fitness, health, and wellness applications.


<img src="https://github.com/robotengineer123/YogaPoseClassifier/blob/main/StandingSplits/Standing_Splits_100.jpg?raw=true" alt="drawing"/>

This Colab walks you through the details of how to load MoveNet, and run inference on the input image and video below.

Note: check out the [live demo](https://storage.googleapis.com/tfjs-models/demos/pose-detection/index.html?model=movenet) for how the model works!

Next step is visualization

In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import keras
import cv2

# Import matplotlib libraries
from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection
import matplotlib.patches as patches
import pandas as pd
import os

# Some modules to display an animation using imageio.
from IPython.display import HTML, display

## Helper functions

In [2]:
model_name = "movenet_lightning" 

if "tflite" in model_name:
  if "movenet_lightning_f16" in model_name:
    !wget -q -O model.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/lightning/tflite/float16/4?lite-format=tflite
    input_size = 192
  elif "movenet_thunder_f16" in model_name:
    !wget -q -O model.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/thunder/tflite/float16/4?lite-format=tflite
    input_size = 256
  elif "movenet_lightning_int8" in model_name:
    !wget -q -O model.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/lightning/tflite/int8/4?lite-format=tflite
    input_size = 192
  elif "movenet_thunder_int8" in model_name:
    !wget -q -O model.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/thunder/tflite/int8/4?lite-format=tflite
    input_size = 256
  else:
    raise ValueError("Unsupported model name: %s" % model_name)

  # Initialize the TFLite interpreter
  interpreter = tf.lite.Interpreter(model_path="model.tflite")
  interpreter.allocate_tensors()

  def movenet(input_image):
    """Runs detection on an input image.

    Args:
      input_image: A [1, height, width, 3] tensor represents the input image
        pixels. Note that the height/width should already be resized and match the
        expected input resolution of the model before passing into this function.

    Returns:
      A [1, 1, 17, 3] float numpy array representing the predicted keypoint
      coordinates and scores.
    """
    # TF Lite format expects tensor type of uint8.
    input_image = tf.cast(input_image, dtype=tf.uint8)
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    interpreter.set_tensor(input_details[0]['index'], input_image.numpy())
    # Invoke inference.
    interpreter.invoke()
    # Get the model prediction.
    keypoints_with_scores = interpreter.get_tensor(output_details[0]['index'])
    return keypoints_with_scores

else:
  if "movenet_lightning" in model_name:
    module = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
    input_size = 192
  elif "movenet_thunder" in model_name:
    module = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
    input_size = 256
  else:
    raise ValueError("Unsupported model name: %s" % model_name)

  def movenet(input_image):
    """Runs detection on an input image.

    Args:
      input_image: A [1, height, width, 3] tensor represents the input image
        pixels. Note that the height/width should already be resized and match the
        expected input resolution of the model before passing into this function.

    Returns:
      A [1, 1, 17, 3] float numpy array representing the predicted keypoint
      coordinates and scores.
    """
    model = module.signatures['serving_default']
    #print("model: ", model)

    # SavedModel format expects tensor type of int32.
    input_image = tf.cast(input_image, dtype=tf.int32)
    # Run model inference.
    outputs = model(input_image)
    #print("outputs: ", outputs)
    # Output is a [1, 1, 17, 3] tensor.
    keypoints_with_scores = outputs['output_0'].numpy()
    #print("Keypoints: ", keypoints_with_scores)
    return keypoints_with_scores

## Test encoder on own dataset

Get data from github

In [3]:
def remove_corrupt():
    dpath = "dataset"
    for folder in os.listdir(dpath):
        for f in os.listdir(dpath+"/"+folder):
            image_path = dpath+"/"+folder+"/"+f
            try: 
                image = tf.io.read_file(image_path)
                image = tf.image.decode_jpeg(image, channels=3)
            except:
                os.remove(image_path)
                print("removed file: "+image_path)
# remove_corrupt()

In [4]:
train_ds = tf.keras.utils.image_dataset_from_directory(
  "dataset",
  validation_split=0.8,
  subset="training",
  seed=123,
  batch_size=1,
  label_mode='categorical')
val_ds = tf.keras.utils.image_dataset_from_directory(
  "dataset",
  validation_split=0.2,
  subset="validation",
  seed=123,
  batch_size=1,
  label_mode='categorical')
data_augmentation = tf.keras.Sequential([
        tf.keras.layers.RandomFlip(mode='horizontal'), # Horizontally flip the given PIL Image randomly with a given probability. 
        tf.keras.layers.RandomContrast(0.1), # Randomly change the brightness, contrast and saturation of an image
        tf.keras.layers.RandomRotation(0.1), # Rotate the image by angle.
        tf.keras.layers.Rescaling(input_size, input_size)
        ])
class_names =train_ds.class_names
train_ds = train_ds.map(lambda x, y: (data_augmentation(x), y))

Found 2382 files belonging to 9 classes.
Using 477 files for training.
Found 2382 files belonging to 9 classes.
Using 476 files for validation.
























































Generate dataset

In [5]:
def GenerateFeatureDS(ds):
  keypoints_with_scores = []
  y = []

  for i, (image, true_class) in zip(range(len(ds)), ds.as_numpy_iterator()): 
    # Resize and pad the image to keep the aspect ratio and fit the expected size.
    input_image = tf.image.resize_with_pad(image, input_size, input_size)

    # Run model inference.
    keypoints_with_scores.append(movenet(input_image).flatten())
    y.append(true_class.flatten())
  return tf.data.Dataset.from_tensor_slices((keypoints_with_scores, y))

train_ds = GenerateFeatureDS(train_ds)
val_ds = GenerateFeatureDS(val_ds)

## Classification

In [6]:
def get_center_point(landmarks, left_bodypart, right_bodypart):
  """Calculates the center point of the two given landmarks."""

  left = tf.gather(landmarks, left_bodypart, axis=1)
  right = tf.gather(landmarks, right_bodypart, axis=1)
  center = left * 0.5 + right * 0.5
  return center


def get_pose_size(landmarks, torso_size_multiplier=2.5):
  """Calculates pose size.

  It is the maximum of two values:
    * Torso size multiplied by `torso_size_multiplier`
    * Maximum distance from pose center to any pose landmark
  """
  # Hips center
  hips_center = get_center_point(landmarks, 11, 
                                 12)

  # Shoulders center
  shoulders_center = get_center_point(landmarks, 5,
                                      6)

  # Torso size as the minimum body size
  torso_size = tf.linalg.norm(shoulders_center - hips_center)

  # Pose center
  pose_center_new = get_center_point(landmarks, 11, 
                                     12)
  pose_center_new = tf.expand_dims(pose_center_new, axis=1)
  # Broadcast the pose center to the same size as the landmark vector to
  # perform substraction
  pose_center_new = tf.broadcast_to(pose_center_new,
                                    [tf.size(landmarks) // (17*2), 17, 2])

  # Dist to pose center
  d = tf.gather(landmarks - pose_center_new, 0, axis=0,
                name="dist_to_pose_center")
  # Max dist to pose center
  max_dist = tf.reduce_max(tf.linalg.norm(d, axis=0))

  # Normalize scale
  pose_size = tf.maximum(torso_size * torso_size_multiplier, max_dist)

  return pose_size


def normalize_pose_landmarks(landmarks):
  """Normalizes the landmarks translation by moving the pose center to (0,0) and
  scaling it to a constant pose size.
  """
  # Move landmarks so that the pose center becomes (0,0)
  pose_center = get_center_point(landmarks, 11, 
                                 12)
  pose_center = tf.expand_dims(pose_center, axis=1)
  # Broadcast the pose center to the same size as the landmark vector to perform
  # substraction
  pose_center = tf.broadcast_to(pose_center, 
                                [tf.size(landmarks) // (17*2), 17, 2])
  landmarks = landmarks - pose_center

  # Scale the landmarks to a constant pose size
  pose_size = get_pose_size(landmarks)
  landmarks /= pose_size

  return landmarks


def landmarks_to_embedding(landmarks_and_scores):
  """Converts the input landmarks into a pose embedding."""
  # Reshape the flat input into a matrix with shape=(17, 3)
  reshaped_inputs = keras.layers.Reshape((17, 3))(landmarks_and_scores)

  # Normalize landmarks 2D
  landmarks = normalize_pose_landmarks(reshaped_inputs[:, :, :2])

  # Flatten the normalized landmark coordinates into a vector
  embedding = keras.layers.Flatten()(landmarks)

  return embedding

In [7]:
def landmarks_to_embedding(landmarks_and_scores):
  """Converts the input landmarks into a pose embedding."""
  # Reshape the flat input into a matrix with shape=(17, 3)
  reshaped_inputs = keras.layers.Reshape((17, 3))(landmarks_and_scores)

  # Normalize landmarks 2D
  landmarks = normalize_pose_landmarks(reshaped_inputs[:, :, :2])

  # Flatten the normalized landmark coordinates into a vector
  embedding = keras.layers.Flatten()(reshaped_inputs)

  return embedding

## Basic model

In [8]:
# Define the model 
def basic_Model():
  inputs = tf.keras.Input(shape=(51))
  embedding = landmarks_to_embedding(inputs)

  layer = keras.layers.Dense(128, activation=tf.nn.relu6)(inputs)
  layer = keras.layers.Dense(64, activation=tf.nn.relu6)(layer)
  outputs = keras.layers.Dense(len(class_names), activation="softmax")(layer)

  model = keras.Model(inputs, outputs)
  model.compile(
      optimizer='adam',
      loss='categorical_crossentropy',
      metrics=['accuracy']
  )
  return model
#model.summary()

## basic model with dropout

In [9]:
# Define the model 
def DO_Model(DOfrac):
  inputs = tf.keras.Input(shape=(51))
  embedding = landmarks_to_embedding(inputs)

  layer = keras.layers.Dense(128, activation=tf.nn.relu6)(embedding)
  layer = keras.layers.Dropout(DOfrac)(layer)
  layer = keras.layers.Dense(64, activation=tf.nn.relu6)(layer)
  layer = keras.layers.Dropout(DOfrac)(layer)
  outputs = keras.layers.Dense(len(class_names), activation="softmax")(layer)

  model = keras.Model(inputs, outputs)
  model.compile(
      optimizer='adam',
      loss='categorical_crossentropy',
      metrics=['accuracy']
  )
  return model
#model.summary()

## Basic model with batch normalization

In [10]:
def BN_Model():
  inputs = tf.keras.Input(shape=(51))
  embedding = landmarks_to_embedding(inputs)

  layer = keras.layers.Dense(128)(embedding)
  layer = tf.keras.layers.BatchNormalization()(layer)
  layer = keras.layers.Activation(activation=tf.nn.relu6)(layer)
  layer = keras.layers.Dense(64)(layer)
  layer = tf.keras.layers.BatchNormalization()(layer)
  layer = keras.layers.Activation(activation=tf.nn.relu6)(layer)
  layer = keras.layers.Dense(len(class_names))(layer)
  layer = tf.keras.layers.BatchNormalization()(layer)
  outputs = keras.layers.Activation(activation="softmax")(layer)


  model = keras.Model(inputs, outputs)

  model.compile(
      optimizer='adam',
      loss='categorical_crossentropy',
      metrics=['accuracy']
  )
  return model


In [12]:
# Visualize the training history to see whether you're overfitting.
def plot_models(histories, model_names):

  fig, axs = plt.subplots(1,2, figsize=(24,8))
  axs[0].set_title('Model accuracy')
  axs[0].set_ylabel('accuracy')
  axs[0].set_xlabel('epoch')
  axs[1].set_title('Model loss')
  axs[1].set_ylabel('loss')
  axs[1].set_xlabel('epoch')
  for history, name in zip(histories,model_names):

    #axs[0].plot(history.history['accuracy'], label='TRAIN '+name)
    axs[0].plot(history.history['val_accuracy'], label='VAL '+name)
    axs[0].legend(loc='lower right')

    #axs[1].plot(history.history['loss'], label='TRAIN '+name)
    axs[1].plot(history.history['val_loss'], label='VAL '+name)
    axs[1].legend(loc='upper right')

  plt.show()

def train(model):
  # Add a checkpoint callback to store the checkpoint that has the highest
  # validation accuracy.
  '''   checkpoint_path = "weights.best.hdf5"
    checkpoint = keras.callbacks.ModelCheckpoint(checkpoint_path,
                                monitor='val_accuracy',
                                verbose=1,
                                save_best_only=True,
                                mode='max') '''
  earlystopping = keras.callbacks.EarlyStopping(monitor='val_accuracy', 
                                                patience=20)

  # Start training
  history = model.fit(train_ds.batch(16),
                      epochs=200,
                      validation_data=val_ds.batch(16),
                      verbose=0,
                      callbacks=[earlystopping])
  return history

In [13]:
bn_model = BN_Model()
do_models = [DO_Model(0.1), DO_Model(0.3), DO_Model(0.5)]
do_labels = ["Dropout=0.1", "Dropout=0.3", "Dropout=0.5"]
basic_model = basic_Model()

In [14]:
models = {"models": [basic_model, bn_model, *do_models],
          "labels": ["Basic", "BatchNormalization", *do_labels]}

In [15]:
bn_hist = train(bn_model)
do_hists = [train(x) for x in do_models]
basic_hist = train(basic_model)

































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































In [16]:
plot_models((bn_hist, basic_hist), ("BatchNormalization", "basic"))

: 

: 

In [None]:
plot_models((*do_hists, basic_hist), (*do_labels, "Basic"))

### Test data accuracy

In [None]:
loss = []
accuracy = []
for model in models['models']:  
  l, a = model.evaluate(test_x, test_y, verbose=0)
  loss.append(round(l,5))
  accuracy.append(round(a,5))

pd.DataFrame({"Loss": loss, "Accuracy": accuracy}, models["labels"])

## Visualization

In [None]:
# Visualize the training history to see whether you're overfitting.
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['TRAIN', 'VAL'], loc='lower right')
plt.show()

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['TRAIN', 'VAL'], loc='lower right')
plt.show()

In [None]:
# Evaluate the model using the TEST dataset
loss, accuracy = model.evaluate(test_x, test_y)

In [None]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import itertools

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
  """Plots the confusion matrix."""
  if normalize:
    cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    print("Normalized confusion matrix")
  else:
    print('Confusion matrix, without normalization')

  plt.imshow(cm, interpolation='nearest', cmap=cmap)
  plt.title(title)
  plt.colorbar()
  tick_marks = np.arange(len(classes))
  plt.xticks(tick_marks, classes, rotation=55)
  plt.yticks(tick_marks, classes)
  fmt = '.2f' if normalize else 'd'
  thresh = cm.max() / 2.
  for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    plt.text(j, i, format(cm[i, j], fmt),
              horizontalalignment="center",
              color="white" if cm[i, j] > thresh else "black")

  plt.ylabel('True label')
  plt.xlabel('Predicted label')
  plt.tight_layout()

# Classify pose in the TEST dataset using the trained model
y_pred = model.predict(test_x)

# Convert the prediction result to class name
y_pred_label = [class_names[i] for i in np.argmax(y_pred, axis=1)]
y_true_label = [class_names[i] for i in np.argmax(test_y, axis=1)]
# Plot the confusion matrix
cm = confusion_matrix(np.argmax(test_y, axis=1), np.argmax(y_pred, axis=1))
plot_confusion_matrix(cm,
                      class_names,
                      title ='Confusion Matrix of Pose Classification Model')

# Print the classification report
print('\nClassification Report:\n', classification_report(y_true_label,
                                                          y_pred_label))

In [None]:
IMAGE_PER_ROW = 3
MAX_NO_OF_IMAGE_TO_PLOT = 10

# Extract the list of incorrectly predicted poses
false_predict = [id_in_df for id_in_df in range(len(test_y)) \
                if y_pred_label[id_in_df] != y_true_label[id_in_df]]
predicted = [id_in_df for id_in_df in range(len(test_y)) \
                if y_pred_label[id_in_df] == y_true_label[id_in_df]]
print("False pred: ", len(false_predict))
print("Pred: ", len(predicted))
if len(false_predict) > MAX_NO_OF_IMAGE_TO_PLOT:
  false_predict = false_predict[:MAX_NO_OF_IMAGE_TO_PLOT]

# Plot the incorrectly predicted images
row_count = len(false_predict) // IMAGE_PER_ROW + 1
fig = plt.figure(figsize=(10 * IMAGE_PER_ROW, 10 * row_count))
for i, id_in_df in enumerate(false_predict):
  ax = fig.add_subplot(row_count, IMAGE_PER_ROW, i + 1)
  image_path = os.path.join('/boot/YogaPoseClassifier-main/dataset/Test',
                            test_df.iloc[id_in_df]['file'])

  image = cv2.imread(image_path)
  plt.title("Predict: %s; Actual: %s"
            % (y_pred_label[id_in_df], y_true_label[id_in_df]))
  plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.show()