In [3]:
import csv
import cv2
import itertools
import numpy as np
import pandas as pd
import os
import sys
import tempfile
import tqdm
import math

from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection

import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import keras

#from sklearn.model_selection import train_test_split
#from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [6]:
#@title Functions to run pose estimation with MoveNet

#@markdown You'll download the MoveNet Thunder model from [TensorFlow Hub](https://tfhub.dev/s?q=movenet), and reuse some inference and visualization logic from the [MoveNet Raspberry Pi (Python)](https://github.com/tensorflow/examples/tree/master/lite/examples/pose_estimation/raspberry_pi) sample app to detect landmarks (ear, nose, wrist etc.) from the input images.

#@markdown *Note: You should use the most accurate pose estimation model (i.e. MoveNet Thunder) to detect the keypoints and use them to train the pose classification model to achieve the best accuracy. When running inference, you can use a pose estimation model of your choice (e.g. either MoveNet Lightning or Thunder).*

# Download model from TF Hub and check out inference code from GitHub
!wget -q -O movenet_thunder.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/thunder/tflite/float16/4?lite-format=tflite
!git clone https://github.com/tensorflow/examples.git
pose_sample_rpi_path = os.path.join(os.getcwd(), 'examples/lite/examples/pose_estimation/raspberry_pi')
sys.path.append(pose_sample_rpi_path)

# Load MoveNet Thunder model
import utils
from data import BodyPart
from ml import Movenet
movenet = Movenet('movenet_thunder')

# Define function to run pose estimation using MoveNet Thunder.
# You'll apply MoveNet's cropping algorithm and run inference multiple times on
# the input image to improve pose estimation accuracy.
def detect(input_tensor, inference_count=3):
  """Runs detection on an input image.

  Args:
    input_tensor: A [height, width, 3] image. Either a Tensor exposing
      `.numpy()` or an array-like such as a decoded video frame. Note that
      height and width can be anything since the image is handed straight to
      the MoveNet wrapper, which is expected to resize it as needed.
    inference_count: Number of times the model should run repeatedly on the
      same input image to improve detection accuracy. The model always runs
      at least once, so values below 1 behave like 1.

  Returns:
    The result of the last `movenet.detect` call (a Person entity detected
    by MoveNet.SinglePose).
  """
  # Convert once up front. Plain NumPy frames have no `.numpy()` method, so
  # only call it when the input actually provides it.
  if hasattr(input_tensor, 'numpy'):
    image = input_tensor.numpy()
  else:
    image = np.asarray(input_tensor)

  # Detect pose using the full input image. The result is kept so that a
  # single-pass call (inference_count=1) still has something to return.
  person = movenet.detect(image, reset_crop_region=True)

  # Repeatedly use the previous detection result to identify the region of
  # interest and only crop that region to improve detection accuracy.
  for _ in range(inference_count - 1):
    person = movenet.detect(image, reset_crop_region=False)

  return person

