diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index 21b5298..cdfa3dd 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -1,10 +1,34 @@ -# Library to handle calculations for inverse and forward dynamics + +# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import pinocchio as pin import numpy as np import math from typing import Union from pathlib import Path +from numpy.linalg import norm, solve +from optik import Robot, SolverConfig +import tempfile +from ikpy import chain +import os + +# TODO : If a workspace calculation is already done, store the results in a file to avoid recalculating it and make only the +# verification of the payload handling in the workspace. + + class TorqueCalculator: def __init__(self, robot_description : Union[str, Path]): @@ -18,10 +42,59 @@ def __init__(self, robot_description : Union[str, Path]): # Load the robot model from path or XML string if isinstance(robot_description, str): self.model = pin.buildModelFromXML(robot_description) + + # TODO change parser in general for more unique solution + + # create temporary URDF file from the robot description string + with tempfile.NamedTemporaryFile(mode='w', suffix='.urdf', delete=False) as temp_file: + temp_file.write(robot_description) + temp_urdf_path = temp_file.name + + self.geom_model = pin.buildGeomFromUrdf(self.model,temp_urdf_path,pin.GeometryType.COLLISION) + + # Add collisition pairs + self.geom_model.addAllCollisionPairs() + + os.unlink(temp_urdf_path) + elif isinstance(robot_description, Path): self.model = pin.buildModelFromUrdf(str(robot_description.resolve())) + # create data for the robot model self.data = self.model.createData() + self.geom_data = pin.GeometryData(self.geom_model) + + # get the default collisions in the robot model to avoid take them into account in the computations + self.default_collisions = self.compute_static_collisions() + + # create array to store all possible configurations for left and right arms + self.configurations_l = None + self.configurations_r = None + + + def compute_static_collisions(self): + """ + Compute the static collisions for the robot model. + This method is used to compute the collisions in the robot model in the zero configuration. + """ + + # array to store the collision pairs + collision_pairs = [] + + # Compute all the collisions + pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, self.get_zero_configuration(), False) + + # Print the status of collision for all collision pairs + for k in range(len(self.geom_model.collisionPairs)): + cr = self.geom_data.collisionResults[k] + cp = self.geom_model.collisionPairs[k] + if cr.isCollision(): + print(f"Collision between {cp.first} and {cp.second} detected.") + collision_pairs.append((cp.first, cp.second, k)) + + return collision_pairs + + def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray: @@ -30,7 +103,8 @@ def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : n :param q: Joint configuration vector. :param qdot: Joint velocity vector. - :param a: Joint acceleration vector. + :param qddot: Joint acceleration vector. + :param extForce: External forces vector applied to the robot model. :return: Torques vector """ @@ -47,17 +121,19 @@ def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : n return tau - def create_ext_force(self, mass : float, frame_name : str, q : np.ndarray) -> np.ndarray[pin.Force]: + def create_ext_force(self, masses : Union[float, np.ndarray] , frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray[pin.Force]: """ - Create external forces vector based on the mass and frame ID. + Create external forces vector based on the masses and frame ID. The resulting vector will contain the force applied to the specified frame and with the local orientation of the parent joint. - :param mass: Mass of the object to apply the force to. - :param frame_id: Frame ID where the force is applied. + :param masses (float, np.ndarray) : Mass of the object to apply the force to or vector with masses related to frames names. + :param frame_name(str , np.ndarray) : Frame name where the force is applied or vector of frame names where the forces is applied. + :param q: Joint configuration vector. :return: External force vector. """ - if mass < 0: - raise ValueError("Mass must be a positive value") + if isinstance(masses, float): + if masses < 0: + raise ValueError("Mass must be a positive value") if frame_name is None: raise ValueError("Frame name must be provided") @@ -70,24 +146,38 @@ def create_ext_force(self, mass : float, frame_name : str, q : np.ndarray) -> np # Initialize external forces array fext = [pin.Force(np.zeros(6)) for _ in range(self.model.njoints)] - self.update_configuration(q) + self.update_configuration(q) - # Get the frame ID and joint ID from the frame name - frame_id = self.model.getFrameId(frame_name) - joint_id = self.model.frames[frame_id].parentJoint + # Check if frame_name is a single string or an array of strings + if isinstance(frame_name, str): + # Get the frame ID and joint ID from the frame name + frame_id = self.model.getFrameId(frame_name) + joint_id = self.model.frames[frame_id].parentJoint - # force expressed in the world frame (gravity force in z axis) - ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0])) + # force expressed in the world frame (gravity force in z axis) + ext_force_world = pin.Force(np.array([0.0, 0.0, masses * -9.81]), np.array([0.0, 0.0, 0.0])) + + # placement of the frame in the world frame + #frame_placement = self.data.oMf[frame_id] + #print(f"Frame placement: {frame_placement}") + + # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied + fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world) + # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied + fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient + + else: + for mass, frame in zip(masses,frame_name): + frame_id = self.model.getFrameId(frame) + joint_id = self.model.frames[frame_id].parentJoint + + # force expressed in the world frame (gravity force in z axis) + ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0])) + # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied + fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world) + # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied + fext[joint_id].angular = np.zeros(3) - # placement of the frame in the world frame - frame_placement = self.data.oMf[frame_id] - #print(f"Frame placement: {frame_placement}") - - # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied - fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world) - # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied - fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient - return fext @@ -101,74 +191,6 @@ def update_configuration(self, q : np.ndarray): pin.updateFramePlacements(self.model, self.data) - def get_mass_matrix(self, q : np.ndarray) -> np.ndarray: - """ - Compute the mass matrix for the robot model. - - :param q: Joint configuration vector. - :return: Mass matrix. - """ - - mass_matrix = pin.crba(self.model, self.data, q) - if mass_matrix is None: - raise ValueError("Failed to compute mass matrix") - - return mass_matrix - - def get_joints(self) -> np.ndarray: - """ - Get the array joint names of the robot model. - - :return: array Joint names . - """ - - return np.array(self.model.names[1:], dtype=str) - - def get_frames(self) -> np.ndarray: - """ - Get the array of frame names in the robot model. - - :return: array of frame names. - """ - - return np.array([frame.name for frame in self.model.frames if frame.type == pin.FrameType.BODY], dtype=str) - - def get_active_frames(self) -> np.ndarray: - """ - Get the array of active joint names in the robot model. - - :return: array of active joint names. - """ - # Get frames where joints are parents - frame_names = [] - for i in range(1, self.model.njoints): - for frame in self.model.frames: - if frame.parentJoint == i and frame.type == pin.FrameType.BODY: - frame_names.append(frame.name) - break - - return np.array(frame_names, dtype=str) - - def get_parent_joint_id(self, frame_name : str) -> int: - """ - Get the parent joint ID for a given frame name. - - :param frame_name: Name of the frame. - :return: Joint ID. - """ - - if frame_name is None: - raise ValueError("Frame name must be provided") - - # Get the frame ID from the model - frame_id = self.model.getFrameId(frame_name) - joint_id = self.model.frames[frame_id].parentJoint - - if joint_id == -1: - raise ValueError(f"Joint '{joint_id}' not found in the robot model") - - return joint_id - def compute_maximum_payload(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, frame_name : str) -> float: """ Compute the forward dynamics acceleration vector. @@ -179,6 +201,7 @@ def compute_maximum_payload(self, q : np.ndarray, qdot : np.ndarray, tau : np.nd :param frame_name: Name of the frame where the force is applied. :return: Acceleration vector. """ + # TODO : refactor this method # Update the configuration of the robot model self.update_configuration(q) @@ -207,6 +230,255 @@ def compute_maximum_payload(self, q : np.ndarray, qdot : np.ndarray, tau : np.nd return F_max[2] # get the force in z axis of the world frame, which is the maximum force payload + + def compute_inverse_kinematics_optik(self, q : np.ndarray, end_effector_position: np.ndarray) -> np.ndarray: + """ + Compute the inverse kinematics for the robot model using the Optik library. + + :param q: current joint configuration vector. + :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector]. + :return: Joint configuration vector that achieves the desired end effector position. + """ + # TODO : It doees not work with the current version of the library + + # Compute the inverse kinematics + sol = self.ik_model.ik(self.ik_config, end_effector_position, q) + + return sol + + + + def compute_inverse_kinematics_ikpy(self, q : np.ndarray, end_effector_position: np.ndarray) -> np.ndarray: + """ + Compute the inverse kinematics for the robot model using the ikpy library. + + :param q: current joint configuration vector. + :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector]. + :return: Joint configuration vector that achieves the desired end effector position. + """ + # TODO : It doees not work with the current version of the library + + # Compute the inverse kinematics + sol = self.ik_model.inverse_kinematics(end_effector_position) + + return sol + + + + def compute_inverse_kinematics(self, q : np.ndarray, end_effector_position: np.ndarray, end_effector_name : str) -> np.ndarray: + """ + Compute the inverse kinematics for the robot model with joint limits consideration. + :param q: current joint configuration vector. + :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector]. + :param end_effector_name: Name of the end effector joint. + :return: Joint configuration vector that achieves the desired end effector position. + """ + + joint_id = self.model.getJointId(end_effector_name) # Get the joint ID of the end effector + + # Set parameters for the inverse kinematics solver + eps = 1e-3 # reduce for more precision + IT_MAX = 500 # Maximum number of iterations + DT = 1e-1 + damp = 1e-12 + + i = 0 + while True: + self.update_configuration(q) + iMd = self.data.oMi[joint_id].actInv(end_effector_position) # Get the transformation from the current end effector pose to the desired pose + + err = pin.log(iMd).vector # compute the error in the end effector position + if norm(err[:3]) < eps: + success = True + break + if i >= IT_MAX: + success = False + break + + J = pin.computeJointJacobian(self.model, self.data, q, joint_id) # compute the Jacobian of current pose of end effector + #J = -np.dot(pin.Jlog6(iMd.inverse()), J) + J = -J[:3, :] + + # compute the inverse kinematics v = -J^T * (J * J^T + damp * I)^-1 * err + v = -J.T.dot(solve(J.dot(J.T) + damp * np.eye(3), err[:3])) + + # Apply joint limits by clamping the resulting configuration + q_new = pin.integrate(self.model, q, v * DT) + # Ensure the new configuration is within joint limits + q_new = np.clip(q_new, self.model.lowerPositionLimit, self.model.upperPositionLimit) + + # Check if we're hitting joint limits and reduce step size if needed + if np.allclose(q_new, q, atol=1e-6): + DT *= 0.5 # Reduce step size if no progress due to joint limits + if DT < 1e-6: # Minimum step size threshold + success = False + break + + q = q_new + + # if not i % 10: + # print(f"{i}: error = {err.T}") + i += 1 + + if success: + print(f"Convergence achieved! in {i} iterations") + return q + else: + print( + "\n" + "Warning: the iterative algorithm has not reached convergence to the desired precision" + ) + return None # Return None if convergence is not achieved + + + + def compute_all_configurations(self, range : int, resolution : int, end_effector_name : str) -> np.ndarray: + """ + Compute all configurations for the robot model within a specified range. + + :param range (int): Range as side of a square where in the center there is the actual position of end effector. + :param resolution (int): Resolution of the grid to compute configurations. + :param end_effector_name (str): Name of the end effector joint. + :return : Array of joint configurations that achieve the desired end effector position. + """ + + if range <= 0: + raise ValueError("Range must be a positive value") + + # Get the current joint configuration + q = self.get_zero_configuration() + + #id_end_effector = self.model.getJointId(end_effector_name) + # Get the current position of the end effector + #end_effector_pos = self.data.oMi[id_end_effector] + + # Create an array to store all configurations + configurations = [] + + # Iterate over the range to compute all configurations + for x in np.arange(-range/2, range/2 , resolution): + for y in np.arange(-range/2, range/2 , resolution): + for z in np.arange(0, range , resolution): + target_position = pin.SE3(np.eye(3), np.array([x, y, z])) + new_q = self.compute_inverse_kinematics(q, target_position, end_effector_name) + + if new_q is not None: + q = new_q + # store the valid configuration and the position of the end effector relative to that configuration + configurations.append({"config" : new_q, "end_effector_pos": target_position.translation}) + + return np.array(configurations, dtype=object) + + + + def verify_configurations(self, configurations_left : np.ndarray, configurations_right : np.ndarray, masses : np.ndarray, checked_frames : np.ndarray) -> np.ndarray: + """ + Verify the configurations to check if they are valid. + + :param configurations_left: Array of joint configurations to verify for the left arm. + :param configurations_right: Array of joint configurations to verify for the right arm. + :param ext_forces: Array of external forces to apply to the robot model. + :return: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau"}]. + """ + + valid_configurations = [] + + # check valid configurations for left arm + for q in configurations_left: + # Update the configuration of the robot model + self.update_configuration(q["config"]) + + if masses is not None and checked_frames is not None: + # Create external forces based on the masses and checked frames + ext_forces = self.create_ext_force(masses, checked_frames, q["config"]) + # Compute the inverse dynamics for the current configuration + tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces) + else: + # Compute the inverse dynamics for the current configuration without external forces + tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration()) + + # Check if the torques are within the effort limits + if self.check_effort_limits(tau,"left").all(): + valid = True + # Compute all the collisions + pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, q["config"], False) + + # Print the status of collision for all collision pairs + for k in range(len(self.geom_model.collisionPairs)): + cr = self.geom_data.collisionResults[k] + cp = self.geom_model.collisionPairs[k] + + if cr.isCollision() and (cp.first, cp.second, k) not in self.default_collisions: + print(f"Collision detected between {cp.first} and {cp.second} in the left arm configuration.") + valid = False + break + + if valid: + valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "arm" : "left" }) + + # check valid configurations for right arm + for q in configurations_right: + # Update the configuration of the robot model + self.update_configuration(q["config"]) + + if masses is not None and checked_frames is not None: + # Create external forces based on the masses and checked frames + ext_forces = self.create_ext_force(masses, checked_frames, q["config"]) + # Compute the inverse dynamics for the current configuration + tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces) + else: + # Compute the inverse dynamics for the current configuration without external forces + tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration()) + + # Check if the torques are within the effort limits + if self.check_effort_limits(tau,"right").all(): + valid = True + # Compute all the collisions + pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, q["config"], False) + + # Print the status of collision for all collision pairs + for k in range(len(self.geom_model.collisionPairs)): + cr = self.geom_data.collisionResults[k] + cp = self.geom_model.collisionPairs[k] + + if cr.isCollision() and (cp.first, cp.second, k) not in self.default_collisions: + print(f"Collision detected between {cp.first} and {cp.second} in the right arm configuration.") + valid = False + break + + if valid: + valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "arm" : "right" }) + + + return np.array(valid_configurations, dtype=object) + + + def get_valid_workspace(self, range : int, resolution : int, end_effector_name_left : str, end_effector_name_right, masses : np.ndarray, checked_frames: np.ndarray) -> np.ndarray: + """ + Get the valid workspace of the robot model by computing all configurations within a specified range. + + :param range (int): Range as side of a square where in the center there is the actual position of end effector. + :param resolution (int): Resolution of the grid to compute configurations. + :param end_effector_name_left (str): Name of the end effector joint to test for the left arm. + :param end_effector_name_right (str): Name of the end effector joint to test for the right arm. + :param masses (np.ndarray): Array of masses to apply to the robot model. + :param checked_frames (np.ndarray): Array of frame names where the external forces are applied. + :return: Array of valid configurations that achieve the desired end effector position in format: [{"config", "end_effector_pos, "tau", "arm"}]. + """ + # compute all configurations for the left and right arms if they are not already computed + if self.configurations_l is None or self.configurations_r is None: + # Compute all configurations within the specified range for the left arm + self.configurations_l = self.compute_all_configurations(range,resolution, end_effector_name_left) + # Compute all configurations within the specified range for the right arm + self.configurations_r = self.compute_all_configurations(range,resolution, end_effector_name_right) + + # Verify the configurations to check if they are valid for both arms + valid_configurations = self.verify_configurations(self.configurations_l,self.configurations_r, masses, checked_frames) + + return valid_configurations + + + def compute_forward_dynamics_aba_method(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray: """ Compute the forward dynamics acceleration vector with Articulated-Body algorithm(ABA). @@ -214,6 +486,7 @@ def compute_forward_dynamics_aba_method(self, q : np.ndarray, qdot : np.ndarray, :param q: Joint configuration vector. :param qdot: Joint velocity vector. :param tau: Joint torque vector. + :param extForce: External forces vector applied to the robot model. :return: Acceleration vector. """ @@ -250,6 +523,102 @@ def compute_jacobian(self, q : np.ndarray, frame_name : str) -> np.ndarray: return J_frame + def get_maximum_torques(self, valid_configs : np.ndarray) -> np.ndarray | np.ndarray: + """ + Get the maximum torques for each joint in all valid configurations. + + :param valid_configs: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau"}]. + :return: Arrays of maximum torques for each joint in the current valid configurations for left and right arm. + """ + + # Get the number of joints + num_joints = len(valid_configs[0]["tau"]) + max_torques_left = np.array([], dtype=float) + max_torques_right = np.array([], dtype=float) + + # Find maximum absolute torque for each joint + for i in range(num_joints): + joint_torques_left = [abs(config["tau"][i]) for config in valid_configs if config["arm"] == "left"] + joint_torques_right = [abs(config["tau"][i]) for config in valid_configs if config["arm"] == "right"] + + max_torques_left = np.append( max_torques_left, max(joint_torques_left)) + max_torques_right = np.append( max_torques_right, max(joint_torques_right)) + + return max_torques_left, max_torques_right + + + + + def get_normalized_torques(self, tau : np.ndarray, target_torque : np.ndarray = None) -> np.ndarray: + """ + Normalize the torques vector to a unified scale. + + :param tau: Torques vector to normalize. + :return: Normalized torques vector. + """ + if tau is None: + raise ValueError("Torques vector is None") + + norm_tau = [] + + # if target_torque is not specified, normalize the torques vector to the effort limits of the robot model + if target_torque is None: + # Normalize the torques vector + for i, torque in enumerate(tau): + norm_tau.append(abs(torque) / self.model.effortLimit[i]) + else: + # Normalize the torques vector to the target torque + for i, torque in enumerate(tau): + if target_torque[i] != 0: + norm_tau.append(abs(torque) / target_torque[i]) + else: + norm_tau.append(0.0) + + + return norm_tau + + + def get_unified_configurations_torque(self, valid_configs : np.ndarray) -> np.ndarray | np.ndarray: + """ + Get a unified sum of torques for all possible configurations of the robot model. + + :param q: Joint configuration vector. + :param valid_configs: Array of + """ + torques_sum = np.array([], dtype=float) + norm_torques = np.array([], dtype=float) + sum = 0.0 + + for valid_config in valid_configs: + # get the joint configuration and torques vector from the valid configuration + q = valid_config["config"] + tau = valid_config["tau"] + + # calculate the sum of torques for each joint configuration + for joint, torque in zip(q, tau): + #if abs(torque) < 50: + sum += abs(torque) + + torques_sum = np.append(torques_sum, {"sum" : sum, "end_effector_pos" : valid_config["end_effector_pos"], "arm" : valid_config["arm"]}) + sum = 0.0 # reset the sum for the next configuration + + # get the maximum torque from the sum of torques for both arms + max_torque_l = max([item["sum"] for item in torques_sum if item["arm"] == "left"]) + max_torque_r = max([item["sum"] for item in torques_sum if item["arm"] == "right"]) + + # Normalize the torques vector to a unified scale + for tau in torques_sum: + if tau["arm"] == "left": + norm_tau = tau["sum"] / max_torque_l + else: + norm_tau = tau["sum"] / max_torque_r + + # append the normalized torque to the array + norm_torques = np.append(norm_torques, {"norm_tau" : norm_tau, "end_effector_pos" : tau["end_effector_pos"], "arm" : tau["arm"]}) + + return norm_torques + + def check_zero(self, vec : np.ndarray) -> bool: """ Checks if the vector is zero. @@ -303,7 +672,7 @@ def get_zero_acceleration(self) -> np.ndarray: def get_random_configuration(self) -> tuple[np.ndarray, np.ndarray]: """ Get a random configuration for configuration and velocity vectors. - :return: Random configuration vector. + :return: Random configuration vectors. """ q_limits_lower = self.model.lowerPositionLimit q_limits_upper = self.model.upperPositionLimit @@ -320,40 +689,83 @@ def get_random_configuration(self) -> tuple[np.ndarray, np.ndarray]: raise ValueError("Failed to get random velocity") return q, qdot + - - def check_effort_limits(self, tau : np.ndarray) -> np.ndarray: + def check_joint_limits(self, q : np.ndarray) -> np.ndarray: """ - Check if the torques vector is within the effort limits of the robot model. + Check if the joint configuration vector is within the joint limits of the robot model. - :param tau: Torques vector to check. - :return: Array of booleans indicating if each joint is within the effort limits. + :param q: Joint configuration vector to check. + :return: Array of booleans indicating if each joint is within the limits. """ - if tau is None: - raise ValueError("Torques vector is None") + if q is None: + raise ValueError("Joint configuration vector is None") # array to store if the joint is within the limits within_limits = np.zeros(self.model.njoints - 1, dtype=bool) - # Check if the torques are within the limits - for i in range(self.model.njoints -1): - if abs(tau[i]) > self.model.effortLimit[i]: - print(f"\033[91mJoint {i+2} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n") + # Check if the joint configuration is within the limits + for i in range(self.model.njoints - 1): + if q[i] < self.model.lowerPositionLimit[i] or q[i] > self.model.upperPositionLimit[i]: within_limits[i] = False else: within_limits[i] = True - print("All joints are within effort limits. \n") + return within_limits + + + def check_effort_limits(self, tau : np.ndarray, arm : str = None) -> np.ndarray: + """ + Check if the torques vector is within the effort limits of the robot model. + :param tau: Torques vector to check. + :param arm: Arm name to filter the torques vector, right or left. + :return: Array of booleans indicating if each joint is within the effort limits. + """ + if tau is None: + raise ValueError("Torques vector is None") + + # array to store if the joint is within the limits + within_limits = np.array([], dtype=bool) + + # if arm is not specified, check all joints + if arm is None: + # Check if the torques are within the limits + for i in range(self.model.njoints -1): + if abs(tau[i]) > self.model.effortLimit[i]: + print(f"\033[91mJoint {i+2} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n") + within_limits = np.append(within_limits, False) + else: + within_limits = np.append(within_limits, True) + + if np.all(within_limits): + print("All joints are within effort limits. \n") + + else: + # Check if the torques are within the limits + for i in range(self.model.njoints -1): + # TODO arm is "left" or right" but the model has gripper with left and right in the same name + if arm in self.model.names[i+1]: + if abs(tau[i]) > self.model.effortLimit[i]: + print(f"\033[91mJoint {i+1} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n") + within_limits = np.append(within_limits, False) + else: + within_limits = np.append(within_limits, True) + + if np.all(within_limits): + print("All joints are within effort limits. \n") + + return within_limits def set_position(self, pos_joints : list[float], name_positions : list[str] ) -> np.ndarray: """ - Set the joint positions of the robot model. + Convert joint positions provided by jointstate publisher to the right format of the robot model. - :param q: Joint configuration vector to set provided by joint state topic. + :param pos_joints: List of joint positions provided by jointstate publisher. + :param name_positions: List of joint names in the order provided by jointstate publisher. """ q = np.zeros(self.model.nq) @@ -377,6 +789,152 @@ def set_position(self, pos_joints : list[float], name_positions : list[str] ) -> self.update_configuration(q) return q + + + def get_position_for_joint_states(self, q : np.ndarray) -> np.ndarray: + """ + Convert configuration in pinocchio format to right format of joint states publisher + Example: continuous joints (wheels) are represented as two values in the configuration vector but + in the joint state publisher they are represented as one value (angle). + + :param q : Joint configuration provided by pinocchio library + :return: Joint positions in the format of joint state publisher. + """ + + config = [] + + current_selected_config = q + + # Use this method to get the single values for joints, for example continuous joints (wheels) are represented as two values in the configuration vector but + # in the joint state publisher they are represented as one value (angle) + # so we need to convert the configuration vector to the right format for joint state publisher + cont = 0 + for i in range(1, self.model.njoints): + if self.model.joints[i].nq == 2: + # for continuous joints (wheels) + config.append({ "q" : math.atan2(current_selected_config[cont+1], current_selected_config[cont]), "joint_name" : self.model.names[i] }) + + elif self.model.joints[i].nq == 1: + # for revolute joints + config.append({"q" : current_selected_config[cont], "joint_name" : self.model.names[i]}) + + cont += self.model.joints[i].nq + + return config + + + def get_joints_placements(self, q : np.ndarray) -> np.ndarray | float: + """ + Get the placements of the joints in the robot model. + + :param q: Joint configuration vector. + :return: Array of joint placements with names of joint, and z offset of base link. + """ + base_link_id = self.model.getFrameId("base_link") + offset_z = self.data.oMf[base_link_id].translation[2] # Get the z offset of the base link + + self.update_configuration(q) + placements = np.array([({"name" : self.model.names[i], + "type" : self.model.joints[i].shortname() , + "x": self.data.oMi[i].translation[0], + "y": self.data.oMi[i].translation[1], + "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object) + + return placements, offset_z + + + def get_joint_placement(self, joint_id : int) -> dict: + """ + Get the placement of a specific joint in the robot model. + + :param joint_id: ID of the joint to get the placement for. + :return: Dictionary with coordinates x , y , z of the joint. + """ + + if joint_id < 0 or joint_id >= self.model.njoints: + raise ValueError(f"Joint ID {joint_id} is out of bounds for the robot model with {self.model.njoints} joints.") + + placement = self.data.oMi[joint_id].translation + + return {"x" : placement[0], "y": placement[1], "z": placement[2]} + + + def get_mass_matrix(self, q : np.ndarray) -> np.ndarray: + """ + Compute the mass matrix for the robot model. + + :param q: Joint configuration vector. + :return: Mass matrix. + """ + + mass_matrix = pin.crba(self.model, self.data, q) + if mass_matrix is None: + raise ValueError("Failed to compute mass matrix") + + return mass_matrix + + + + def get_joints(self) -> np.ndarray: + """ + Get the array joint names of the robot model. + + :return: array Joint names . + """ + + return np.array(self.model.names[1:], dtype=str) + + + + def get_frames(self) -> np.ndarray: + """ + Get the array of frame names in the robot model. + + :return: array of frame names. + """ + + return np.array([frame.name for frame in self.model.frames if frame.type == pin.FrameType.BODY], dtype=str) + + + + def get_active_frames(self) -> np.ndarray: + """ + Get the array of active joint names in the robot model. + + :return: array of active joint names. + """ + # Get frames where joints are parents + frame_names = [] + for i in range(1, self.model.njoints): + for frame in self.model.frames: + if frame.parentJoint == i and frame.type == pin.FrameType.BODY: + frame_names.append(frame.name) + break + + return np.array(frame_names, dtype=str) + + + + def get_parent_joint_id(self, frame_name : str) -> int: + """ + Get the parent joint ID for a given frame name. + + :param frame_name: Name of the frame. + :return: Joint ID. + """ + + if frame_name is None: + raise ValueError("Frame name must be provided") + + # Get the frame ID from the model + frame_id = self.model.getFrameId(frame_name) + joint_id = self.model.frames[frame_id].parentJoint + + if joint_id == -1: + raise ValueError(f"Joint '{joint_id}' not found in the robot model") + + return joint_id + def print_configuration(self, q : np.ndarray = None): """ @@ -390,19 +948,6 @@ def print_configuration(self, q : np.ndarray = None): print(f"Frame: {frame.name}") print(f" Rotation:\n{placement.rotation}") print(f" Translation:\n{placement.translation}") - - def get_joints_placements(self, q : np.ndarray) -> np.ndarray: - """ - Get the placements of the joints in the robot model. - - :param q: Joint configuration vector. - :return: Array of joint placements with names of joint. - """ - - self.update_configuration(q) - placements = np.array([({"name" : self.model.names[i], "x": self.data.oMi[i].translation[0], "y": self.data.oMi[i].translation[1], "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object) - - return placements def print_torques(self, tau : np.ndarray): @@ -413,15 +958,20 @@ def print_torques(self, tau : np.ndarray): """ if tau is None: raise ValueError("Torques vector is None") - + print("Torques vector:") for i, torque in enumerate(tau): - # TODO Extract type of joint in order to print right measure unit ([N] or [Nm]) - print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} Nm") - + # check if the joint is a prismatic joint + if self.model.joints[i+1].shortname() == "JointModelPZ": + print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} N") + + # for revolute joints + else: + print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} Nm") print("\n") + def print_frames(self): """ Print the frames of the robot model. diff --git a/dynamic_payload_analysis_core/package.xml b/dynamic_payload_analysis_core/package.xml index 9b6674c..3b5dfae 100644 --- a/dynamic_payload_analysis_core/package.xml +++ b/dynamic_payload_analysis_core/package.xml @@ -5,7 +5,7 @@ 0.0.0 Core package with calculation for torques calculator morolinux - TODO: License declaration + Apache License 2.0 ament_copyright ament_flake8 diff --git a/dynamic_payload_analysis_core/setup.py b/dynamic_payload_analysis_core/setup.py index 1b87706..70e2a59 100644 --- a/dynamic_payload_analysis_core/setup.py +++ b/dynamic_payload_analysis_core/setup.py @@ -13,10 +13,10 @@ ], install_requires=['setuptools'], zip_safe=True, - maintainer='morolinux', + maintainer='Enrico Moro', maintainer_email='enrimoro003@gmail.com', - description='TODO: Package description', - license='TODO: License declaration', + description='This package implements core functionalities for dynamic payload analysis in robotics, focusing on torque calculations and external force handling.', + license='Apache License 2.0', tests_require=['pytest'], entry_points={ 'console_scripts': [ diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py new file mode 100644 index 0000000..571703b --- /dev/null +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import numpy as np + +from interactive_markers import InteractiveMarkerServer +from interactive_markers import MenuHandler +import rclpy +from visualization_msgs.msg import InteractiveMarker +from visualization_msgs.msg import InteractiveMarkerControl +from visualization_msgs.msg import Marker +from enum import Enum + +# enumeration to define the type of torque color visualization +class TorqueVisualizationType(Enum): + TORQUE_LIMITS = 1 + MAX_CURRENT_TORQUE = 2 + +class MenuPayload(): + def __init__(self, node): + # create server for interactive markers + self.server = InteractiveMarkerServer(node, 'menu_frames') + + # array to store the checked orunchecked frames and payload selection + self.frames_selection = np.array([],dtype=object) + + # create handler for menu + self.menu_handler = MenuHandler() + + #current managed frame + self.current_frame = None + + # flag to compute workspace + self.compute_workspace = False + + # variable to store the selected configuration to display in Rviz + self.selected_configuration = None + + # payload mass selection array + self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5], dtype=float) + + # variable to store selection between torque limits or max torque in the current configuration for color visualization + self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # default is torque limits + + # insert the root menu items + self.root_frames = self.menu_handler.insert('Select where to apply payload') + self.workspace_button = self.menu_handler.insert('Compute workspace', callback=self.callback_workspace) + # insert the reset payload button + self.reset = self.menu_handler.insert('Reset payloads', parent=self.root_frames, callback=self.callback_reset) + + # insert the checker for visualization color choice between torque limits or max torque in the current configuration + self.torque_limits_checker = self.menu_handler.insert('Torque limits',command=str(TorqueVisualizationType.TORQUE_LIMITS.value) , callback=self.callback_color_selection) + self.max_torque_checker = self.menu_handler.insert('Max torque', command=str(TorqueVisualizationType.MAX_CURRENT_TORQUE.value) , callback=self.callback_color_selection) + + self.menu_handler.setVisible(self.torque_limits_checker, False) + self.menu_handler.setVisible(self.max_torque_checker, False) + + + self.make_menu_marker('menu_frames') + # add server to menu and apply changes + self.menu_handler.apply(self.server, 'menu_frames') + self.server.applyChanges() + + + def insert_frame(self, name): + """ + Insert a new item(frame) in the checkbox menu of frames. + + Args: + name (str) : name of the frame + """ + last_item = self.menu_handler.insert(name, parent=self.root_frames, callback=self.callback_selection) + self.menu_handler.setCheckState(last_item, MenuHandler.UNCHECKED) + self.menu_handler.setVisible(last_item, True) + + # add the item to the checked frames array in order to keep track of the checked items + self.frames_selection = np.append(self.frames_selection, {"name": name, "checked" : False, "payload" : 0.0} ) + + # apply changes + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + def callback_color_selection(self, feedback): + """ + Callback for torque selection type to change the visualization color in Rviz. + + Args: + feedback: Feedback from the menu selection. + """ + # get the handle of the selected item (id) + handle = feedback.menu_entry_id + # get the title of the selected item + title = self.menu_handler.getTitle(handle) + # get the entry object from the menu handler with all the informations about the item + color_context = self.menu_handler.entry_contexts_[handle] + + # get the command field, in this case it is used to store if the selected item is a torque limits or max torque + self.torque_color = TorqueVisualizationType.TORQUE_LIMITS if color_context.command == str(TorqueVisualizationType.TORQUE_LIMITS.value) else TorqueVisualizationType.MAX_CURRENT_TORQUE + + if self.torque_color == TorqueVisualizationType.TORQUE_LIMITS: + # set the torque limits checker as checked and max torque checker as unchecked + self.menu_handler.setCheckState(self.torque_limits_checker, MenuHandler.CHECKED) + self.menu_handler.setCheckState(self.max_torque_checker, MenuHandler.UNCHECKED) + elif self.torque_color == TorqueVisualizationType.MAX_CURRENT_TORQUE: + # set the max torque checker as checked and torque limits checker as unchecked + self.menu_handler.setCheckState(self.max_torque_checker, MenuHandler.CHECKED) + self.menu_handler.setCheckState(self.torque_limits_checker, MenuHandler.UNCHECKED) + + + # apply changes + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + + def callback_workspace(self, feedback): + """ + Callback to compute the workspace based on the selected frames and payloads. + This function is called when the user clicks on the "Compute workspace" button. + """ + self.compute_workspace = True + + + def insert_dropdown_configuration(self, configuration : np.ndarray): + """ + Insert to the dropdown the possible configurations. + + Args: + configuration (np.ndarray): Array of configuration to be displayed in the dropdown. + """ + + for i, item in enumerate(configuration): + # insert the parent in the command field to keep track of the parent id + last_entry = self.menu_handler.insert(f"Configuration {i} | arm: " + item["arm"], parent=self.workspace_button, command=str(self.workspace_button), callback=self.callback_configuration_selection) + self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED) + self.menu_handler.setVisible(last_entry, True) + + # set visible true for color selection and put the default check state + self.menu_handler.setVisible(self.torque_limits_checker, True) + self.menu_handler.setVisible(self.max_torque_checker, True) + self.menu_handler.setCheckState(self.torque_limits_checker, MenuHandler.CHECKED) + self.menu_handler.setCheckState(self.max_torque_checker, MenuHandler.UNCHECKED) + + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + + def callback_configuration_selection(self, feedback): + """ + Callback for configuration selection to change the check state of the selected item. + + Args: + feedback: Feedback from the menu selection. + """ + # get the handle of the selected item (id) + handle = feedback.menu_entry_id + # get the title of the selected item (it contains configuration number) + title = self.menu_handler.getTitle(handle) + + # set the checkbox as checked + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + + # get the entry object from the menu handler with all the informations about the item (command field is used to store the parent id) + config_context = self.menu_handler.entry_contexts_[handle] + + # get the parent id of the selected configuration stored in the command field + parent_id = int(config_context.command) + # get the entry object of the parent + parent_context = self.menu_handler.entry_contexts_[parent_id] + + # reset all the selections in the configuration sub-menu + # check if a item is already checked, if so remove it and set to unchecked to prevent multiple configuration selection in the same menu + for item in parent_context.sub_entries: + # check if the item is checked + if self.menu_handler.getCheckState(item) == MenuHandler.CHECKED: + # if the configuration is already checked, we need to uncheck it + self.menu_handler.setCheckState(item, MenuHandler.UNCHECKED) + + # set the selected configuration checkbox as checked + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + # update the selected configuration + self.selected_configuration = int(title.split()[1]) # Extract the configuration number from the title + + # apply changes + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + + def callback_reset(self, feedback): + """ + Callback for reset menu selection to remove all checked items from the menu. + """ + # remove all checked items from the array of payloads selection + for i, item in enumerate(self.frames_selection): + if item['checked']: + item = {"name": item['name'], "checked": False, "payload": 0.0} + self.frames_selection[i] = item + + # reset the frames selection menu (i = number of entry, item = object with name, sub entries, etc.) + for i, item in self.menu_handler.entry_contexts_.items(): + if i == 1: + # skip the root item (payload reset) + continue + + # check if the item(frame) has sub entries (payloads selection) + if item.sub_entries: + # if the frame has payloads selection, we need to remove it + for sub_item in item.sub_entries: + self.menu_handler.setVisible(sub_item, False) + self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED) + + # check if the item is the reset payloads or compute workspace item and skip the unchanging of the check state + if item.title == "Reset payloads" or item.title == "Compute workspace": + continue + + # set the checked of frame to unchecked + self.menu_handler.setCheckState(i,MenuHandler.UNCHECKED) + + # reset the selected configuration + self.selected_configuration = None + + # hide the torque limits and max torque checkboxes when there is no configuration selected + self.menu_handler.setVisible(self.torque_limits_checker, False) + self.menu_handler.setVisible(self.max_torque_checker, False) + + self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # reset to default torque limits + + # reapply the menu handler and server changes + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + + def callback_selection(self, feedback): + """ + Callback for menu selection to change the check state of the selected item(frame). + + Args: + feedback: Feedback from the menu selection. + """ + # get name of the frame + handle = feedback.menu_entry_id + title = self.menu_handler.getTitle(handle) + + # add payloads selection as a submenu + self.manage_payload_menu(handle) + # update the frames_selection array to set the item as checked + self.update_item(title, True) + + # set the checkbox as checked + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + + # update the menu state + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + # print the current state of the checked frames + print(f"{self.get_item_state()} \n") + + + + + def update_item(self, name, check: bool): + """ + Update the state of an item(frame) in the array of selected frames with payload. + + Args: + name (str): Name of the item(frame) to update. + check (bool): New state of the item. + """ + + for item in self.frames_selection: + if item['name'] == name: + item['checked'] = check + break + + + def update_payload(self, name, payload: float): + """ + Update the payload mass of an item in the array of selected frames with payload. + + Args: + name (str): Name of the item(frame) to update. + payload (float): New payload mass that act on the frame. + """ + + for item in self.frames_selection: + if item['name'] == name: + item['payload'] = payload + break + + + def manage_payload_menu(self, menu_entry_id): + """ + Add payload selection items as a sub-menu for the specified menu entry (selected frame). + + Args: + menu_entry_id : ID of the menu entry. + """ + # insert as a sub menu all the payload stored in the array + for payload in self.payload_selection: + # insert the payload mass selection items (command = str(menu_entry_id) is used to identify the parent menu entry) + last_entry = self.menu_handler.insert(f"{payload} kg", parent=menu_entry_id, command = str(menu_entry_id), callback=self.update_payload_selection) + self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED) + self.menu_handler.setVisible(last_entry, True) + + # apply changes + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + + def update_payload_selection(self, feedback): + """ + Callback for payload selection to change the current payload. + + Args: + feedback: Feedback from the menu selection. + """ + # get the handle of the selected item (id) + handle = feedback.menu_entry_id + # get the title of the selected item (it contains number of kg) + title = self.menu_handler.getTitle(handle) + # get the entry object from the menu handler with all the informations about the item (command field is used to store the parent id) + payload_context = self.menu_handler.entry_contexts_[handle] + + # get the parent id of the selected payload stored in the command field + parent_id = int(payload_context.command) + # get the entry object of the parent + parent_context = self.menu_handler.entry_contexts_[parent_id] + # get the name of the parent item (frame name) + name_parent = self.menu_handler.getTitle(parent_id) + + # reset all the selections in the payload sub-menu + # check if a item is already checked, if so remove it and set to unchecked to prevent multiple payload selection in the same menu + for item in parent_context.sub_entries: + # check if the item is checked + if self.menu_handler.getCheckState(item) == MenuHandler.CHECKED: + # if the item is already checked, we need to uncheck it and set the payload to 0.0 + self.menu_handler.setCheckState(item, MenuHandler.UNCHECKED) + self.update_payload(name_parent, 0.0) + + # set the selected payload checkbox as checked + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + # uppate the array with the new payload + self.update_payload(name_parent, float(title.split()[0])) # Extract the payload mass from the title + + # apply changes + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + + def get_torque_color(self) -> TorqueVisualizationType: + """ + Return the type of torque color visualization selected. + + Returns: + TorqueVisualizationType: The type of torque color visualization. + """ + return self.torque_color + + + def get_selected_configuration(self) -> str: + """ + Return the selected configuration to display in Rviz. + + Returns: + str: The selected configuration number. + """ + return self.selected_configuration + + def get_workspace_state(self) -> bool: + """ + Return the state of the workspace computation flag. + + Returns: + bool: True if the workspace should be computed, False otherwise. + """ + return self.compute_workspace + + + def set_workspace_state(self, state: bool): + """ + Set the state of the workspace computation flag. + + Args: + state (bool): True to compute workspace, False otherwise. + """ + self.compute_workspace = state + + + def get_item_state(self) -> np.ndarray: + """ + Return array of checked frames in the menu list + + Returns: + np.ndarray: array of checked frames + """ + return self.frames_selection + + + def make_label(self, msg): + """ + Create the label used to click over in rviz + """ + marker = Marker() + + marker.type = Marker.TEXT_VIEW_FACING + marker.text = "Click here to select frame" + marker.scale.x = msg.scale * 0.45 + marker.scale.y = msg.scale * 0.45 + marker.scale.z = msg.scale * 0.45 + marker.color.r = 0.1 + marker.color.g = 0.0 + marker.color.b = 0.5 + marker.color.a = 1.0 + + return marker + + def make_label_control(self, msg): + control = InteractiveMarkerControl() + control.always_visible = True + control.markers.append(self.make_label(msg)) + msg.controls.append(control) + + return control + + + def make_empty_marker(self, dummyBox=True): + """ + Create interactive marker + """ + int_marker = InteractiveMarker() + int_marker.header.frame_id = 'base_link' + int_marker.pose.position.z = 2.0 + int_marker.scale = 0.5 + + return int_marker + + + def make_menu_marker(self, name): + """ + Create interactive menu + """ + int_marker = self.make_empty_marker() + int_marker.name = name + + control = InteractiveMarkerControl() + + control.interaction_mode = InteractiveMarkerControl.BUTTON + control.always_visible = True + + control.markers.append(self.make_label(int_marker)) + int_marker.controls.append(control) + + self.server.insert(int_marker) + + + def shutdown(self): + """ + Shutdown the interactive marker server. + """ + self.server.shutdown() \ No newline at end of file diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py new file mode 100644 index 0000000..a6825dd --- /dev/null +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -0,0 +1,485 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String, ColorRGBA +from geometry_msgs.msg import WrenchStamped, Point +from sensor_msgs.msg import JointState +from dynamic_payload_analysis_core.core import TorqueCalculator +import numpy as np +from visualization_msgs.msg import Marker, MarkerArray +from dynamic_payload_analysis_ros.menu_visual import MenuPayload +from dynamic_payload_analysis_ros.menu_visual import TorqueVisualizationType + + + + + +class RobotDescriptionSubscriber(Node): + def __init__(self): + super().__init__('node_robot_description_subscriber') + self.subscription = self.create_subscription( + String, + '/robot_description', + self.robot_description_callback, + qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1) + ) + + # menu for selecting frames to apply payload + self.menu = MenuPayload(self) + + # Publisher for external force + self.publisher_force = self.create_publisher(MarkerArray, '/external_forces', 10) + + # Publisher for RViz visualization of torques + self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10) + + # Pusblisher for point cloud workspace area + self.publisher_workspace_area = self.create_publisher(MarkerArray, '/workspace_area', 10) + + # Publisher for point names in the workspace area + self.publisher_workspace_area_names = self.create_publisher(MarkerArray, '/workspace_area_names', 10) + + # subscription to joint states + self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10) + + # publisher for the joint states + self.publisher_joint_states = self.create_publisher(JointState, '/joint_states', 10) + + # variable to store the object of the TorqueCalculator + self.robot = None + + # variable to store external force applied on the robot + self.external_force = None + + # variable to store checked frames in the menu where a payload is applied + self.checked_frames = [] + + # variable to store masses applied on the frames + self.masses = None + + # variable to store the currente selected configuration from the workspace menu + self.valid_configurations = None + + # variable to store if there is a selected configuration from the workspace menu to visualize + self.selected_configuration = None + + # timer to compute the valid workspace area + self.timer_workspace_calculation = self.create_timer(1, self.workspace_calculation) + # timer to publish the selected configuration in joint states + self.timer_publish_configuration = self.create_timer(2, self.publish_selected_configuration) + # timer to calculate the external forces based on the checked frames in the menu + self.timer_update_checked_frames = self.create_timer(0.5, self.update_checked_frames) + # timer to publish the external forces as arrows in RViz + self.timer_publish_force = self.create_timer(1, self.publish_payload_force) + + self.get_logger().info('Robot description subscriber node started') + + + def robot_description_callback(self, msg): + """ + Callback function for the robot description topic. + This function initializes the TorqueCalculator with the robot description received from the topic. + """ + + self.get_logger().info('Received robot description') + + self.robot = TorqueCalculator(robot_description = msg.data) + self.robot.print_active_joint() + self.robot.print_frames() + + # Add the frame to the menu for payload selection + for frame in self.robot.get_active_frames(): + self.menu.insert_frame(frame) + + # self.robot.print_configuration() + + + def publish_payload_force(self): + """ + Publish the gravity force on the frame with id `id_force`. + """ + external_force_array = MarkerArray() + + for frame in self.menu.get_item_state(): + + id_force = self.robot.get_parent_joint_id(frame["name"]) + joint_position = self.robot.get_joint_placement(id_force) + arrow_force = Marker() + + arrow_force.header.frame_id = "base_link" + arrow_force.header.stamp = self.get_clock().now().to_msg() + arrow_force.ns = "external_force" + arrow_force.id = id_force + arrow_force.type = Marker.ARROW + + # add the arrow if the frame is checked or delete it if not + if frame["checked"]: + arrow_force.action = Marker.ADD + else: + arrow_force.action = Marker.DELETE + + arrow_force.scale.x = 0.20 # Length of the arrow + arrow_force.scale.y = 0.05 # Width of the arrow + arrow_force.scale.z = 0.05 # Height of the arrow + arrow_force.color.a = 1.0 # Alpha + arrow_force.color.r = 0.0 + arrow_force.color.g = 0.0 # Green + arrow_force.color.b = 1.0 + + # Set the position of the arrow at the joint placement + arrow_force.pose.position.x = joint_position["x"] + arrow_force.pose.position.y = joint_position["y"] + arrow_force.pose.position.z = joint_position["z"] + # Set the direction of the arrow downwards + arrow_force.pose.orientation.x = 0.0 + arrow_force.pose.orientation.y = 0.7071 + arrow_force.pose.orientation.z = 0.0 + arrow_force.pose.orientation.w = 0.7071 + + external_force_array.markers.append(arrow_force) + + self.publisher_force.publish(external_force_array) + + + + def workspace_calculation(self): + """ + Callback for timer to compute the valid workspace area. + """ + # if the user choose to compute the workspace area then compute the valid configurations + if self.menu.get_workspace_state(): + self.valid_configurations = self.robot.get_valid_workspace(2, 0.20, "gripper_left_finger_joint", "gripper_right_finger_joint", self.masses, self.checked_frames) + + # insert the valid configurations in the menu + self.menu.insert_dropdown_configuration(self.valid_configurations) + + # clear all the workspace area markers + self.clear_workspace_area_markers() + + # set the workspace state to False to stop the computation + self.menu.set_workspace_state(False) + + # if there are valid configurations, publish the workspace area + if self.valid_configurations is not None: + # publish the workspace area + self.publish_workspace_area(self.valid_configurations) + + + + def publish_selected_configuration(self): + """ + Timer to publish the selected configuration. + This will publish the joint states of the selected configuration in the menu. + """ + # get the selected configuration from the menu + self.selected_configuration = self.menu.get_selected_configuration() + + # if there is a selected configuration, publish the joint states based on the valid configurations calculated previously + if self.selected_configuration is not None: + configs = self.robot.get_position_for_joint_states(self.valid_configurations[self.selected_configuration]["config"]) + joint_state = JointState() + joint_state.header.stamp = self.get_clock().now().to_msg() + + joint_state.name = [joint["joint_name"] for joint in configs] + joint_state.position = [joint["q"] for joint in configs] + #joint_state.position = + self.publisher_joint_states.publish(joint_state) + + else: + if self.robot is not None: + # if there is no selected configuration, publish the joint states with zero positions + joint_state = JointState() + joint_state.header.stamp = self.get_clock().now().to_msg() + + joint_state.name = self.robot.get_joints().tolist() + zero_config = self.robot.get_position_for_joint_states(self.robot.get_zero_configuration()) + joint_state.position = [joint["q"] for joint in zero_config] + + self.publisher_joint_states.publish(joint_state) + + + def update_checked_frames(self): + """ + Function to update the external forces based on the checked frames in the menu. + """ + # create the array with only the checked frames (with external force applied) + self.checked_frames = np.array([check_frame["name"] for check_frame in self.menu.get_item_state() if check_frame['checked']]) + + if len(self.checked_frames) != 0: + # create the array with the masses of the checked frames + self.masses = np.array([check_frame["payload"] for check_frame in self.menu.get_item_state() if check_frame['checked']]) + else: + # if there are no checked frames, set the masses to None + self.masses = None + + + def joint_states_callback(self, msg): + """ + Callback function for the joint states topic. + """ + if self.robot is not None: + # if you are not using the calculated configuration from workspace, you can use the joint states to compute the torques because you don't have already the computed torques + if self.selected_configuration is None: + self.positions = list(msg.position) + self.name_positions = list(msg.name) + v = msg.velocity if msg.velocity else self.robot.get_zero_velocity() + + # set the positions based on the joint states + q = self.robot.set_position(self.positions, self.name_positions) + a = self.robot.get_zero_acceleration() + + # if there are no checked frames, set the external force to None + if len(self.checked_frames) != 0: + # create the external force with the masses and the checked frames + self.external_force = self.robot.create_ext_force(masses=self.masses, frame_name=self.checked_frames, q=q) + else: + self.external_force = None + + # compute the inverse dynamics + tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=self.external_force) + + # check the effort limits + status_efforts = self.robot.check_effort_limits(tau) + + # print the torques + self.robot.print_torques(tau) + + # get the positions of the joints where the torques are applied based on + joints_position, offset_z = self.robot.get_joints_placements(q) + + # Publish the torques in RViz + self.publish_label_torques(tau, status_torques=status_efforts ,joints_position=joints_position) + + else: + # if you are using the calculated configuration from workspace, you can use the valid configurations to visualize the torques labels + + # get the positions of the joints where the torques are applied based on + joints_position, offset_z = self.robot.get_joints_placements(self.valid_configurations[self.selected_configuration]["config"]) + # get the torques status (if the torques are within the limits) + status_efforts = self.robot.check_effort_limits(self.valid_configurations[self.selected_configuration]["tau"]) + + self.publish_label_torques(self.valid_configurations[self.selected_configuration]["tau"], status_torques=status_efforts ,joints_position=joints_position) + + + + def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray): + """ + Publish the torque with labels on the robot in RViz. + + Args: + torque (np.ndarray): The torque to be published + status_torques (np.ndarray): The status of the torques, True if the torque is within the limits, False otherwise + joints_position (np.ndarray): The positions of the joints where the torques are applied + """ + marker_array = MarkerArray() + for i, (t, joint) in enumerate(zip(torque, joints_position)): + # remove the gripper joints from the visualization TODO: make it more general (with MIMIC joints) + if "gripper" not in joint['name']: + marker = Marker() + marker.header.frame_id = "base_link" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = "torque_visualization" + marker.id = i + marker.type = Marker.TEXT_VIEW_FACING + # Set the text based on the joint type + if joint['type'] == 'JointModelPZ': + marker.text = f"{joint['name']}: {t:.2f} N" + else: + marker.text = f"{joint['name']}: {t:.2f} Nm" + + marker.action = Marker.ADD + marker.pose.position.x = joint['x'] + marker.pose.position.y = joint['y'] + marker.pose.position.z = joint['z'] + marker.pose.orientation.w = 1.0 + marker.scale.x = 0.03 + marker.scale.y = 0.03 + marker.scale.z = 0.03 + marker.color.a = 1.0 # Alpha + if status_torques[i]: + marker.color.r = 0.0 # Red + marker.color.g = 1.0 # Green + marker.color.b = 0.0 # Blue + else: + marker.color.r = 1.0 # Red + marker.color.g = 0.0 # Green + marker.color.b = 0.0 # Blue + marker_array.markers.append(marker) + + self.publisher_rviz_torque.publish(marker_array) + + + def publish_workspace_area(self, valid_configs: np.ndarray ): + """ + Publish the workspace area in RViz using points and labels for the end points. + + Args: + valid_configs (np.ndarray): Current valid configurations in the workspace of the robot. + """ + # Create a MarkerArray to visualize the number of configuration of a specific point in the workspace + marker_point_names = MarkerArray() + + # Create a Marker for the workspace area using points + marker_points = MarkerArray() + + # calculate the maximum torques for each joint in the current valid configurations for each arm only if the user selected the max current torque visualization + if self.menu.get_torque_color() == TorqueVisualizationType.MAX_CURRENT_TORQUE: + max_torque_for_joint_left, max_torque_for_joint_right = self.robot.get_maximum_torques(valid_configs) + + cont = 0 + # Iterate through the valid configurations and create markers + for i, valid_config in enumerate(valid_configs): + + # create the label for the end point (end effector position) of the valid configuration + marker_point_name = Marker() + marker_point_name.header.frame_id = "base_link" + marker_point_name.header.stamp = self.get_clock().now().to_msg() + + marker_point_name.ns = f"workspace_area_{valid_config['arm']}_arm" + marker_point_name.id = i + 1 + marker_point_name.type = Marker.TEXT_VIEW_FACING + marker_point_name.text = f"Config {i}" + marker_point_name.action = Marker.ADD + marker_point_name.pose.position.x = valid_config["end_effector_pos"][0] + marker_point_name.pose.position.y = valid_config["end_effector_pos"][1] + marker_point_name.pose.position.z = valid_config["end_effector_pos"][2] + 0.05 # Offset to avoid overlap with the sphere + marker_point_name.pose.orientation.w = 1.0 + marker_point_name.scale.x = 0.02 + marker_point_name.scale.y = 0.02 + marker_point_name.scale.z = 0.02 + marker_point_name.color.a = 1.0 # Alpha + marker_point_name.color.r = 1.0 # Red + marker_point_name.color.g = 1.0 # Green + marker_point_name.color.b = 1.0 # Blue + + # get the joint positions for the valid configuration + joint_positions, offset_z = self.robot.get_joints_placements(valid_config["config"]) + + # get the normalized torques for the valid configuration with current target limits for color visualization + if self.menu.get_torque_color() == TorqueVisualizationType.TORQUE_LIMITS: + norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"]) + else: + # if the user selected the max torque, use the maximum torques for the joint + if valid_config["arm"] == "left": + norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"],max_torque_for_joint_left) + else: + norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"],max_torque_for_joint_right) + + # insert points related to the end effector position in the workspace area and with color based on the normalized torques for each joint + for joint_pos,tau in zip(joint_positions,norm_joints_torques): + # print only the points for the corrisponding arm, in this way we can visualize the workspace area for each arm separately and avoid confusion + if valid_config["arm"] in joint_pos["name"] : + point = Marker() + point.header.frame_id = "base_link" + point.header.stamp = self.get_clock().now().to_msg() + point.ns = joint_pos["name"] + point.id = cont + point.type = Marker.SPHERE + point.action = Marker.ADD + point.scale.x = 0.03 # Size of the sphere + point.scale.y = 0.03 + point.scale.z = 0.03 + + point.pose.position.x = valid_config["end_effector_pos"][0] + point.pose.position.y = valid_config["end_effector_pos"][1] + point.pose.position.z = valid_config["end_effector_pos"][2] - offset_z + point.pose.orientation.w = 1.0 + point.color.a = 1.0 # Alpha + point.color.r = tau # Red + point.color.g = 1 - tau # Green + point.color.b = 0.0 # Blue + + cont += 1 + + # Add the point to the points array + marker_points.markers.append(point) + + # Add the marker point name to the marker point names array + marker_point_names.markers.append(marker_point_name) + + + # get the unified torque for the valid configurations + unified_configurations_torque = self.robot.get_unified_configurations_torque(valid_configs) + + # insert points related to the end effector position in the workspace area and with color based on the normalized torque for each configuration + # this is used to visualize the workspace area with the unified torques for each configuration + for i, unified_config in enumerate(unified_configurations_torque): + marker_point = Marker() + marker_point.header.frame_id = "base_link" + marker_point.header.stamp = self.get_clock().now().to_msg() + marker_point.ns = f"unified_torque_workspace_{unified_config['arm']}_arm" + marker_point.id = i + marker_point.type = Marker.SPHERE + marker_point.action = Marker.ADD + marker_point.scale.x = 0.03 # Size of the sphere + marker_point.scale.y = 0.03 + marker_point.scale.z = 0.03 + marker_point.pose.position.x = unified_config["end_effector_pos"][0] + marker_point.pose.position.y = unified_config["end_effector_pos"][1] + marker_point.pose.position.z = unified_config["end_effector_pos"][2] - offset_z # Offset to avoid overlap with the sphere + marker_point.pose.orientation.w = 1.0 + marker_point.color.a = 1.0 # Alpha + + # Color based on the normalized torques + marker_point.color.r = unified_config["norm_tau"] + marker_point.color.g = 1 - unified_config["norm_tau"] + marker_point.color.b = 0.0 # Blue + + # Add the marker point to the points array + marker_points.markers.append(marker_point) + + + # Publish the marker points and names + self.publisher_workspace_area.publish(marker_points) + self.publisher_workspace_area_names.publish(marker_point_names) + + + def clear_workspace_area_markers(self): + """ + Clear the workspace area markers in RViz. + This will publish an empty MarkerArray to remove the previous markers. + """ + # create an empty MarkerArray to clear the markers with a DELETEALL action + marker_array_msg = MarkerArray() + marker = Marker() + marker.id = 0 + marker.action = Marker.DELETEALL + marker_array_msg.markers.append(marker) + + # Publish the empty MarkerArray to clear the markers + self.publisher_workspace_area.publish(marker_array_msg) + self.publisher_workspace_area_names.publish(marker_array_msg) + + + + +def main(args=None): + rclpy.init(args=args) + node = RobotDescriptionSubscriber() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/dynamic_payload_analysis_ros/package.xml b/dynamic_payload_analysis_ros/package.xml new file mode 100644 index 0000000..f39adf4 --- /dev/null +++ b/dynamic_payload_analysis_ros/package.xml @@ -0,0 +1,18 @@ + + + + dynamic_payload_analysis_ros + 0.0.0 + This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling. + morolinux + Apache License 2.0 + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros b/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros new file mode 100644 index 0000000..e69de29 diff --git a/dynamic_payload_analysis_ros/setup.cfg b/dynamic_payload_analysis_ros/setup.cfg new file mode 100644 index 0000000..5a64333 --- /dev/null +++ b/dynamic_payload_analysis_ros/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/dynamic_payload_analysis_ros +[install] +install_scripts=$base/lib/dynamic_payload_analysis_ros diff --git a/dynamic_payload_analysis_ros/setup.py b/dynamic_payload_analysis_ros/setup.py new file mode 100644 index 0000000..901e1da --- /dev/null +++ b/dynamic_payload_analysis_ros/setup.py @@ -0,0 +1,27 @@ +from setuptools import find_packages, setup + +package_name = 'dynamic_payload_analysis_ros' + +setup( + name=package_name, + version='0.0.0', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='Enrico Moro', + maintainer_email='enrimoro003@gmail.com', + description='This package provides graphics tools in Rviz for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.', + license='Apache License 2.0', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'node_rviz_visualization = dynamic_payload_analysis_ros.rviz_visualization:main', + 'node_rviz_visualization_menu = dynamic_payload_analysis_ros.rviz_visualization_menu:main', + ], + }, +) diff --git a/dynamic_payload_analysis_ros/test/test_copyright.py b/dynamic_payload_analysis_ros/test/test_copyright.py new file mode 100644 index 0000000..97a3919 --- /dev/null +++ b/dynamic_payload_analysis_ros/test/test_copyright.py @@ -0,0 +1,25 @@ +# Copyright 2015 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_copyright.main import main +import pytest + + +# Remove the `skip` decorator once the source file(s) have a copyright header +@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') +@pytest.mark.copyright +@pytest.mark.linter +def test_copyright(): + rc = main(argv=['.', 'test']) + assert rc == 0, 'Found errors' diff --git a/dynamic_payload_analysis_ros/test/test_flake8.py b/dynamic_payload_analysis_ros/test/test_flake8.py new file mode 100644 index 0000000..27ee107 --- /dev/null +++ b/dynamic_payload_analysis_ros/test/test_flake8.py @@ -0,0 +1,25 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_flake8.main import main_with_errors +import pytest + + +@pytest.mark.flake8 +@pytest.mark.linter +def test_flake8(): + rc, errors = main_with_errors(argv=[]) + assert rc == 0, \ + 'Found %d code style errors / warnings:\n' % len(errors) + \ + '\n'.join(errors) diff --git a/dynamic_payload_analysis_ros/test/test_pep257.py b/dynamic_payload_analysis_ros/test/test_pep257.py new file mode 100644 index 0000000..b234a38 --- /dev/null +++ b/dynamic_payload_analysis_ros/test/test_pep257.py @@ -0,0 +1,23 @@ +# Copyright 2015 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_pep257.main import main +import pytest + + +@pytest.mark.linter +@pytest.mark.pep257 +def test_pep257(): + rc = main(argv=['.', 'test']) + assert rc == 0, 'Found code style errors / warnings'