In [2]:
import csv
import cv2
import itertools
import numpy as np
import pandas as pd
import os
import sys
import tempfile
import tqdm

from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection

import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import keras

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [3]:
import os
import cv2

def video_to_images(video_path, output_folder, frame_rate=2):
    """
    Convert a video file into a sequence of images.
    Args:
    - video_path (str): Path to the input video file.
    - output_folder (str): Path to the output folder where images will be saved.
    - frame_rate (int): Rate of frames to be extracted per second (default is 2).
    Returns:
    - int: Number of images extracted.
    """
    # Create output folder if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    # Get video name without extension
    video_name = os.path.splitext(os.path.basename(video_path))[0]

    # Open the video file
    video_capture = cv2.VideoCapture(video_path)

    # Get total number of frames in the video
    total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))

    # Set the frame rate
    frame_rate_to_use = frame_rate  # The desired frame rate to use
    frame_interval = int(video_capture.get(cv2.CAP_PROP_FPS)) // frame_rate_to_use

    # Read and save frames
    current_frame = 0
    images_extracted = 0  # Counter for the number of images extracted
    while True:
        # Read the next frame
        ret, frame = video_capture.read()

        if not ret:
            break

        # Save frame only if it satisfies the frame_interval condition
        if current_frame % frame_interval == 0:
            frame_filename = f"{video_name}_frame_{current_frame:04d}.jpg"  # Padding frame number with zeros
            image_path = os.path.join(output_folder, frame_filename)
            cv2.imwrite(image_path, frame)
            images_extracted += 1

        current_frame += 1

    # Release the video capture object
    video_capture.release()

    return images_extracted  # Return the count of images extracted

# Function to process multiple videos in a folder
def process_videos_in_folder(folder_path, output_folder, frame_rate=2):
    """
    Process all videos in a folder and convert them into sequences of images.
    Args:
    - folder_path (str): Path to the folder containing input video files.
    - output_folder (str): Path to the output folder where images will be saved.
    - frame_rate (int): Rate of frames to be extracted per second (default is 2).
    Returns:
    - None
    """
    # Create output folder if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    total_images_extracted = 0  # Counter for the total number of images extracted

    # Iterate over all files in the input folder
    for filename in sorted(os.listdir(folder_path)):
        if filename.endswith((".avi", ".mp4", ".MOV")):  # Add more video formats if needed
            video_path = os.path.join(folder_path, filename)
            images_extracted = video_to_images(video_path, output_folder, frame_rate)
            total_images_extracted += images_extracted

    print("Total images extracted:", total_images_extracted)

# Example usage:
videos_folder = "/content/video"  # Replace with the path to your videos folder
output_folder = "/content/videos_frames"  # Output folder for images
frame_rate = 5  # Extract one frame per second

# Process all videos in the folder
process_videos_in_folder(videos_folder, output_folder, frame_rate)

Total images extracted: 44


In [4]:
# Download model from TF Hub and check out inference code from GitHub
!wget -q -O movenet_thunder.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/thunder/tflite/float16/4?lite-format=tflite
!git clone https://github.com/tensorflow/examples.git
pose_sample_rpi_path = os.path.join(os.getcwd(), 'examples/lite/examples/pose_estimation/raspberry_pi')
sys.path.append(pose_sample_rpi_path)

# Load MoveNet Thunder model
import utils
from data import BodyPart
from ml import Movenet
movenet = Movenet('movenet_thunder')

