In [None]:
import cv2
import mediapipe as mp
import os
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2


def draw_landmarks_on_image(image, detection_result):
    """Overlay every detected pose skeleton on a copy of ``image``.

    Args:
        image: Image to annotate. Drawing happens on ``image.copy()``.
        detection_result: Object exposing ``pose_landmarks``, an iterable
            holding one landmark sequence per detected pose.

    Returns:
        ``image`` itself when ``pose_landmarks`` is empty or missing,
        otherwise the annotated copy.
    """
    detected_poses = detection_result.pose_landmarks
    if not detected_poses:
        return image

    canvas = image.copy()
    drawing = mp.solutions.drawing_utils
    pose_module = mp.solutions.pose

    # Green landmark dots, red/blue (channel order dependent) connection lines.
    point_style = drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
    edge_style = drawing.DrawingSpec(color=(0, 0, 255), thickness=2)

    for pose in detected_poses:
        # draw_landmarks takes a protobuf landmark list, so rebuild each
        # Tasks-API landmark as a protobuf NormalizedLandmark first.
        proto_points = []
        for point in pose:
            proto_points.append(
                landmark_pb2.NormalizedLandmark(
                    x=point.x,
                    y=point.y,
                    z=point.z,
                    visibility=point.visibility,
                )
            )
        pose_proto = landmark_pb2.NormalizedLandmarkList(landmark=proto_points)

        drawing.draw_landmarks(
            image=canvas,
            landmark_list=pose_proto,
            connections=pose_module.POSE_CONNECTIONS,
            landmark_drawing_spec=point_style,
            connection_drawing_spec=edge_style,
        )

    return canvas


# Set up MediaPipe PoseLandmarker (one detector instance is reused for every video)
base_options = python.BaseOptions(model_asset_path=r'C:\repos\OpenCamera-Sensors\cv_models\mediapipe\pose_landmarker_heavy.task')
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    output_segmentation_masks=False)
detector = vision.PoseLandmarker.create_from_options(options)

# Directory with test videos; annotated copies go to the "output" subfolder
input_dir = r'C:\videos\Test Video Data\Reece06082025'
output_dir = r'C:\videos\Test Video Data\Reece06082025\output'
os.makedirs(output_dir, exist_ok=True)

# Process each video in the directory (sorted so runs are reproducible)
for file_name in sorted(os.listdir(input_dir)):
    # Case-insensitive match so files named *.MP4 are not silently skipped
    if not file_name.lower().endswith('.mp4'):
        continue

    video_path = os.path.join(input_dir, file_name)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Without this check a 0x0 writer would be created and an empty file "saved"
        print(f"Skipping (cannot open): {file_name}")
        cap.release()
        continue

    out = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Missing FPS metadata is reported as 0 (or NaN); fall back to a usable rate
            fps = 30.0

        out_path = os.path.join(output_dir, f"pose_{file_name}")
        out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
        if not out.isOpened():
            print(f"Skipping (cannot create output): {out_path}")
            continue

        print(f"Processing: {file_name}")
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Convert frame to RGB and wrap in MediaPipe Image
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

            # Detect pose
            detection_result = detector.detect(mp_image)

            # Draw landmarks
            annotated_frame = draw_landmarks_on_image(rgb_frame, detection_result)

            # Convert RGB to BGR for OpenCV
            bgr_annotated = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)

            # Write frame to output video
            out.write(bgr_annotated)
    finally:
        # Always release the handles, even if detection or drawing raised,
        # so the input file is unlocked and the output container is finalized.
        cap.release()
        if out is not None:
            out.release()

    print(f"Saved to: {out_path}")


Processing: Reece_Test_01.mp4
Saved to: C:\videos\Test Video Data\Reece06082025output\pose_Reece_Test_01.mp4
Processing: Reece_Test_CartWheel_01.mp4
Saved to: C:\videos\Test Video Data\Reece06082025output\pose_Reece_Test_CartWheel_01.mp4
Processing: Reece_Test_CartWheel_02.mp4
Saved to: C:\videos\Test Video Data\Reece06082025output\pose_Reece_Test_CartWheel_02.mp4
Processing: Reece_Test_CartWheel_03.mp4
Saved to: C:\videos\Test Video Data\Reece06082025output\pose_Reece_Test_CartWheel_03.mp4
Processing: Reece_Test_ForwardRoll_02.mp4
Saved to: C:\videos\Test Video Data\Reece06082025output\pose_Reece_Test_ForwardRoll_02.mp4
Processing: Reece_Test_PikeJump_01.mp4
Saved to: C:\videos\Test Video Data\Reece06082025output\pose_Reece_Test_PikeJump_01.mp4
Processing: Reece_Test_PikeJump_02.mp4
Saved to: C:\videos\Test Video Data\Reece06082025output\pose_Reece_Test_PikeJump_02.mp4
Processing: Reece_Test_Roll.mp4
Saved to: C:\videos\Test Video Data\Reece06082025output\pose_Reece_Test_Roll.mp4


In [None]:
import time
import sys
import os
import threading
import tkinter as tk
from tkinter import messagebox, scrolledtext

sys.path.append(os.path.abspath('.'))
from src.RemoteControl import RemoteControl

HOST = '192.168.4.245'  # The smartphone's IP address

