executor

In [None]:
#!/usr/bin/env python3
import rospy
import moveit_commander
import sys

class UR5Controller:
    def __init__(self):
        # 初始化ROS节点
        rospy.init_node('ur5_controller', anonymous=True)

        # 初始化MoveIt!界面
        moveit_commander.roscpp_initialize(sys.argv)

        # 创建机械臂的MoveGroupCommander对象
        self.arm_group = moveit_commander.MoveGroupCommander("manipulator")

    def move_to_target(self, target_joint_angles):
        # 设置机械臂目标关节角度
        self.arm_group.set_joint_value_target(target_joint_angles)

        # 规划和执行运动
        self.arm_group.go(wait=True)

    def shutdown(self):
        # 关闭MoveIt!界面
        moveit_commander.roscpp_shutdown()

if __name__ == "__main__":
    try:
        # 创建UR5Controller对象
        ur5_controller = UR5Controller()

        # 指定目标关节角度
        target_joint_angles = [0.0, -1.57, 0.0, -1.57, 0.0, 0.0]  # 请替换为您的目标关节角度

        # 移动机械臂到目标位置
        ur5_controller.move_to_target(target_joint_angles)

        # 关闭MoveIt!界面
        ur5_controller.shutdown()

    except rospy.ROSInterruptException:
        pass

publisher

In [None]:
#!/usr/bin/env python3
import rospy
from std_msgs.msg import Float64MultiArray

if __name__ == "__main__":
    # 初始化ROS节点
    rospy.init_node('target_angles_publisher', anonymous=True)

    # 创建ROS发布者，发布目标关节角度信息到 "target_joint_angles" 主题
    target_angles_pub = rospy.Publisher("target_joint_angles", Float64MultiArray, queue_size=10)

    # 创建一个消息对象
    target_joint_angles = Float64MultiArray()
    target_joint_angles.data = [0.0, -1.57, 0.0, -1.57, 0.0, 0.0]  # 请替换为您希望的目标关节角度

    rate = rospy.Rate(10)  # 设置发布频率为10Hz

    while not rospy.is_shutdown():
        # 发布目标关节角度信息
        target_angles_pub.publish(target_joint_angles)
        rate.sleep()


In [None]:
import cv2
import mediapipe as mp
from util.models import myUR5

import cv2
import mediapipe as mp
import pyrealsense2 as rs
import pandas as pd
import numpy as np
import os
import time
from datetime import datetime
import warnings
from util.func import *
warnings.filterwarnings('ignore')

def angle(A, B=None, C=None):
    """计算两个三维向量之间的角度，可以接受两种不同的参数格式。"""
    if not B and not C:
        dis1, dis2, dis3 = np.array(A[0]), np.array(A[1]), np.array(A[2])
    else:
        dis1, dis2, dis3 = np.array(A), np.array(B), np.array(C)
    cos_theta = (np.linalg.norm(dis2 - dis3)**2 + np.linalg.norm(dis1 - dis3)**2 - np.linalg.norm(dis1 - dis2)**2) / (2 * np.linalg.norm(dis2 - dis3) * np.linalg.norm(dis1 - dis3))
    return np.arccos(cos_theta)

def draw_hand(image, hand_landmarks):
    mp.solutions.drawing_utils.draw_landmarks(
        image,
        hand_landmarks,
        mp_holistic.HAND_CONNECTIONS,
        mp_drawing_styles.get_default_hand_landmarks_style(),
        mp_drawing_styles.get_default_hand_connections_style())
def draw_face(image, face_landmarks):
    mp.solutions.drawing_utils.draw_landmarks(
        image,
        face_landmarks,
        mp_holistic.FACEMESH_CONTOURS,
        landmark_drawing_spec=None,
        connection_drawing_spec=mp_drawing_styles
        .get_default_face_mesh_contours_style())
def draw_pose(image, pose_landmarks):
    mp.solutions.drawing_utils.draw_landmarks(
        image,
        pose_landmarks,
        mp_holistic.POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles
        .get_default_pose_landmarks_style())

def position_mapping(joint_angles):
    """
    input: 3 dof list formatting input of angle data of left arm, 
        for shoulder, elbow and wrist
    output: 6 dof list formatting output of angle data for ur5e robot arm, 
        respectively for shoulder_pan_joint, shoulder_lift_joint, elbow_joint, 
        wrist_1_joint, wrist_2_joint and wrist_3_joint
    """

    # joint_angles = [(i-3.14) for i in joint_angles]
    # joint_angles = [i for i in joint_angles]
    joint_angles[0] = joint_angles[0]*(3.14/2-.1)/3.14-3.14/2+.05
    joint_angles[1] -= 3.14/2
    joint_angles[2] = ((joint_angles[2] - 0) * (3.14+.05 - (3.14*3/2-.05)) / (3.14 - 0)) + 3.14*3/2
    joint_angles[3] = (joint_angles[3] - 3.14) * 2
    mapped_joint_angles = [0]+joint_angles+[0]
    return mapped_joint_angles
    