# Define function to run pose estimation using MoveNet Thunder.
# You'll apply MoveNet's cropping algorithm and run inference multiple times on
# the input image to improve pose estimation accuracy.
def detect(input_tensor, inference_count=3):
  """Runs detection on an input image.

  Args:
    input_tensor: A [height, width, 3] Tensor of type tf.float32.
      Note that height and width can be anything since the image will be
      immediately resized according to the needs of the model within this
      function.
    inference_count: Number of times the model should run repeatly on the
      same input image to improve detection accuracy.

  Returns:
    A Person entity detected by the MoveNet.SinglePose.
  """
  image_height, image_width, channel = input_tensor.shape

  # Detect pose using the full input image
  movenet.detect(input_tensor.numpy(), reset_crop_region=True)

  # Repeatedly using previous detection result to identify the region of
  # interest and only croping that region to improve detection accuracy
  for _ in range(inference_count - 1):
    person = movenet.detect(input_tensor.numpy(),
                            reset_crop_region=False)

  return person

Cloning into 'examples'...
remote: Enumerating objects: 23745, done.[K
remote: Total 23745 (delta 0), reused 0 (delta 0), pack-reused 23745[K
Receiving objects: 100% (23745/23745), 44.08 MiB | 20.89 MiB/s, done.
Resolving deltas: 100% (12942/12942), done.


In [5]:
def draw_prediction_on_image(
    image, person, crop_region=None, close_figure=True,
    keep_input_size=False):
  """Draws the keypoint predictions on image.

  Args:
    image: An numpy array with shape [height, width, channel] representing the
      pixel values of the input image.
    person: A person entity returned from the MoveNet.SinglePose model.
    close_figure: Whether to close the plt figure after the function returns.
    keep_input_size: Whether to keep the size of the input image.

  Returns:
    An numpy array with shape [out_height, out_width, channel] representing the
    image overlaid with keypoint predictions.
  """
  # Draw the detection result on top of the image.
  image_np = utils.visualize(image, [person])

  # Plot the image with detection results.
  height, width, channel = image.shape
  aspect_ratio = float(width) / height
  fig, ax = plt.subplots(figsize=(12 * aspect_ratio, 12))
  im = ax.imshow(image_np)

  if close_figure:
    plt.close(fig)

  if not keep_input_size:
    image_np = utils.keep_aspect_ratio_resizer(image_np, (512, 512))

  return image_np

In [6]:
class MoveNetPreprocessor(object):
  """Helper class to preprocess pose sample images for classification."""

  def __init__(self,
               images_in_folder,
               images_out_folder,
               csvs_out_path):
    """Creates a preprocessor to detection pose from images and save as CSV.

    Args:
      images_in_folder: Path to the folder with the input images. It should
        follow this structure:
        yoga_poses
        |__ downdog
            |______ 00000128.jpg
            |______ 00000181.bmp
            |______ ...
        |__ goddess
            |______ 00000243.jpg
            |______ 00000306.jpg
            |______ ...
        ...
      images_out_folder: Path to write the images overlay with detected
        landmarks. These images are useful when you need to debug accuracy
        issues.
      csvs_out_path: Path to write the CSV containing the detected landmark
        coordinates and label of each image that can be used to train a pose
        classification model.
    """
    self._images_in_folder = images_in_folder
    self._images_out_folder = images_out_folder
    self._csvs_out_path = csvs_out_path
    self._messages = []

    # Create a temp dir to store the pose CSVs per class
    self._csvs_out_folder_per_class = tempfile.mkdtemp()

    # Get list of pose classes and print image statistics
    self._pose_class_names = sorted(
        [n for n in os.listdir(self._images_in_folder) if not n.startswith('.')]
        )

  def process(self, per_pose_class_limit=None, detection_threshold=0.1):
    """Preprocesses images in the given folder.
    Args:
      per_pose_class_limit: Number of images to load. As preprocessing usually
        takes time, this parameter can be specified to make the reduce of the
        dataset for testing.
      detection_threshold: Only keep images with all landmark confidence score
        above this threshold.
    """
    # Loop through the classes and preprocess its images
    for pose_class_name in self._pose_class_names:
      print('Preprocessing', pose_class_name, file=sys.stderr)

      # Paths for the pose class.
      images_in_folder = os.path.join(self._images_in_folder, pose_class_name)
      images_out_folder = os.path.join(self._images_out_folder, pose_class_name)
      csv_out_path = os.path.join(self._csvs_out_folder_per_class,
                                  pose_class_name + '.csv')
      if not os.path.exists(images_out_folder):
        os.makedirs(images_out_folder)

      # Detect landmarks in each image and write it to a CSV file
      with open(csv_out_path, 'w') as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file,
                                    delimiter=',',
                                    quoting=csv.QUOTE_MINIMAL)
        # Get list of images
        image_names = sorted(
            [n for n in os.listdir(images_in_folder) if not n.startswith('.')])
        if per_pose_class_limit is not None:
          image_names = image_names[:per_pose_class_limit]

        valid_image_count = 0

        # Detect pose landmarks from each image
        for image_name in tqdm.tqdm(image_names):
          image_path = os.path.join(images_in_folder, image_name)

          try:
            image = tf.io.read_file(image_path)
            image = tf.io.decode_jpeg(image)
          except:
            self._messages.append('Skipped ' + image_path + '. Invalid image.')
            continue
          else:
            image = tf.io.read_file(image_path)
            image = tf.io.decode_jpeg(image)
            image_height, image_width, channel = image.shape

          # Skip images that isn't RGB because Movenet requires RGB images
          if channel != 3:
            self._messages.append('Skipped ' + image_path +
                                  '. Image isn\'t in RGB format.')
            continue
          person = detect(image)

          # Save landmarks if all landmarks were detected
          min_landmark_score = min(
              [keypoint.score for keypoint in person.keypoints])
          should_keep_image = min_landmark_score >= detection_threshold
          if not should_keep_image:
            self._messages.append('Skipped ' + image_path +
                                  '. No pose was confidentlly detected.')
            continue

          valid_image_count += 1

          # Draw the prediction result on top of the image for debugging later
          output_overlay = draw_prediction_on_image(
              image.numpy().astype(np.uint8), person,
              close_figure=True, keep_input_size=True)

          # Write detection result into an image file
          output_frame = cv2.cvtColor(output_overlay, cv2.COLOR_RGB2BGR)
          cv2.imwrite(os.path.join(images_out_folder, image_name), output_frame)

          # Get landmarks and scale it to the same size as the input image
          pose_landmarks = np.array(
              [[keypoint.coordinate.x, keypoint.coordinate.y, keypoint.score]
                for keypoint in person.keypoints],
              dtype=np.float32)

          # Write the landmark coordinates to its per-class CSV file
          coordinates = pose_landmarks.flatten().astype(str).tolist()
          csv_out_writer.writerow([image_name] + coordinates)

        if not valid_image_count:
          raise RuntimeError(
              'No valid images found for the "{}" class.'
              .format(pose_class_name))

    # Print the error message collected during preprocessing.
    print('\n'.join(self._messages))

    # Combine all per-class CSVs into a single output file
    all_landmarks_df = self._all_landmarks_as_dataframe()
    all_landmarks_df.to_csv(self._csvs_out_path, index=False)

  def class_names(self):
    """List of classes found in the training dataset."""
    return self._pose_class_names

  def _all_landmarks_as_dataframe(self):
    """Merge all per-class CSVs into a single dataframe."""
    total_df = None
    for class_index, class_name in enumerate(self._pose_class_names):
      csv_out_path = os.path.join(self._csvs_out_folder_per_class,
                                  class_name + '.csv')
      per_class_df = pd.read_csv(csv_out_path, header=None)

      # Add the labels
      per_class_df['class_no'] = [class_index]*len(per_class_df)
      per_class_df['class_name'] = [class_name]*len(per_class_df)

      # Append the folder name to the filename column (first column)
      per_class_df[per_class_df.columns[0]] = (os.path.join(class_name, '')
        + per_class_df[per_class_df.columns[0]].astype(str))

      if total_df is None:
        # For the first class, assign its data to the total dataframe
        total_df = per_class_df
      else:
        # Concatenate each class's data into the total dataframe
        total_df = pd.concat([total_df, per_class_df], axis=0)

    list_name = [[bodypart.name + '_x', bodypart.name + '_y',
                  bodypart.name + '_score'] for bodypart in BodyPart]
    header_name = []
    for columns_name in list_name:
      header_name += columns_name
    header_name = ['file_name'] + header_name
    header_map = {total_df.columns[i]: header_name[i]
                  for i in range(len(header_name))}

    total_df.rename(header_map, axis=1, inplace=True)

    return total_df

In [48]:
import shutil
shutil.rmtree('/content/poses_images_Run')

In [7]:
  images_in_test_folder = ("/content/Run")
  images_out_test_folder = 'poses_images_Run'
  csvs_out_test_path = 'run_data.csv'

  preprocessor = MoveNetPreprocessor(
      images_in_folder=images_in_test_folder,
      images_out_folder=images_out_test_folder,
      csvs_out_path=csvs_out_test_path,
  )

  preprocessor.process(per_pose_class_limit=None)

Preprocessing videos_frames
100%|██████████| 44/44 [02:34<00:00,  3.51s/it]







In [8]:
def load_pose_landmarks(csv_path):
  """Loads a CSV created by MoveNetPreprocessor.

  Returns:
    X: Detected landmark coordinates and scores of shape (N, 17 * 3)
    y: Ground truth labels of shape (N, label_count)
    classes: The list of all class names found in the dataset
    dataframe: The CSV loaded as a Pandas dataframe features (X) and ground
      truth labels (y) to use later to train a pose classification model.
  """

  # Load the CSV file
  dataframe = pd.read_csv(csv_path)
  df_to_process = dataframe.copy()

  # Drop the file_name columns as you don't need it during training.
  df_to_process.drop(columns=['file_name'], inplace=True)

  # Extract the list of class names
  classes = df_to_process.pop('class_name').unique()

  # Extract the labels
  y = df_to_process.pop('class_no')

  # Convert the input features and labels into the correct format for training.
  X = df_to_process.astype('float64')
  y = keras.utils.to_categorical(y)

  return X, y, classes, dataframe

In [9]:
X, y, class_names, _ = load_pose_landmarks('/content/run_data.csv')

In [10]:
print(X.shape)
print(y.shape)

(44, 51)
(44, 1)


In [13]:
import pandas as pd

# Load the CSV file
#data = pd.read_csv('/content/run_data.csv')
data = X.astype('float32')

import tensorflow as tf

# Load the TFLite model
interpreter = tf.lite.Interpreter(model_path="/content/pose_classifier3.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
for index, row in data.iterrows():
    # Preprocess your row if necessary
    # row = preprocess(row)
    row = np.array(row, dtype=np.float32)
    # Set the tensor to point to the input data to be inferred
    interpreter.set_tensor(input_details[0]['index'], row)

    # Run the inference
    interpreter.invoke()

    # The function `get_tensor()` returns the output tensor
    output_data = interpreter.get_tensor(output_details[0]['index'])

    # Print or store the prediction from the model
    print(output_data)

ValueError: Cannot set tensor: Dimension mismatch. Got 1 but expected 2 for input 0.

In [14]:
interpreter = tf.lite.Interpreter(model_path="/content/pose_classifier3.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

import numpy as np

for index, row in data.iterrows():
    # Convert the row to a numpy array and ensure it's float32
    row = np.array(row, dtype=np.float32).reshape(1, -1)
    # Convert row to float32
    row = row.astype(np.float32)

    # Set the tensor to point to the input data to be inferred
    interpreter.set_tensor(input_details[0]['index'], row)

    # Run the inference
    interpreter.invoke()

    # The function `get_tensor()` returns the output tensor
    output_data = interpreter.get_tensor(output_details[0]['index'])

    # Print or store the prediction from the model
    print(output_data)

[[2.7022634e-10 1.0000000e+00]]
[[2.679629e-10 1.000000e+00]]
[[1.9897291e-10 1.0000000e+00]]
[[1.520332e-10 1.000000e+00]]
[[1.1965405e-10 1.0000000e+00]]
[[1.3188482e-10 1.0000000e+00]]
[[1.6391713e-10 1.0000000e+00]]
[[1.5340283e-10 1.0000000e+00]]
[[1.1415204e-10 1.0000000e+00]]
[[9.6783914e-11 1.0000000e+00]]
[[1.1244425e-10 1.0000000e+00]]
[[1.4184141e-10 1.0000000e+00]]
[[1.8449922e-10 1.0000000e+00]]
[[2.0618053e-10 1.0000000e+00]]
[[2.2897971e-10 1.0000000e+00]]
[[2.3417296e-10 1.0000000e+00]]
[[2.4241975e-10 1.0000000e+00]]
[[1.8578673e-10 1.0000000e+00]]
[[1.4551228e-10 1.0000000e+00]]
[[1.180635e-10 1.000000e+00]]
[[1.2849646e-10 1.0000000e+00]]
[[1.7372874e-10 1.0000000e+00]]
[[1.5321218e-10 1.0000000e+00]]
[[1.1873762e-10 1.0000000e+00]]
[[9.935291e-11 1.000000e+00]]
[[1.1296813e-10 1.0000000e+00]]
[[1.3940313e-10 1.0000000e+00]]
[[1.78416e-10 1.00000e+00]]
[[1.9618986e-10 1.0000000e+00]]
[[2.3880664e-10 1.0000000e+00]]
[[2.3702326e-10 1.0000000e+00]]
[[2.60201e-10 1.0000

In [17]:
import tensorflow as tf
import numpy as np

# Load the TensorFlow Lite model
interpreter = tf.lite.Interpreter(model_path="/content/pose_classifier3.tflite")
interpreter.allocate_tensors()

# Get input and output tensors
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Initialize variables for calculating average predictions
correct_sum = 0.0
not_correct_sum = 0.0
total_rows = 0

for index, row in data.iterrows():
    # Convert the row to a numpy array and ensure it's float32
    row = np.array(row, dtype=np.float32).reshape(1, -1)

    # Set the tensor to point to the input data to be inferred
    interpreter.set_tensor(input_details[0]['index'], row)

    # Run the inference
    interpreter.invoke()

    # The function `get_tensor()` returns the output tensor
    output_data = interpreter.get_tensor(output_details[0]['index'])

    # Accumulate predictions for each class
    correct_sum += output_data[0][1]  # Prediction for "correct" class
    not_correct_sum += output_data[0][0]  # Prediction for "not correct" class
    total_rows += 1

# Calculate average predictions for each class
average_prediction_correct = correct_sum / total_rows
average_prediction_not_correct = not_correct_sum / total_rows

# Determine which class has the higher average prediction ratio
if average_prediction_correct > average_prediction_not_correct:
    average_prediction_class = "Correct"
    average_prediction_ratio = average_prediction_correct
else:
    average_prediction_class = "Not Correct"
    average_prediction_ratio = average_prediction_not_correct

print("Average Prediction:", average_prediction_class)



Average Prediction: Correct


In [18]:
cap = cv2.VideoCapture('/content/video/pull up_11.mp4')
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
font_thickness = 2
font_color = (0, 255, 0)
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    cv2.putText(frame, f'Prediction: {average_prediction_class}', (50, 50),
                    font, font_scale, font_color, font_thickness)

    # Display the frame
    cv2.imshow('Frame', frame)

    if cv2.waitKey(25) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

DisabledFunctionError: cv2.imshow() is disabled in Colab, because it causes Jupyter sessions
to crash; see https://github.com/jupyter/notebook/issues/3935.
As a substitution, consider using
  from google.colab.patches import cv2_imshow