class RemoteControlGUI:
    """Tk window with Start/Stop buttons that drive a ``RemoteControl`` session.

    Device calls run on worker threads so the window stays responsive. Widget
    updates requested by a worker are queued and executed on the thread that
    built the GUI instead of being issued from the worker itself.
    """

    # Interval, in milliseconds, between checks of the worker callback queue.
    _UI_POLL_MS = 50

    def __init__(self, master):
        """Build the controls inside ``master`` and start polling for worker callbacks.

        Args:
            master: Tk root (or toplevel) that hosts the controls.
        """
        self.master = master
        master.title("Remote Video Control")

        self.remote = None
        self.recording = False

        # Thread that owns the widgets, plus the callbacks other threads want run on it.
        self._ui_thread = threading.current_thread()
        self._pending = []
        self._pending_lock = threading.Lock()

        self.status_area = scrolledtext.ScrolledText(master, height=10, width=50, state='disabled')
        self.status_area.pack(padx=10, pady=10)

        self.start_button = tk.Button(master, text="Start Recording", command=self.start_recording)
        self.start_button.pack(padx=10, pady=5)

        self.stop_button = tk.Button(master, text="Stop Recording", command=self.stop_recording, state=tk.DISABLED)
        self.stop_button.pack(padx=10, pady=5)

        self.quit_button = tk.Button(master, text="Quit", command=master.quit)
        self.quit_button.pack(pady=10)

        master.after(self._UI_POLL_MS, self._drain_ui_queue)

    def _run_on_ui(self, func, *args):
        """Run ``func(*args)`` on the GUI thread: now if already there, else queued."""
        if threading.current_thread() is self._ui_thread:
            func(*args)
            return
        with self._pending_lock:
            self._pending.append((func, args))

    def _drain_ui_queue(self):
        """Execute callbacks queued by worker threads, then schedule the next poll."""
        try:
            while True:
                with self._pending_lock:
                    if not self._pending:
                        break
                    func, args = self._pending.pop(0)
                # Called outside the lock so a callback may queue further work.
                func(*args)
        finally:
            # Reschedule even if a callback raised; entries not yet run stay queued.
            self.master.after(self._UI_POLL_MS, self._drain_ui_queue)

    def _set_buttons(self, start_state, stop_state):
        """Apply the given Tk states to the Start and Stop buttons."""
        self.start_button.config(state=start_state)
        self.stop_button.config(state=stop_state)

    def _append_log(self, message):
        """Append ``message`` to the read-only status box and echo it to stdout."""
        self.status_area.config(state='normal')
        self.status_area.insert(tk.END, message + "\n")
        self.status_area.see(tk.END)
        self.status_area.config(state='disabled')
        print(message)

    def log(self, message):
        """Show ``message`` in the status box and on stdout; callable from any thread."""
        self._run_on_ui(self._append_log, message)

    def start_recording(self):
        """Connect to ``HOST`` and start video recording on a background thread."""
        # Disable both buttons right away so a second click cannot open a
        # second connection while the first one is still being set up.
        self._run_on_ui(self._set_buttons, tk.DISABLED, tk.DISABLED)

        def thread_func():
            remote = None
            try:
                remote = RemoteControl(HOST)
                self.remote = remote
                self.log("Connected to device.")

                _phase, duration, exp_time = remote.start_video()
                self.log(f"Started video: exposure {exp_time}, duration {duration:.2f}")
                self.recording = True

                self._run_on_ui(self._set_buttons, tk.DISABLED, tk.NORMAL)

            except Exception as e:
                # Do not leave a half-initialised connection open after a failed start.
                if remote is not None:
                    try:
                        remote.close()
                    except Exception as close_err:
                        self.log(f"Error closing connection: {close_err}")
                self.remote = None
                self.log(f"Error starting recording: {e}")
                self._run_on_ui(self._set_buttons, tk.NORMAL, tk.DISABLED)
                self._run_on_ui(messagebox.showerror, "Error", str(e))

        threading.Thread(target=thread_func).start()

    def stop_recording(self):
        """Stop recording, download the video and close the connection in the background."""
        remote = self.remote
        if remote is None:
            self.log("Not connected.")
            return

        # Block repeated clicks while the stop/download sequence is running.
        self._run_on_ui(self._set_buttons, tk.DISABLED, tk.DISABLED)

        def thread_func():
            try:
                remote.stop_video()
                self.log("Stopped video. Downloading...")

                start = time.time()
                filename = remote.get_video(want_progress_bar=True)
                end = time.time()
                self.log(f"Video saved to {filename}. Elapsed: {end - start:.2f} seconds")

                remote.close()
                self.remote = None
                self.log("Connection closed.")

                self.recording = False
                self._run_on_ui(self._set_buttons, tk.NORMAL, tk.DISABLED)

            except Exception as e:
                self.log(f"Error stopping recording: {e}")
                # Leave Stop available so the user can retry, as before the click.
                self._run_on_ui(self._set_buttons, tk.DISABLED, tk.NORMAL)
                self._run_on_ui(messagebox.showerror, "Error", str(e))

        threading.Thread(target=thread_func).start()


# Launch the control window. mainloop() blocks this cell until Quit is pressed
# or the window is closed.
root = tk.Tk()
app = RemoteControlGUI(root)
try:
    root.mainloop()
finally:
    # The Quit button only stops mainloop(); destroy the root as well so the
    # window does not linger in the kernel and re-running the cell starts clean.
    try:
        root.destroy()
    except tk.TclError:
        # Already destroyed because the user closed the window from its title bar.
        pass