Cloning into 'examples'...
remote: Enumerating objects: 23600, done.[K
remote: Counting objects: 100% (560/560), done.[K
remote: Compressing objects: 100% (336/336), done.[K
remote: Total 23600 (delta 164), reused 509 (delta 144), pack-reused 23040[K
Receiving objects: 100% (23600/23600), 44.09 MiB | 30.38 MiB/s, done.
Resolving deltas: 100% (12814/12814), done.


In [None]:
def lineA(person_pose):
  """Returns the nose / left-shoulder segment as one flat array.

  Args:
    person_pose: Detection result exposing `keypoints`, an indexable sequence
      whose items carry a `coordinate`. Index 0 is read as the nose and
      index 5 as the left shoulder.

  Returns:
    A NumPy array holding the nose coordinate followed by the left-shoulder
    coordinate, i.e. [nose_x, nose_y, shoulder_x, shoulder_y] assuming each
    coordinate is an (x, y) pair.
  """
  nose = person_pose.keypoints[0].coordinate
  left_shoulder = person_pose.keypoints[5].coordinate
  # `return name = value` is a syntax error in Python, so the array is
  # returned directly. Unpacking concatenates the two points explicitly
  # instead of relying on `+` meaning tuple concatenation.
  return np.array([*nose, *left_shoulder])
def lineB(person_pose):
  """Returns the left-shoulder / left-hip segment as one flat array.

  Args:
    person_pose: Detection result exposing `keypoints`, an indexable sequence
      whose items carry a `coordinate`. Index 5 is read as the left shoulder
      and index 11 as the left hip.

  Returns:
    A NumPy array holding the left-shoulder coordinate followed by the
    left-hip coordinate, i.e. [shoulder_x, shoulder_y, hip_x, hip_y] assuming
    each coordinate is an (x, y) pair.
  """
  left_shoulder = person_pose.keypoints[5].coordinate
  left_hip = person_pose.keypoints[11].coordinate
  # `return name = value` is a syntax error in Python, so the array is
  # returned directly. Unpacking concatenates the two points explicitly
  # instead of relying on `+` meaning tuple concatenation.
  return np.array([*left_shoulder, *left_hip])

In [7]:
#@title Functions to visualize the pose estimation results.

def draw_prediction_on_image(
    image, person, crop_region=None, close_figure=True,
    keep_input_size=False):
  """Overlays the detected keypoints on an image and plots the result.

  Args:
    image: A numpy array with shape [height, width, channel] holding the
      pixel values of the input image.
    person: A person entity returned from the MoveNet.SinglePose model.
    crop_region: Accepted for call compatibility; this function never reads
      it.
    close_figure: When true, the matplotlib figure created for the plot is
      closed before returning.
    keep_input_size: When true, the overlay is returned as produced by
      `utils.visualize`; otherwise it is passed through
      `utils.keep_aspect_ratio_resizer` with a (512, 512) target.

  Returns:
    A numpy array with shape [out_height, out_width, channel] representing
    the image overlaid with keypoint predictions.
  """
  # Render the detection on top of the image.
  annotated = utils.visualize(image, [person])

  # Show the overlay in a figure 12 inches tall whose width follows the
  # input's aspect ratio.
  rows, cols, _ = image.shape
  figure, axes = plt.subplots(figsize=(12 * (float(cols) / rows), 12))
  axes.imshow(annotated)
  if close_figure:
    plt.close(figure)

  if keep_input_size:
    return annotated
  return utils.keep_aspect_ratio_resizer(annotated, (512, 512))

Functions to determine whether a pose is good or bad

In [10]:


def dot(vA, vB):
    """Returns the dot product of two 2-D vectors.

    Only the elements at index 0 and index 1 of each argument are read.
    """
    first_term = vA[0] * vB[0]
    second_term = vA[1] * vB[1]
    return first_term + second_term

def angle(person_pose):
    """Returns the angle, in degrees, between the neck and torso segments.

    The neck segment runs from the left shoulder to the nose and the torso
    segment from the left hip to the left shoulder. The result is 0 when the
    two segments point the same way and grows as the head tilts away from the
    torso line.

    Args:
        person_pose: Detection result exposing `keypoints`, an indexable
            sequence whose items carry an indexable 2-D `coordinate`.
            Indices 0, 5 and 11 are read as nose, left shoulder and left hip.

    Returns:
        The angle as a float in [0, 180], or NaN when either segment has zero
        length (two of the keypoints coincide), in which case no angle is
        defined. NaN compares false against any threshold.
    """
    nose = person_pose.keypoints[0].coordinate
    left_shoulder = person_pose.keypoints[5].coordinate
    left_hip = person_pose.keypoints[11].coordinate

    # vA points from the left shoulder to the nose, vB from the left hip to
    # the left shoulder. Converting to float keeps the arithmetic independent
    # of the coordinate's numeric type.
    vA = [float(nose[0]) - float(left_shoulder[0]),
          float(nose[1]) - float(left_shoulder[1])]
    vB = [float(left_shoulder[0]) - float(left_hip[0]),
          float(left_shoulder[1]) - float(left_hip[1])]

    # Get magnitudes
    magA = math.sqrt(dot(vA, vA))
    magB = math.sqrt(dot(vB, vB))
    if magA == 0 or magB == 0:
        # Degenerate segment: avoid dividing by zero.
        return math.nan

    # Get cosine value. Rounding can leave it a hair outside [-1, 1] for
    # (anti-)parallel vectors, which would make math.acos raise a domain
    # error, so clamp it first.
    cos_ = dot(vA, vB) / magA / magB
    cos_ = max(-1.0, min(1.0, cos_))

    # Get angle in radians and then convert to degrees
    return math.degrees(math.acos(cos_))



In [30]:
!wget https://github.com/kitana2505/BadPostureAlert/blob/b8fb31ca000a59dbc8b4ae8fdf22a2c9c5cbc878/assets/IMG_0257.mp4

--2023-09-04 14:05:01--  https://github.com/kitana2505/BadPostureAlert/blob/b8fb31ca000a59dbc8b4ae8fdf22a2c9c5cbc878/assets/IMG_0257.mp4
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4609 (4.5K) [text/plain]
Saving to: ‘IMG_0257.mp4’


2023-09-04 14:05:01 (60.3 MB/s) - ‘IMG_0257.mp4’ saved [4609/4609]



In [27]:
# Open the downloaded clip and read the stream properties needed to create a
# matching output video.
cap = cv2.VideoCapture("./IMG_0257.mp4")
if not cap.isOpened():
  # Without this check a missing or undecodable file silently yields
  # zero-sized frames and an empty output video.
  raise RuntimeError("Could not open ./IMG_0257.mp4 - check that the download "
                     "produced a real video file.")

# Named property ids instead of the bare numbers 3, 4 and 5.
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Kept as a float: truncating e.g. 29.97 to 29 would change the playback speed
# of the written video. cv2.VideoWriter accepts a fractional frame rate.
fps = cap.get(cv2.CAP_PROP_FPS)
num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

In [28]:
# Configure the output video.
# The tag is the lower-case 'mp4v': with the upper-case spelling OpenCV's
# FFmpeg backend reports the tag as unsupported for the MP4 container and falls
# back to 'mp4v' anyway.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_video = cv2.VideoWriter('output_video.mp4', fourcc, fps, (frame_width, frame_height))
if not output_video.isOpened():
  # A writer that failed to open drops every frame silently.
  raise RuntimeError("Could not open output_video.mp4 for writing.")

In [29]:
# Annotate every frame of the input clip with the detected pose and, when the
# neck/torso angle is too large, a warning caption; write the result to
# output_video.mp4.

# A frame is flagged as bad posture when the angle exceeds this many degrees.
BAD_POSE_ANGLE_DEG = 45

# Font used for the warning caption.
font = cv2.FONT_HERSHEY_SIMPLEX

counter = 0
try:
  while True:
    ret, frame = cap.read()

    if not ret:
      break  # stop once the video is exhausted (or a frame cannot be read)

    # OpenCV decodes frames as BGR NumPy arrays. The detector is fed an RGB
    # tensor: NOTE(review) MoveNet is assumed to expect RGB input - confirm
    # against the Movenet wrapper. Wrapping the array in a tensor provides the
    # `.numpy()` accessor that detect() calls.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    person_pose = detect(tf.convert_to_tensor(rgb_frame))

    # Draw the keypoints on a copy of the original BGR frame so the written
    # video keeps its colours. utils.visualize is the same call
    # draw_prediction_on_image makes; calling it directly avoids building (and
    # leaking) one matplotlib figure per frame. Its return value is used so
    # the overlay ends up in the output whether or not it draws in place.
    drawed_frame = utils.visualize(frame.copy(), [person_pose])

    critical_angle = angle(person_pose)

    # Caption the frame when the posture is considered bad.
    if critical_angle > BAD_POSE_ANGLE_DEG:
      cv2.putText(drawed_frame,
                  'angle = '+ str(critical_angle) + '. Bad pose detected',
                  (50, 50),
                  font, 1,
                  (0, 255, 255),
                  2,
                  cv2.LINE_4)

    # Append the processed frame to output_video.mp4.
    output_video.write(drawed_frame)

    counter += 1
    print(f"[INFO] Process {counter}/{num_frames} frame...")
finally:
  # Release video objects even if processing fails part-way, so the partial
  # output file is finalised and the input file handle is freed.
  cap.release()
  output_video.release()


