In [5]:
import pyrealsense2 as rs
import numpy as np
import cv2
import mediapipe as mp

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# Initialize MediaPipe Pose
pose = mp_pose.Pose()

def put_text(image, text, y, color=(0, 0, 255)):
    cv2.putText(image, text, (10, 50 + y), cv2.FONT_HERSHEY_COMPLEX, 1, color, 2)
    return image

def draw_line(image, h):
    _, width, _ = image.shape
    color = (0, 255, 0)  # You can change this to set the line color (B, G, R)
    thickness = 2  # You can change this to set the line thickness
    start_point = (50, h)  # You can change this to set the starting point of the line
    end_point = (width - 50, h)  # You can change this to set the ending point of the line
    image = cv2.line(image, start_point, end_point, color, thickness)
    return image

file_path = r"D:\video2\20240107_151948.bag"
pipeline = rs.pipeline()
config = rs.config()
rs.config.enable_device_from_file(config, file_path, repeat_playback=False)
pipeline.start(config)
count = 0
window_name = "hihi"
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
cv2.resizeWindow(window_name, 800, 600) 
while True:
    try:
        # Get frameset of depth
        frames = pipeline.wait_for_frames()
        count += 1
        if not (count % 2):
            continue

        # Get color frame
        color_frame = frames.get_color_frame()
        color_image = np.asanyarray(color_frame.get_data())
        frame = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
        h_im, w_im, _ = frame.shape
        results = pose.process(frame)
        print(h_im,w_im)
        if results.pose_landmarks:
            left_y = results.pose_landmarks.landmark[15].y * h_im
            right_y = results.pose_landmarks.landmark[16].y * h_im
            centre = results.pose_landmarks.landmark[24].y * h_im - \
                     (results.pose_landmarks.landmark[24].y * h_im - results.pose_landmarks.landmark[12].y * h_im) * 0.2
            mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
            is_action = False
            if left_y <=centre or right_y<=centre:
                is_action = True
            frame = put_text(frame, f"Left_y: {left_y:.2f}", 100)
            frame = put_text(frame, f"Right_y: {right_y:.2f}", 50)
            frame = put_text(frame, f"Center: {centre:.2f}", 150)
            frame = put_text(frame, f"isaction: {is_action}", 200)
            frame = draw_line(frame,int(centre))
            cv2.imshow("hihi", frame)
            if cv2.waitKey(1)==ord('q'):
                break# Use cv2.waitKey(1) instead of cv2.waitKey(0) for non-blocking display

    except Exception as e:
        print(e)
        break

# Release resources
pipeline.stop()
cv2.destroyAllWindows()


720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280
720 1280


In [3]:
from utils import *
from pathlib import Path
import os
import cv2
import numpy as np
import mediapipe as mp
import glob
import pyrealsense2 as rs
from tqdm import tqdm
class Action:
    def __init__(self, opt_path):
        self.opt = read_yaml(opt_path)
        self.mp_pose = mp.solutions.pose
        self.posedetector = self.mp_pose.Pose()
        self.count = 0
        self.is_action = None
        self.num_num_sub_video = 0
        self.w,self.h = self.opt['image'].values()

    def new_start(self, file_path):
        self.pipeline = rs.pipeline()
        config = rs.config()
        rs.config.enable_device_from_file(config, file_path, repeat_playback=False)
        self.pipeline.start(config)

        self.count = 0
        self.is_action = None
        self.num_sub_video = 0
        name_action = os.path.basename(os.path.dirname(file_path))
        self.folder_output = os.path.join(self.opt['out_folder'], name_action)
        os.makedirs(self.folder_output, exist_ok=True)
        self.input_id = os.path.basename(file_path).split(".")[0]

    def get_key_points(self, frame):
        h_im, _, _ = frame.shape
        results = self.posedetector.process(frame)
        left_y, right_y, centre = None,None,None
        if results.pose_landmarks:
            left_y = results.pose_landmarks.landmark[15].y * h_im
            right_y = results.pose_landmarks.landmark[16].y * h_im
            centre = results.pose_landmarks.landmark[24].y * h_im - \
                    (results.pose_landmarks.landmark[24].y * h_im - results.pose_landmarks.landmark[12].y * h_im) * self.opt['rate_up']
        return left_y, right_y, centre, results

    def start_capture(self):
        name = f"{self.input_id}_sub_{self.num_sub_video}.mp4"
        output_file = os.path.join(self.folder_output, name)
        self.num_sub_video += 1
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        self.out = cv2.VideoWriter(output_file, fourcc, self.opt['out_fps'], (self.w, self.h))
        print("start caputure: ",output_file)

    def check_action(self, left_y, right_y, centre):
        return left_y < centre or right_y < centre

    def update_action(self, is_action, frame):
        if self.is_action:
                self.out.write(frame)
        if is_action:
            self.count = max(1, self.count + 1)
            if self.count >= self.opt['thresh_frame_action']:
                if self.is_action is False or self.is_action is None:
                    self.start_capture()
                self.is_action = True
        else:
            self.count = min(-1, self.count - 1)
            if self.count <= -self.opt['thresh_frame_no_action']:
                if self.is_action:
                    self.out.release()
                    print("end action")
                self.is_action = False

    def add_new_frame(self, frame):
        left_y, right_y, centre, results = self.get_key_points(frame)
        frame = cv2.resize(frame,(self.w,self.h))
        if left_y is not None:
            is_action = self.check_action(left_y, right_y, centre)
            self.update_action(is_action, frame)
        else:
            self.update_action(self.is_action, frame)
            
    def segment_video(self):
        count = 0
        while True:
            # try:
                import pdb;pdb.set_trace()
                frames = self.pipeline.wait_for_frames()
                count += 1 
                color_frame = frames.get_color_frame()
                color_image = np.asanyarray(color_frame.get_data())
                rgb_image = cv2.cvtColor(color_image,cv2.COLOR_BGR2RGB)
                self.add_new_frame(rgb_image)
            # except Exception as e:
            #     print(e)
            #     break
        self.pipeline.release()
    def segment_video_folder(self):
        for sub_folder in tqdm(glob.glob(os.path.join(self.opt['input_folder'], "*")), desc='Processing Subfolders'):
            for path in tqdm(glob.glob(os.path.join(sub_folder, "*")), desc=f'Processing Files in {os.path.basename(sub_folder)}', leave=False):
                self.new_start(path)
                self.segment_video()
            
                
       
            


# Assuming you have a valid opt.yaml file and rs is the RealSense module
# You should also have a loop to capture frames and call add_new_frame method
# Example:
action = Action(r"D:\thu_anh\config.yaml")
action.segment_video()

> [1;32mc:\users\phuoc\appdata\local\temp\ipykernel_18332\1453932197.py[0m(87)[0;36msegment_video[1;34m()[0m



In [None]:

action.start_capture()

1000


In [1]:
cv2.destroyAllWindows()

NameError: name 'cv2' is not defined