_,_,pipeline = rs_initialize()
check_dirs()


mp_drawing_styles = mp.solutions.drawing_styles
mp_holistic = mp.solutions.holistic

# For webcam input:
cap = cv2.VideoCapture(0)

# Initialize video file, DataFrame and related flags
out = None
df_list = []
recording = False
start_detected = False
end_detected = False
end_gesture_detected_time = None

with mp_holistic.Holistic(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as holistic:
    arm = myUR5()  # Initializing the robot
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            print("Ignoring empty camera frame.")
            # If loading a video, use 'break' instead of 'continue'.
            continue

        # To improve performance, optionally mark the image as not writeable to
        # pass by reference.
        image.flags.writeable = False
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = holistic.process(image)

        # Draw landmark annotation on the image.
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        draw_face(image, results.face_landmarks)
        draw_pose(image, results.pose_landmarks)
        draw_hand(image, results.right_hand_landmarks)
        draw_hand(image, results.left_hand_landmarks)
        # Flip the image horizontally for a selfie-view display.
        cv2.imshow('MediaPipe Holistic', cv2.flip(image, 1))
        if cv2.waitKey(5) & 0xFF == 27:
            break

        # print(f"The landmarks' condition is {results.pose_landmarks.landmark}")

        try:
            vis = True
            landmarks = [results.pose_landmarks.landmark[23], # bocy
                         results.pose_landmarks.landmark[11], # shoulder
                         results.pose_landmarks.landmark[13], # elbow
                         results.left_hand_landmarks.landmark[0], # wrist
                         results.left_hand_landmarks.landmark[5], # mid_hand 1
                         results.left_hand_landmarks.landmark[9], # mid_hand 2
                         results.left_hand_landmarks.landmark[13], # mid_hand 3
                         results.left_hand_landmarks.landmark[17], # mid_hand 4
                         results.left_hand_landmarks.landmark[8], # top_hand 1
                         results.left_hand_landmarks.landmark[12], # top_hand 2
                         results.left_hand_landmarks.landmark[16], # top_hand 3
                         results.left_hand_landmarks.landmark[20]] # top_hand 4
            for landmarki in landmarks[0:3]:
                if landmarki.visibility < .8:
                    vis = False
                    break
            if not vis:
                continue

            extracted = [np.array([item.x, item.y, item.z]) for item in landmarks]
            ind_ang = {'shoulder': [0,1,2], 'elbow': [1,2,3], 'wrist': [2,3,4], 'finger': [3,4,5]}
            hand_pos = (extracted[4] + extracted[5] + extracted[6] + extracted[7])/4
            extracted[4] = hand_pos
            del extracted[5:8]
            finger_pos = (extracted[5] + extracted[6] + extracted[7] + extracted[8])/4
            extracted[5] = finger_pos
            del extracted[6:]

            angles = dict()
            for key,value in ind_ang.items():
                pos = [extracted[i] for i in value]
                angle_i = angle(pos)
                angles[key] = angle_i
            
            current_pos = position_mapping([angles['shoulder'],angles['elbow'],angles['wrist'],angles['finger']])
            arm.set_pos(current_pos)
            print(f"Position is set at{current_pos}")
        except:
            continue


cap.release()

In [2]:
import numpy as np
import cv2
import pyrealsense2 as rs

def main():
    # 配置深度和颜色流
    config = rs.config()
    config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
    config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

    # 开启realsense相机流
    pipeline = rs.pipeline()
    profile = pipeline.start(config)

    try:
        while True:
            # 等待一帧数据并获取其
            frames = pipeline.wait_for_frames()
            depth_frame = frames.get_depth_frame()
            color_frame = frames.get_color_frame()

            # 若没有获取到深度或颜色数据，则跳过此次循环
            if not depth_frame or not color_frame:
                continue

            # 转换深度帧为numpy array并调整颜色
            depth_image = np.asanyarray(depth_frame.get_data())
            depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)

            # 转换颜色帧为numpy array
            color_image = np.asanyarray(color_frame.get_data())

            # 拼接并显示图像
            images = np.hstack((color_image, depth_colormap))
            cv2.imshow('Realsense', images)

            # 按下'q'键退出循环
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    finally:
        pipeline.stop()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()
