In [None]:
!pip install tensorflow tensorflow-hub opencv-python mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.20-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.20-cp311-cp311-manylinux_2_28_x86_64.whl (35.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.20 sounddevice-0.5.1


In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import cv2
import numpy as np
from google.colab import files
import os

class PoseEstimator:
    """Annotate videos with single-person poses from MoveNet Lightning (TF Hub).

    Keypoints are exchanged as ``{name: {'x', 'y', 'confidence'}}`` where x and
    y are fractions of the source frame's width and height.
    """

    # Side length of the square image each frame is resized and padded to.
    INPUT_SIZE = 192
    # Both endpoints must score above this for a skeleton segment to be drawn.
    CONNECTION_THRESHOLD = 0.3
    # A keypoint must score above this for its dot to be drawn.
    KEYPOINT_THRESHOLD = 0.8
    # Frame rate written to the output when the input reports no usable value.
    FALLBACK_FPS = 30.0

    def __init__(self):
        """Load the MoveNet serving signature and define the skeleton layout."""
        print("Initializing MoveNet model...")
        model = hub.load('https://tfhub.dev/google/movenet/singlepose/lightning/4')
        self.model = model.signatures['serving_default']

        # Order matches the row order of the model's keypoint output.
        self.keypoints_names = [
            'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
            'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
            'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
            'left_knee', 'right_knee', 'left_ankle', 'right_ankle'
        ]

        # Pairs of keypoint names joined by a line when drawing the skeleton.
        self.connections = [
            ('left_shoulder', 'right_shoulder'),
            ('left_shoulder', 'left_elbow'),
            ('right_shoulder', 'right_elbow'),
            ('left_elbow', 'left_wrist'),
            ('right_elbow', 'right_wrist'),
            ('left_shoulder', 'left_hip'),
            ('right_shoulder', 'right_hip'),
            ('left_hip', 'right_hip'),
            ('left_hip', 'left_knee'),
            ('right_hip', 'right_knee'),
            ('left_knee', 'left_ankle'),
            ('right_knee', 'right_ankle')
        ]

    @staticmethod
    def _undo_letterbox(keypoints, frame_height, frame_width):
        """Convert padded-square coordinates back to fractions of the frame.

        The frame is resized with its aspect ratio preserved and centred in a
        square with padding, so the model's (y, x) values are fractions of that
        square. For a square frame this is a no-op.

        Args:
            keypoints: Array-like of rows ``(y, x, confidence)``.
            frame_height: Height of the original frame in pixels.
            frame_width: Width of the original frame in pixels.

        Returns:
            A new float array of the same shape with y and x re-expressed as
            fractions of ``frame_height`` and ``frame_width``.
        """
        corrected = np.array(keypoints, dtype=np.float64)
        if frame_height <= 0 or frame_width <= 0:
            return corrected
        side = float(max(frame_height, frame_width))
        corrected[:, 0] = (corrected[:, 0] * side - (side - frame_height) / 2.0) / frame_height
        corrected[:, 1] = (corrected[:, 1] * side - (side - frame_width) / 2.0) / frame_width
        return corrected

    def process_frame(self, frame):
        """Run MoveNet on one frame and return its formatted keypoints.

        Args:
            frame: Image array of shape (height, width, 3) in OpenCV's BGR
                channel order, as produced by ``cv2.VideoCapture.read``.

        Returns:
            Dict mapping each keypoint name to ``{'x', 'y', 'confidence'}``.
        """
        frame_height, frame_width = frame.shape[:2]

        # OpenCV decodes to BGR, whereas MoveNet takes RGB input.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        input_frame = tf.convert_to_tensor(rgb_frame)
        input_frame = tf.expand_dims(input_frame, axis=0)
        input_frame = tf.image.resize_with_pad(input_frame, self.INPUT_SIZE, self.INPUT_SIZE)
        input_frame = tf.cast(input_frame, dtype=tf.int32)  # Ensure int32 type

        # The serving signature takes its tensor through the named ``input`` argument.
        results = self.model(input=input_frame)
        keypoints = results['output_0'].numpy()[0, 0]

        keypoints = self._undo_letterbox(keypoints, frame_height, frame_width)
        return self._format_keypoints(keypoints)

    def process_video(self, input_path, output_path):
        """Annotate every frame of ``input_path`` and write the result as mp4.

        Args:
            input_path: Path of the video to read.
            output_path: Path of the annotated video to write.

        Raises:
            IOError: If the input cannot be opened or the output writer cannot
                be created.
        """
        print(f"Opening video file: {input_path}")
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            cap.release()
            raise IOError(f"Could not open video file: {input_path}")

        out = None
        try:
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Keep the fractional rate (e.g. 29.97) so the output duration does not drift.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:  # also catches NaN
                fps = self.FALLBACK_FPS
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            print(f"Video properties: {frame_width}x{frame_height} at {fps:g} fps")
            print(f"Total frames to process: {total_frames}")

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
            if not out.isOpened():
                raise IOError(f"Could not open output video for writing: {output_path}")

            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                if frame_count % 10 == 0:
                    if total_frames > 0:
                        progress = (frame_count / total_frames) * 100
                        print(f'Processing: {progress:.1f}% complete ({frame_count}/{total_frames} frames)', end='\r')
                    else:
                        # Some containers report no frame count; show the raw counter instead.
                        print(f'Processing: {frame_count} frames', end='\r')

                keypoints = self.process_frame(frame)
                annotated_frame = self._draw_predictions(frame, keypoints)
                out.write(annotated_frame)
        finally:
            # Release both handles even if inference or writing fails midway.
            cap.release()
            if out is not None:
                out.release()

        print("\nVideo processing complete!")
        print(f"Saved output video to: {output_path}")

    def _format_keypoints(self, keypoints):
        """Turn rows of ``(y, x, confidence)`` into a dict keyed by keypoint name."""
        formatted_keypoints = {}
        for idx, name in enumerate(self.keypoints_names):
            y, x, confidence = keypoints[idx]
            formatted_keypoints[name] = {
                'x': float(x),
                'y': float(y),
                'confidence': float(confidence)
            }
        return formatted_keypoints

    def _draw_predictions(self, frame, keypoints):
        """Draw skeleton segments and keypoint dots onto ``frame`` in place.

        Args:
            frame: Image array to draw on; it is modified.
            keypoints: Dict as returned by ``_format_keypoints``.

        Returns:
            The same ``frame`` object, for call chaining.
        """
        frame_height, frame_width = frame.shape[:2]

        def to_pixel(point):
            return int(point['x'] * frame_width), int(point['y'] * frame_height)

        # Draw connections
        for start_name, end_name in self.connections:
            start, end = keypoints[start_name], keypoints[end_name]
            if (start['confidence'] > self.CONNECTION_THRESHOLD
                    and end['confidence'] > self.CONNECTION_THRESHOLD):
                cv2.line(frame, to_pixel(start), to_pixel(end), (0, 255, 255), 2)

        # Draw keypoints
        for point in keypoints.values():
            if point['confidence'] > self.KEYPOINT_THRESHOLD:
                cv2.circle(frame, to_pixel(point), 5, (0, 255, 0), -1)

        return frame

# Process the video
uploaded = files.upload()
# The upload result is keyed by file name; fail with a clear message instead of
# a bare StopIteration when nothing was uploaded.
if not uploaded:
    raise RuntimeError("No video was uploaded; re-run this cell and choose a file.")
input_video_path = next(iter(uploaded))
output_video_path = 'MoveNet_Output_handshake1_video.mp4'

estimator = PoseEstimator()
estimator.process_video(input_video_path, output_video_path)

# Download the processed video
#files.download(output_video_path)


Saving 150.mp4 to 150 (2).mp4
Initializing MoveNet model...
Opening video file: 150 (2).mp4
Video properties: 900x900 at 50 fps
Total frames to process: 358

Video processing complete!
Saved output video to: MoveNet_Output_handshake1_video.mp4


In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive.
# NOTE(review): none of the visible cells read from or write to this mount — confirm it is still needed.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import cv2
import numpy as np
from google.colab import files
import os

class PoseEstimator:
    """Annotate videos with MoveNet Lightning poses plus a keypoint bounding box.

    Keypoints are exchanged as ``{name: {'x', 'y', 'confidence'}}`` where x and
    y are fractions of the source frame's width and height.
    """

    # Side length of the square image each frame is resized and padded to.
    INPUT_SIZE = 192
    # Minimum score for a keypoint to be drawn or counted in the bounding box.
    CONFIDENCE_THRESHOLD = 0.3
    # Frame rate written to the output when the input reports no usable value.
    FALLBACK_FPS = 30.0

    def __init__(self):
        """Load the MoveNet serving signature and define the skeleton layout."""
        print("Initializing MoveNet model...")
        model = hub.load('https://tfhub.dev/google/movenet/singlepose/lightning/4')
        self.model = model.signatures['serving_default']

        # Order matches the row order of the model's keypoint output.
        self.keypoints_names = [
            'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
            'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
            'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
            'left_knee', 'right_knee', 'left_ankle', 'right_ankle'
        ]

        # Pairs of keypoint names joined by a line when drawing the skeleton.
        self.connections = [
            ('left_shoulder', 'right_shoulder'),
            ('left_shoulder', 'left_elbow'),
            ('right_shoulder', 'right_elbow'),
            ('left_elbow', 'left_wrist'),
            ('right_elbow', 'right_wrist'),
            ('left_shoulder', 'left_hip'),
            ('right_shoulder', 'right_hip'),
            ('left_hip', 'right_hip'),
            ('left_hip', 'left_knee'),
            ('right_hip', 'right_knee'),
            ('left_knee', 'left_ankle'),
            ('right_knee', 'right_ankle')
        ]

    @staticmethod
    def _undo_letterbox(keypoints, frame_height, frame_width):
        """Convert padded-square coordinates back to fractions of the frame.

        The frame is resized with its aspect ratio preserved and centred in a
        square with padding, so the model's (y, x) values are fractions of that
        square. For a square frame this is a no-op.

        Args:
            keypoints: Array-like of rows ``(y, x, confidence)``.
            frame_height: Height of the original frame in pixels.
            frame_width: Width of the original frame in pixels.

        Returns:
            A new float array of the same shape with y and x re-expressed as
            fractions of ``frame_height`` and ``frame_width``.
        """
        corrected = np.array(keypoints, dtype=np.float64)
        if frame_height <= 0 or frame_width <= 0:
            return corrected
        side = float(max(frame_height, frame_width))
        corrected[:, 0] = (corrected[:, 0] * side - (side - frame_height) / 2.0) / frame_height
        corrected[:, 1] = (corrected[:, 1] * side - (side - frame_width) / 2.0) / frame_width
        return corrected

    def process_frame(self, frame):
        """Run MoveNet on one frame and return its formatted keypoints.

        Args:
            frame: Image array of shape (height, width, 3) in OpenCV's BGR
                channel order, as produced by ``cv2.VideoCapture.read``.

        Returns:
            Dict mapping each keypoint name to ``{'x', 'y', 'confidence'}``.
        """
        frame_height, frame_width = frame.shape[:2]

        # OpenCV decodes to BGR, whereas MoveNet takes RGB input.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        input_frame = tf.convert_to_tensor(rgb_frame)
        input_frame = tf.expand_dims(input_frame, axis=0)
        input_frame = tf.image.resize_with_pad(input_frame, self.INPUT_SIZE, self.INPUT_SIZE)
        input_frame = tf.cast(input_frame, dtype=tf.int32)  # Ensure int32 type

        # The serving signature takes its tensor through the named ``input`` argument.
        results = self.model(input=input_frame)
        keypoints = results['output_0'].numpy()[0, 0]

        keypoints = self._undo_letterbox(keypoints, frame_height, frame_width)
        return self._format_keypoints(keypoints)

    def process_video(self, input_path, output_path):
        """Annotate every frame of ``input_path`` and write the result as mp4.

        Args:
            input_path: Path of the video to read.
            output_path: Path of the annotated video to write.

        Raises:
            IOError: If the input cannot be opened or the output writer cannot
                be created.
        """
        print(f"Opening video file: {input_path}")
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            cap.release()
            raise IOError(f"Could not open video file: {input_path}")

        out = None
        try:
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Keep the fractional rate (e.g. 29.97) so the output duration does not drift.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:  # also catches NaN
                fps = self.FALLBACK_FPS
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            print(f"Video properties: {frame_width}x{frame_height} at {fps:g} fps")
            print(f"Total frames to process: {total_frames}")

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
            if not out.isOpened():
                raise IOError(f"Could not open output video for writing: {output_path}")

            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                if frame_count % 10 == 0:
                    if total_frames > 0:
                        progress = (frame_count / total_frames) * 100
                        print(f'Processing: {progress:.1f}% complete ({frame_count}/{total_frames} frames)', end='\r')
                    else:
                        # Some containers report no frame count; show the raw counter instead.
                        print(f'Processing: {frame_count} frames', end='\r')

                keypoints = self.process_frame(frame)
                annotated_frame = self._draw_predictions(frame, keypoints)
                out.write(annotated_frame)
        finally:
            # Release both handles even if inference or writing fails midway.
            cap.release()
            if out is not None:
                out.release()

        print("\nVideo processing complete!")
        print(f"Saved output video to: {output_path}")

    def _format_keypoints(self, keypoints):
        """Turn rows of ``(y, x, confidence)`` into a dict keyed by keypoint name."""
        formatted_keypoints = {}
        for idx, name in enumerate(self.keypoints_names):
            y, x, confidence = keypoints[idx]
            formatted_keypoints[name] = {
                'x': float(x),
                'y': float(y),
                'confidence': float(confidence)
            }
        return formatted_keypoints

    def _draw_predictions(self, frame, keypoints):
        """Draw skeleton, keypoint dots and their bounding box onto ``frame``.

        Args:
            frame: Image array to draw on; it is modified.
            keypoints: Dict as returned by ``_format_keypoints``.

        Returns:
            The same ``frame`` object, for call chaining.
        """
        frame_height, frame_width = frame.shape[:2]

        def to_pixel(point):
            return int(point['x'] * frame_width), int(point['y'] * frame_height)

        # Draw connections
        for start_name, end_name in self.connections:
            start, end = keypoints[start_name], keypoints[end_name]
            if (start['confidence'] > self.CONFIDENCE_THRESHOLD
                    and end['confidence'] > self.CONFIDENCE_THRESHOLD):
                cv2.line(frame, to_pixel(start), to_pixel(end), (0, 255, 255), 2)

        # Draw keypoints and remember where they landed for the bounding box
        drawn = []
        for point in keypoints.values():
            if point['confidence'] > self.CONFIDENCE_THRESHOLD:
                pixel = to_pixel(point)
                cv2.circle(frame, pixel, 5, (0, 255, 0), -1)
                drawn.append(pixel)

        # Draw the bounding box only when the points span a non-empty area.
        # Extremes come from the points themselves, so a negative coordinate
        # is not clamped to a seed value of 0.
        if drawn:
            min_x = min(px for px, _ in drawn)
            max_x = max(px for px, _ in drawn)
            min_y = min(py for _, py in drawn)
            max_y = max(py for _, py in drawn)
            if min_x < max_x and min_y < max_y:
                cv2.rectangle(frame, (min_x, min_y), (max_x, max_y), (255, 0, 0), 2)

        return frame

# Process the video
uploaded = files.upload()
# The upload result is keyed by file name; fail with a clear message instead of
# a bare StopIteration when nothing was uploaded.
if not uploaded:
    raise RuntimeError("No video was uploaded; re-run this cell and choose a file.")
input_video_path = next(iter(uploaded))
output_video_path = 'MoveNet_HS1_Output_with_Bounding_Box.mp4'

estimator = PoseEstimator()
estimator.process_video(input_video_path, output_video_path)

# Download the processed video
# files.download(output_video_path)


TypeError: 'NoneType' object is not subscriptable

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import cv2
import numpy as np
import csv
from google.colab import files
import os

class PoseEstimator:
    """Annotate videos with MoveNet Lightning poses and export keypoints to CSV.

    Keypoints are exchanged as ``{name: {'x', 'y', 'confidence'}}`` where x and
    y are fractions of the source frame's width and height. The CSV rows use
    the same coordinate convention.
    """

    # Side length of the square image each frame is resized and padded to.
    INPUT_SIZE = 192
    # Minimum score for a keypoint to be drawn or counted in the bounding box.
    CONFIDENCE_THRESHOLD = 0.3
    # Frame rate used when the input reports no usable value.
    FALLBACK_FPS = 30.0

    def __init__(self):
        """Load the MoveNet serving signature and define the skeleton layout."""
        print("Initializing MoveNet model...")
        model = hub.load('https://tfhub.dev/google/movenet/singlepose/lightning/4')
        self.model = model.signatures['serving_default']

        # Order matches the row order of the model's keypoint output.
        self.keypoints_names = [
            'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
            'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
            'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
            'left_knee', 'right_knee', 'left_ankle', 'right_ankle'
        ]

        # Pairs of keypoint names joined by a line when drawing the skeleton.
        self.connections = [
            ('left_shoulder', 'right_shoulder'),
            ('left_shoulder', 'left_elbow'),
            ('right_shoulder', 'right_elbow'),
            ('left_elbow', 'left_wrist'),
            ('right_elbow', 'right_wrist'),
            ('left_shoulder', 'left_hip'),
            ('right_shoulder', 'right_hip'),
            ('left_hip', 'right_hip'),
            ('left_hip', 'left_knee'),
            ('right_hip', 'right_knee'),
            ('left_knee', 'left_ankle'),
            ('right_knee', 'right_ankle')
        ]

        self.data = []  # To store keypoint data for CSV

    @staticmethod
    def _undo_letterbox(keypoints, frame_height, frame_width):
        """Convert padded-square coordinates back to fractions of the frame.

        The frame is resized with its aspect ratio preserved and centred in a
        square with padding, so the model's (y, x) values are fractions of that
        square. For a square frame this is a no-op.

        Args:
            keypoints: Array-like of rows ``(y, x, confidence)``.
            frame_height: Height of the original frame in pixels.
            frame_width: Width of the original frame in pixels.

        Returns:
            A new float array of the same shape with y and x re-expressed as
            fractions of ``frame_height`` and ``frame_width``.
        """
        corrected = np.array(keypoints, dtype=np.float64)
        if frame_height <= 0 or frame_width <= 0:
            return corrected
        side = float(max(frame_height, frame_width))
        corrected[:, 0] = (corrected[:, 0] * side - (side - frame_height) / 2.0) / frame_height
        corrected[:, 1] = (corrected[:, 1] * side - (side - frame_width) / 2.0) / frame_width
        return corrected

    def process_frame(self, frame, frame_index, fps):
        """Run MoveNet on one frame, log its keypoints, and return them formatted.

        Args:
            frame: Image array of shape (height, width, 3) in OpenCV's BGR
                channel order, as produced by ``cv2.VideoCapture.read``.
            frame_index: Index recorded for this frame in the CSV rows.
            fps: Frame rate used to derive the row timestamp.

        Returns:
            Dict mapping each keypoint name to ``{'x', 'y', 'confidence'}``.
        """
        frame_height, frame_width = frame.shape[:2]

        # OpenCV decodes to BGR, whereas MoveNet takes RGB input.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        input_frame = tf.convert_to_tensor(rgb_frame)
        input_frame = tf.expand_dims(input_frame, axis=0)
        input_frame = tf.image.resize_with_pad(input_frame, self.INPUT_SIZE, self.INPUT_SIZE)
        input_frame = tf.cast(input_frame, dtype=tf.int32)  # Ensure int32 type

        # The serving signature takes its tensor through the named ``input`` argument.
        results = self.model(input=input_frame)
        keypoints = results['output_0'].numpy()[0, 0]

        # Correct before logging so the CSV and the drawing share one coordinate space.
        keypoints = self._undo_letterbox(keypoints, frame_height, frame_width)

        # Log data to the CSV
        self._log_keypoints_data(frame, keypoints, frame_index, fps)

        return self._format_keypoints(keypoints)

    def process_video(self, input_path, output_path, csv_path):
        """Annotate ``input_path``, write the mp4, and export keypoints to CSV.

        Args:
            input_path: Path of the video to read.
            output_path: Path of the annotated video to write.
            csv_path: Path of the keypoint CSV to write.

        Raises:
            IOError: If the input cannot be opened or the output writer cannot
                be created.
        """
        print(f"Opening video file: {input_path}")
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            cap.release()
            raise IOError(f"Could not open video file: {input_path}")

        # Start from an empty log so rows from a previous video are not re-exported.
        self.data = []

        out = None
        try:
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Keep the fractional rate (e.g. 29.97) so timestamps and duration do not drift.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:  # also catches NaN
                fps = self.FALLBACK_FPS
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            print(f"Video properties: {frame_width}x{frame_height} at {fps:g} fps")
            print(f"Total frames to process: {total_frames}")

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
            if not out.isOpened():
                raise IOError(f"Could not open output video for writing: {output_path}")

            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                if frame_count % 10 == 0:
                    if total_frames > 0:
                        progress = (frame_count / total_frames) * 100
                        print(f'Processing: {progress:.1f}% complete ({frame_count}/{total_frames} frames)', end='\r')
                    else:
                        # Some containers report no frame count; show the raw counter instead.
                        print(f'Processing: {frame_count} frames', end='\r')

                # frame_count is 1-based here, so the first frame is logged as index 1.
                keypoints = self.process_frame(frame, frame_count, fps)
                annotated_frame = self._draw_predictions(frame, keypoints)
                out.write(annotated_frame)
        finally:
            # Release both handles even if inference or writing fails midway.
            cap.release()
            if out is not None:
                out.release()

        self._save_to_csv(csv_path)
        print("\nVideo processing complete!")
        print(f"Saved output video to: {output_path}")
        print(f"Saved keypoint data to: {csv_path}")

    def _format_keypoints(self, keypoints):
        """Turn rows of ``(y, x, confidence)`` into a dict keyed by keypoint name."""
        formatted_keypoints = {}
        for idx, name in enumerate(self.keypoints_names):
            y, x, confidence = keypoints[idx]
            formatted_keypoints[name] = {
                'x': float(x),
                'y': float(y),
                'confidence': float(confidence)
            }
        return formatted_keypoints

    def _log_keypoints_data(self, frame, keypoints, frame_index, fps):
        """Append one CSV row per keypoint of this frame to ``self.data``.

        Args:
            frame: Unused; kept so existing callers keep working.
            keypoints: Rows of ``(y, x, confidence)``.
            frame_index: Value stored in the ``frame_index`` column.
            fps: Frame rate; the timestamp is ``frame_index / fps``, or ``None``
                when ``fps`` is zero.
        """
        # Bounding box over confident keypoints, stored as plain Python floats
        # so the CSV shows numbers rather than numpy scalar reprs.
        visible = [
            (float(x), float(y))
            for y, x, confidence in keypoints
            if confidence > self.CONFIDENCE_THRESHOLD
        ]
        if visible:
            xs = [x for x, _ in visible]
            ys = [y for _, y in visible]
            bounding_box = (min(xs), min(ys), max(xs), max(ys))
        else:
            # No confident keypoint: leave the cell blank instead of (inf, inf, 0, 0).
            bounding_box = None

        vid_time = frame_index / fps if fps else None

        for name, (y, x, confidence) in zip(self.keypoints_names, keypoints):
            self.data.append({
                'frame_index': frame_index,
                'vid_time (s)': vid_time,
                'person_id': 1,  # Assuming a single person for MoveNet
                'bounding_box': bounding_box,
                'keypoint_name': name,
                'keypoint_x': float(x),
                'keypoint_y': float(y),
                'keypoint_confidence': float(confidence)
            })

    def _save_to_csv(self, csv_path):
        """Write every row accumulated in ``self.data`` to ``csv_path``."""
        fieldnames = [
            'frame_index', 'vid_time (s)', 'person_id',
            'bounding_box', 'keypoint_name', 'keypoint_x', 'keypoint_y',
            'keypoint_confidence'
        ]
        with open(csv_path, mode='w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.data)

    def _draw_predictions(self, frame, keypoints):
        """Draw skeleton, keypoint dots and their bounding box onto ``frame``.

        Args:
            frame: Image array to draw on; it is modified.
            keypoints: Dict as returned by ``_format_keypoints``.

        Returns:
            The same ``frame`` object, for call chaining.
        """
        frame_height, frame_width = frame.shape[:2]

        def to_pixel(point):
            return int(point['x'] * frame_width), int(point['y'] * frame_height)

        # Draw connections
        for start_name, end_name in self.connections:
            start, end = keypoints[start_name], keypoints[end_name]
            if (start['confidence'] > self.CONFIDENCE_THRESHOLD
                    and end['confidence'] > self.CONFIDENCE_THRESHOLD):
                cv2.line(frame, to_pixel(start), to_pixel(end), (0, 255, 255), 2)

        # Draw keypoints and remember where they landed for the bounding box
        drawn = []
        for point in keypoints.values():
            if point['confidence'] > self.CONFIDENCE_THRESHOLD:
                pixel = to_pixel(point)
                cv2.circle(frame, pixel, 5, (0, 255, 0), -1)
                drawn.append(pixel)

        # Draw the bounding box only when the points span a non-empty area.
        # Extremes come from the points themselves, so a negative coordinate
        # is not clamped to a seed value of 0.
        if drawn:
            min_x = min(px for px, _ in drawn)
            max_x = max(px for px, _ in drawn)
            min_y = min(py for _, py in drawn)
            max_y = max(py for _, py in drawn)
            if min_x < max_x and min_y < max_y:
                cv2.rectangle(frame, (min_x, min_y), (max_x, max_y), (255, 0, 0), 2)

        return frame

# Process the video and generate CSV
uploaded = files.upload()
# The upload result is keyed by file name; fail with a clear message instead of
# a bare StopIteration when nothing was uploaded.
if not uploaded:
    raise RuntimeError("No video was uploaded; re-run this cell and choose a file.")
input_video_path = next(iter(uploaded))
output_video_path = 'MoveNet_Output_with_Bounding_Box.mp4'
csv_output_path = 'MoveNet_Keypoints_Data.csv'

estimator = PoseEstimator()
estimator.process_video(input_video_path, output_video_path, csv_output_path)

# Download the processed CSV
files.download(csv_output_path)
