diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py
old mode 100644
new mode 100755
index cdfa3dd..8498c26
--- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py
+++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py
@@ -20,14 +20,9 @@
from typing import Union
from pathlib import Path
from numpy.linalg import norm, solve
-from optik import Robot, SolverConfig
import tempfile
-from ikpy import chain
import os
-
-# TODO : If a workspace calculation is already done, store the results in a file to avoid recalculating it and make only the
-# verification of the payload handling in the workspace.
-
+from urdf_parser_py.urdf import URDF
class TorqueCalculator:
@@ -43,8 +38,12 @@ def __init__(self, robot_description : Union[str, Path]):
if isinstance(robot_description, str):
self.model = pin.buildModelFromXML(robot_description)
- # TODO change parser in general for more unique solution
-
+ # compute mimic joints
+ self.compute_mimic_joints(robot_description)
+
+ # get the root joint name
+ self.root_name = self.get_root_joint_name(robot_description)
+
# create temporary URDF file from the robot description string
with tempfile.NamedTemporaryFile(mode='w', suffix='.urdf', delete=False) as temp_file:
temp_file.write(robot_description)
@@ -67,9 +66,61 @@ def __init__(self, robot_description : Union[str, Path]):
# get the default collisions in the robot model to avoid take them into account in the computations
self.default_collisions = self.compute_static_collisions()
- # create array to store all possible configurations for left and right arms
- self.configurations_l = None
- self.configurations_r = None
+ # compute main trees of the robot model
+ self.compute_subtrees()
+
+
+
+ # array to store all configurations for the robot model
+ self.configurations = np.array([], dtype=object)
+
+ # create items for each tree in the robot model
+ for tree in self.subtrees:
+ self.configurations = np.append(self.configurations, {"tree_id": tree["tree_id"], "configurations": None, "selected_joint_id" : None})
+
+
+ def get_root_joint_name(self, robot_description: str) -> str:
+ """
+ Get the root joint name from the robot description XML string.
+
+ :param robot_description: Robot description in XML format provided by /robot_description topic.
+ """
+ try:
+ robot = URDF.from_xml_string(robot_description)
+ root_name = robot.get_root()
+ except Exception as e:
+ print(f"Error parsing URDF xml: {e}")
+ root_name = None
+
+ return root_name
+
+
+ def compute_mimic_joints(self, urdf_xml):
+ """
+ Function to find all mimic joints with mimicked joints and ids.
+
+ Args:
+ urdf_xml (str): The string from robot_description topic.
+ """
+ try:
+ robot = URDF.from_xml_string(urdf_xml)
+ except:
+ print(f"Error parsing URDF xml")
+
+ self.mimic_joint_names = []
+ self.mimicked_joint_names = []
+
+ # Iterate through all joints in the robot model to find mimic joints
+ for joint in robot.joints:
+ if joint.mimic:
+ # Add the mimic joint name to the list
+ self.mimic_joint_names.append(joint.name)
+ # Add the mimicked joint name to the list
+ self.mimicked_joint_names.append(joint.mimic.joint)
+
+ # create lists of joint ids for mimic and mimicked joints
+ self.mimic_joint_ids = [self.model.getJointId(name) for name in self.mimic_joint_names]
+ self.mimicked_joint_ids = [self.model.getJointId(name) for name in self.mimicked_joint_names]
def compute_static_collisions(self):
@@ -95,9 +146,67 @@ def compute_static_collisions(self):
return collision_pairs
+ def compute_subtrees(self):
+ """
+ Compute the sub-trees of the robot model.
+ This method is used to compute the sub-trees of the robot model
+ """
+
+ tip_joints = []
+ for id in range(0, self.model.njoints):
+ if len(self.model.subtrees[id]) == 1:
+ tip_joints += [id]
+
+ self.subtrees = np.array([], dtype=object)
+ cont = 0
+ for i, jointID in enumerate(tip_joints):
+ joint_tree_ids = self.get_filtered_subtree(jointID)
+
+ # insert the sub-tree only if the tip joint is not already in the sub-trees
+ tip_joint_already_exists = False
+ for existing_tree in self.subtrees:
+ if existing_tree["tip_joint_id"] == joint_tree_ids[-1]:
+ tip_joint_already_exists = True
+ break
+
+ if not tip_joint_already_exists:
+ # get the link names in the sub-tree
+ link_names = self.get_links_from_tree(joint_tree_ids)
+ # get the joint names in the sub-tree
+ joint_names = [self.model.names[joint_id] for joint_id in joint_tree_ids]
+
+ self.subtrees = np.append(self.subtrees, {"tree_id": cont, "link_names": link_names ,"joint_names": joint_names, "joint_ids": joint_tree_ids,"tip_link_name": link_names[-1], "tip_joint_id": joint_tree_ids[-1], "selected_joint_id": None})
+ cont += 1
+
+
+ def get_filtered_subtree(self, current_tip_id : int) -> np.ndarray:
+ """
+ Filter the sub-trees of the robot based on the mimic joints and mimicked joints.
+ If the current tip joint is not a mimic joint, the subtree is returned as is.
+ :param current_tip_id: Id of the current tip joint to filter the subtree.
+ :return: Filtered tree with tip joint based on the mimicked joint of the mimic joint.
+ """
+ # find mimicked joint of the current tip joint
+ if current_tip_id in self.mimic_joint_ids:
+ # get the index of the mimic joint in the mimic_joint_ids
+ mimic_joint_index = self.mimic_joint_ids.index(current_tip_id)
+ # get the mimicked joint id
+ mimicked_joint_id = self.mimicked_joint_ids[mimic_joint_index]
+
+ # filter the subtree to include only the mimicked joint and its children
+ filtered_subtree = self.model.supports[mimicked_joint_id].tolist()
+ else:
+ # if the current tip joint is not a mimic joint, return the subtree as is
+ filtered_subtree = self.model.supports[current_tip_id].tolist()
+
+ # remove universe joint
+ filtered_subtree = filtered_subtree[1:]
+
+ return filtered_subtree
+
- def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray:
+ def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : np.ndarray, extForce : np.ndarray = None) -> np.ndarray:
"""
Compute the inverse dynamics torque vector.
@@ -121,7 +230,7 @@ def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : n
return tau
- def create_ext_force(self, masses : Union[float, np.ndarray] , frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray[pin.Force]:
+ def create_ext_force(self, masses : Union[float, np.ndarray] , frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray:
"""
Create external forces vector based on the masses and frame ID.
The resulting vector will contain the force applied to the specified frame and with the local orientation of the parent joint.
@@ -189,45 +298,6 @@ def update_configuration(self, q : np.ndarray):
"""
pin.forwardKinematics(self.model, self.data, q)
pin.updateFramePlacements(self.model, self.data)
-
-
- def compute_maximum_payload(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, frame_name : str) -> float:
- """
- Compute the forward dynamics acceleration vector.
-
- :param q: Joint configuration vector.
- :param qdot: Joint velocity vector.
- :param tau: Joint torque vector.
- :param frame_name: Name of the frame where the force is applied.
- :return: Acceleration vector.
- """
- # TODO : refactor this method
-
- # Update the configuration of the robot model
- self.update_configuration(q)
-
- # basic idea for forward dynamics: M(q) * a_q + b = tau(q) --> a_q = (tau(q) - b)/ M(q)
- # with external force, the equation becomes: M(q) * a_q + b = tau(q) + J_traspose(q) * f_ext -- > f_ext = (M(q) * a_q + b - tau(q))/ J_traspose(q)
-
- qddot0 = np.zeros(self.model.nv) # Initialize acceleration vector
-
- # calculate dynamics drift
- b = pin.rnea(self.model, self.data, q, qdot, qddot0)
-
- #get jacobian of the frame where the payload is applied
- J = self.compute_jacobian(q, frame_name)
-
- tau_r = b - tau
-
- try:
- #a = np.linalg.solve(M, tau - b)
- # get F = (J_transpose(q))^-1 X ( tau - b ) with b = M(q)*a_q + b || pinv = pseudo-inverse to prevent singularities
- F_max = np.linalg.pinv(J.T) @ tau_r
-
- except np.linalg.LinAlgError as e:
- raise ValueError(f"Failed to solve for acceleration: {e}")
-
- return F_max[2] # get the force in z axis of the world frame, which is the maximum force payload
@@ -265,7 +335,7 @@ def compute_inverse_kinematics_ikpy(self, q : np.ndarray, end_effector_position:
- def compute_inverse_kinematics(self, q : np.ndarray, end_effector_position: np.ndarray, end_effector_name : str) -> np.ndarray:
+ def compute_inverse_kinematics(self, q : np.ndarray, end_effector_position: np.ndarray, joint_id : str) -> np.ndarray:
"""
Compute the inverse kinematics for the robot model with joint limits consideration.
:param q: current joint configuration vector.
@@ -274,10 +344,8 @@ def compute_inverse_kinematics(self, q : np.ndarray, end_effector_position: np.n
:return: Joint configuration vector that achieves the desired end effector position.
"""
- joint_id = self.model.getJointId(end_effector_name) # Get the joint ID of the end effector
-
# Set parameters for the inverse kinematics solver
- eps = 1e-3 # reduce for more precision
+ eps = 1e-2 # reduce for more precision
IT_MAX = 500 # Maximum number of iterations
DT = 1e-1
damp = 1e-12
@@ -332,13 +400,13 @@ def compute_inverse_kinematics(self, q : np.ndarray, end_effector_position: np.n
- def compute_all_configurations(self, range : int, resolution : int, end_effector_name : str) -> np.ndarray:
+ def compute_all_configurations(self, range : int, resolution : int, end_joint_id) -> np.ndarray:
"""
Compute all configurations for the robot model within a specified range.
:param range (int): Range as side of a square where in the center there is the actual position of end effector.
:param resolution (int): Resolution of the grid to compute configurations.
- :param end_effector_name (str): Name of the end effector joint.
+ :param end_joint_id (str): Id of the end effector joint selected in the tree.
:return : Array of joint configurations that achieve the desired end effector position.
"""
@@ -356,11 +424,11 @@ def compute_all_configurations(self, range : int, resolution : int, end_effector
configurations = []
# Iterate over the range to compute all configurations
- for x in np.arange(-range/2, range/2 , resolution):
- for y in np.arange(-range/2, range/2 , resolution):
- for z in np.arange(0, range , resolution):
+ for x in np.arange(-range, range , resolution):
+ for y in np.arange(-range, range , resolution):
+ for z in np.arange(-range/2, range , resolution):
target_position = pin.SE3(np.eye(3), np.array([x, y, z]))
- new_q = self.compute_inverse_kinematics(q, target_position, end_effector_name)
+ new_q = self.compute_inverse_kinematics(q, target_position, end_joint_id)
if new_q is not None:
q = new_q
@@ -371,20 +439,22 @@ def compute_all_configurations(self, range : int, resolution : int, end_effector
- def verify_configurations(self, configurations_left : np.ndarray, configurations_right : np.ndarray, masses : np.ndarray, checked_frames : np.ndarray) -> np.ndarray:
+ def verify_configurations(self, configurations: np.ndarray, masses : np.ndarray, checked_frames : np.ndarray, tree_id: int, selected_joint_id: int) -> np.ndarray:
"""
Verify the configurations to check if they are valid.
- :param configurations_left: Array of joint configurations to verify for the left arm.
- :param configurations_right: Array of joint configurations to verify for the right arm.
- :param ext_forces: Array of external forces to apply to the robot model.
- :return: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau"}].
+ :param configurations: Array of joint configurations to verify for the left arm.
+ :param masses (np.ndarray): Array of masses to apply to the robot model.
+ :param checked_frames (np.ndarray): Array of frame names where the external forces are applied.
+ :param tree_id (int): Identifier of the tree to verify the configurations for.
+ :param selected_joint_id (int): Identifier of the selected joint in the tree to verify the configurations for.
+ :return: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau", "tree_id","selected_joint_id" }].
"""
valid_configurations = []
# check valid configurations for left arm
- for q in configurations_left:
+ for q in configurations:
# Update the configuration of the robot model
self.update_configuration(q["config"])
@@ -398,7 +468,7 @@ def verify_configurations(self, configurations_left : np.ndarray, configurations
tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration())
# Check if the torques are within the effort limits
- if self.check_effort_limits(tau,"left").all():
+ if self.check_effort_limits(tau= tau, tree_id= tree_id).all():
valid = True
# Compute all the collisions
pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, q["config"], False)
@@ -414,72 +484,90 @@ def verify_configurations(self, configurations_left : np.ndarray, configurations
break
if valid:
- valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "arm" : "left" })
+ valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "tree_id" : tree_id,"selected_joint_id": selected_joint_id})
- # check valid configurations for right arm
- for q in configurations_right:
- # Update the configuration of the robot model
- self.update_configuration(q["config"])
-
- if masses is not None and checked_frames is not None:
- # Create external forces based on the masses and checked frames
- ext_forces = self.create_ext_force(masses, checked_frames, q["config"])
- # Compute the inverse dynamics for the current configuration
- tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces)
- else:
- # Compute the inverse dynamics for the current configuration without external forces
- tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration())
-
- # Check if the torques are within the effort limits
- if self.check_effort_limits(tau,"right").all():
- valid = True
- # Compute all the collisions
- pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, q["config"], False)
-
- # Print the status of collision for all collision pairs
- for k in range(len(self.geom_model.collisionPairs)):
- cr = self.geom_data.collisionResults[k]
- cp = self.geom_model.collisionPairs[k]
-
- if cr.isCollision() and (cp.first, cp.second, k) not in self.default_collisions:
- print(f"Collision detected between {cp.first} and {cp.second} in the right arm configuration.")
- valid = False
- break
-
- if valid:
- valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "arm" : "right" })
-
return np.array(valid_configurations, dtype=object)
- def get_valid_workspace(self, range : int, resolution : int, end_effector_name_left : str, end_effector_name_right, masses : np.ndarray, checked_frames: np.ndarray) -> np.ndarray:
+ def get_valid_workspace(self, range : int, resolution : float, masses : np.ndarray, checked_frames: np.ndarray) -> np.ndarray:
"""
Get the valid workspace of the robot model by computing all configurations within a specified range.
:param range (int): Range as side of a square where in the center there is the actual position of end effector.
:param resolution (int): Resolution of the grid to compute configurations.
- :param end_effector_name_left (str): Name of the end effector joint to test for the left arm.
- :param end_effector_name_right (str): Name of the end effector joint to test for the right arm.
:param masses (np.ndarray): Array of masses to apply to the robot model.
:param checked_frames (np.ndarray): Array of frame names where the external forces are applied.
:return: Array of valid configurations that achieve the desired end effector position in format: [{"config", "end_effector_pos, "tau", "arm"}].
"""
- # compute all configurations for the left and right arms if they are not already computed
- if self.configurations_l is None or self.configurations_r is None:
- # Compute all configurations within the specified range for the left arm
- self.configurations_l = self.compute_all_configurations(range,resolution, end_effector_name_left)
- # Compute all configurations within the specified range for the right arm
- self.configurations_r = self.compute_all_configurations(range,resolution, end_effector_name_right)
+ # create the array to store all current valid configurations
+ valid_current_configurations = np.array([], dtype=object)
+
+ # compute all configurations for the selected joints of the trees
+ for tree,configuration in zip(self.subtrees,self.configurations):
+ # if the configurations are not computed or the selected joint ID is not the same as in the tree, compute the configurations
+ if configuration["configurations"] is None or configuration["selected_joint_id"] != tree["selected_joint_id"]:
+ if tree["selected_joint_id"] is not None:
+ # Compute all configurations for the current tree
+ configuration["configurations"] = self.compute_all_configurations(range, resolution,tree["selected_joint_id"])
+ # Set the selected joint ID to the current tree's selected joint ID
+ configuration["selected_joint_id"] = tree["selected_joint_id"]
+ else:
+ pass
+ # if the selected joint ID is None in the tree, I could remove the computed configurations,even though it is not necessary and it could be useful if the
+ # user selects the joint later
+
+
+ if configuration["configurations"] is not None and tree["selected_joint_id"] is not None:
+ # Verify the configurations to check if they are valid
+ valid_configurations = self.verify_configurations(configuration["configurations"], masses, checked_frames, tree["tree_id"], tree["selected_joint_id"])
+
+ # Append the valid configurations to the current valid configurations array
+ valid_current_configurations = np.append(valid_current_configurations, valid_configurations)
- # Verify the configurations to check if they are valid for both arms
- valid_configurations = self.verify_configurations(self.configurations_l,self.configurations_r, masses, checked_frames)
-
- return valid_configurations
+
+
+
+ return valid_current_configurations
+ def compute_maximum_payloads(self, configs : np.ndarray):
+ """
+ Compute the maximum payload for each provided configuration and return the results with the configs updated with the maximum payload as a new value.
+ :param configs: Array of configurations , format {"config", "end_effector_pos", "tau", "arm", "max_payload" }
+ """
+ for config in configs:
+ config["max_payload"] = self.find_max_payload_binary_search(config, payload_min=0.0, payload_max=15, resolution=0.01)
+
+ return configs
- def compute_forward_dynamics_aba_method(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray:
+
+ def find_max_payload_binary_search(self, config : np.ndarray, payload_min : float = 0.0, payload_max : float = 10.0, resolution : float = 0.01):
+ """
+ Find the maximum payload for a given configuration using binary search.
+ :param config: Configuration dictionary (must contain 'config' key).
+ :param payload_min: Minimum payload to test.
+ :param payload_max: Maximum payload to test.
+ :param resolution: Desired precision.
+ :return: Maximum allowable payload.
+ """
+ low = payload_min
+ high = payload_max
+ max_valid = payload_min
+
+ while high - low > resolution:
+ mid_payload = (low + high) / 2
+ ext_forces = self.create_ext_force(mid_payload, self.get_joint_name(config["selected_joint_id"]), config["config"])
+ tau = self.compute_inverse_dynamics(config["config"], self.get_zero_velocity(), self.get_zero_acceleration(), extForce=ext_forces)
+ if self.check_effort_limits(tau, config['tree_id']).all():
+ max_valid = mid_payload
+ low = mid_payload
+ else:
+ high = mid_payload
+
+ return max_valid
+
+ def compute_forward_dynamics_aba_method(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, extForce : np.ndarray = None) -> np.ndarray:
"""
Compute the forward dynamics acceleration vector with Articulated-Body algorithm(ABA).
@@ -523,33 +611,116 @@ def compute_jacobian(self, q : np.ndarray, frame_name : str) -> np.ndarray:
return J_frame
+ def verify_member_tree(self, tree_id: int, joint_id: int) -> bool:
+ """
+ Verify if the given joint ID is a member of the specified tree.
+
+ :param tree_id: Identifier of the tree to verify.
+ :param joint_id: Identifier of the joint to verify.
+ :return: True if the joint is a member of the tree, False otherwise.
+ """
+
+ # Check if the joint ID is in the list of joint IDs for the specified tree
+ return joint_id in self.subtrees[tree_id]["joint_ids"]
+
+
+ def get_subtrees(self) -> np.ndarray:
+ """
+ Get the sub-trees of the robot model.
+
+ :return: Array of sub-trees of the robot model.
+ """
+ return self.subtrees
+
+
+ def get_links_from_tree(self, joint_ids: np.ndarray | int) -> np.ndarray:
+ """
+ Get the links from the robot model based on the joint IDs.
+
+ :param joint_ids: Array of joint IDs to get the frames for.
+ :return: Array of frames corresponding to the joint IDs.
+ """
+ frames = []
+ # If joint_ids is a single integer, convert it to a list
+ if isinstance(joint_ids, int):
+ joint_ids = [joint_ids]
+
+ for joint_id in joint_ids:
+ for link in self.model.frames:
+ if link.parentJoint == joint_id and link.type == pin.FrameType.BODY:
+ frames.append(link.name)
+ break
+
+ return np.array(frames, dtype=object)
+
+
def get_maximum_torques(self, valid_configs : np.ndarray) -> np.ndarray | np.ndarray:
"""
Get the maximum torques for each joint in all valid configurations.
:param valid_configs: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau"}].
- :return: Arrays of maximum torques for each joint in the current valid configurations for left and right arm.
+ :return: Arrays of maximum torques for each joint in the current valid configurations for selected trees.
"""
# Get the number of joints
num_joints = len(valid_configs[0]["tau"])
- max_torques_left = np.array([], dtype=float)
- max_torques_right = np.array([], dtype=float)
+
+ # array to store the absolute torques for each joint in the current valid configurations for each selected tree
+ abs_joint_torques = np.array([], dtype=object)
+
+ # get the selected trees from the sub_trees
+ selected_trees = [tree for tree in self.subtrees if tree["selected_joint_id"] is not None]
+
+ # create an array to store the absolute torques for each joint in the current valid configurations for each selected tree
+ for tree in selected_trees:
+ # array to store the absolute torques for each joint in the current tree
+ abs_torques = np.array([], dtype=object)
+
+ for i in range(num_joints):
+ # Get the joint torques for the current tree
+ abs_torques = np.append(abs_torques ,{"joint" : i ,"abs": [abs(config["tau"][i]) for config in valid_configs if config["tree_id"] == tree["tree_id"]]})
- # Find maximum absolute torque for each joint
- for i in range(num_joints):
- joint_torques_left = [abs(config["tau"][i]) for config in valid_configs if config["arm"] == "left"]
- joint_torques_right = [abs(config["tau"][i]) for config in valid_configs if config["arm"] == "right"]
+ abs_joint_torques = np.append(abs_joint_torques, {"tree_id": tree["tree_id"], "abs_torques": abs_torques})
- max_torques_left = np.append( max_torques_left, max(joint_torques_left))
- max_torques_right = np.append( max_torques_right, max(joint_torques_right))
+ # array to store the maximum absolute torques for each joint in the current valid configurations
+ max_torques = np.array([], dtype=float)
+
+ # get the maximum absolute torques for each joint in the current valid configurations for each selected tree
+ for torques in abs_joint_torques:
+ max_tau = np.array([], dtype=object)
+ # Get the maximum absolute torque for the current joint
+ for tau in torques["abs_torques"]:
+ if len(tau["abs"]) > 0:
+ max_tau = np.append(max_tau, max(tau["abs"]))
+ else:
+ max_tau = np.append(max_tau, 0.0)
+
+ max_torques = np.append(max_torques, {"tree_id": torques["tree_id"], "max_values": max_tau})
+
- return max_torques_left, max_torques_right
+ return max_torques
-
+
+ def get_maximum_payloads(self, valid_configs : np.ndarray) -> np.ndarray:
+ """
+ Get the maximum payloads for all configuration in the left and right arm.
+ :param valid_configs: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau", "arm", "max_payload"}].
+ :return: Tuple of arrays of maximum payloads for left and right arms.
+ """
+ max_payloads = np.array([], dtype=float)
+ for tree in self.subtrees:
+ payloads = [config["max_payload"] for config in valid_configs if config["tree_id"] == tree["tree_id"]]
+ if payloads:
+ max_payload = max(payloads)
+ max_payloads = np.append(max_payloads, {"tree_id": tree["tree_id"], "max_payload": max_payload})
+
+ return max_payloads
+
+
+
- def get_normalized_torques(self, tau : np.ndarray, target_torque : np.ndarray = None) -> np.ndarray:
+ def get_normalized_torques(self, tau : np.ndarray, target_torque : np.ndarray = None, tree_id: int = None) -> np.ndarray:
"""
Normalize the torques vector to a unified scale.
@@ -567,16 +738,30 @@ def get_normalized_torques(self, tau : np.ndarray, target_torque : np.ndarray =
for i, torque in enumerate(tau):
norm_tau.append(abs(torque) / self.model.effortLimit[i])
else:
+ # get the maximum values for the current tree
+ max_torque = next(item for item in target_torque if item["tree_id"] == tree_id)
# Normalize the torques vector to the target torque
for i, torque in enumerate(tau):
- if target_torque[i] != 0:
- norm_tau.append(abs(torque) / target_torque[i])
+ if max_torque["max_values"][i] != 0:
+ norm_tau.append(abs(torque) / max_torque["max_values"][i])
else:
norm_tau.append(0.0)
-
return norm_tau
-
+
+
+ def get_normalized_payload(self, payload : np.ndarray, target_payload : float) -> np.ndarray:
+ """
+ Normalize the torques vector to a unified scale.
+
+ :param payload: Maximum payload for a configuration.
+ :param target_payload: Target payload to normalize the payload to.
+ :return: Normalized payload.
+ """
+ norm_payload = abs(payload) / target_payload
+
+ return norm_payload
+
def get_unified_configurations_torque(self, valid_configs : np.ndarray) -> np.ndarray | np.ndarray:
"""
@@ -585,8 +770,12 @@ def get_unified_configurations_torque(self, valid_configs : np.ndarray) -> np.nd
:param q: Joint configuration vector.
:param valid_configs: Array of
"""
+
torques_sum = np.array([], dtype=float)
norm_torques = np.array([], dtype=float)
+
+ # array to store max and min sum of torques for each tree
+ max_min_value_torques = np.array([], dtype=float)
sum = 0.0
for valid_config in valid_configs:
@@ -595,26 +784,41 @@ def get_unified_configurations_torque(self, valid_configs : np.ndarray) -> np.nd
tau = valid_config["tau"]
# calculate the sum of torques for each joint configuration
- for joint, torque in zip(q, tau):
+ for torque in tau:
#if abs(torque) < 50:
sum += abs(torque)
- torques_sum = np.append(torques_sum, {"sum" : sum, "end_effector_pos" : valid_config["end_effector_pos"], "arm" : valid_config["arm"]})
+ torques_sum = np.append(torques_sum, {"sum" : sum, "end_effector_pos" : valid_config["end_effector_pos"], "tree_id" : valid_config["tree_id"]})
sum = 0.0 # reset the sum for the next configuration
- # get the maximum torque from the sum of torques for both arms
- max_torque_l = max([item["sum"] for item in torques_sum if item["arm"] == "left"])
- max_torque_r = max([item["sum"] for item in torques_sum if item["arm"] == "right"])
+
+ # get the maximum torque from the sum of torques for all selected trees
+ for tree in self.subtrees:
+ # Get all sum values for the current tree
+ tree_sums = [item["sum"] for item in torques_sum if item["tree_id"] == tree["tree_id"]]
+
+ if tree_sums: # Check if there are any sums for this tree
+ max_value = max(tree_sums)
+ min_value = min(tree_sums)
+ else:
+ max_value = 1.0 # Default value if no sums found
+ min_value = 0.0 # Default value if no sums found
+
+ max_min_value_torques = np.append(max_min_value_torques, {"tree_id": tree["tree_id"], "max_value": max_value, "min_value": min_value})
+
# Normalize the torques vector to a unified scale
for tau in torques_sum:
- if tau["arm"] == "left":
- norm_tau = tau["sum"] / max_torque_l
- else:
- norm_tau = tau["sum"] / max_torque_r
+
+ # Find the corresponding max_value and min_value for the current tree_id
+ max_value = next(item["max_value"] for item in max_min_value_torques if item["tree_id"] == tau["tree_id"])
+ min_value = next(item["min_value"] for item in max_min_value_torques if item["tree_id"] == tau["tree_id"])
+
+ norm_tau = (tau["sum"] - min_value ) / ( max_value - min_value)
+
# append the normalized torque to the array
- norm_torques = np.append(norm_torques, {"norm_tau" : norm_tau, "end_effector_pos" : tau["end_effector_pos"], "arm" : tau["arm"]})
+ norm_torques = np.append(norm_torques, {"norm_tau" : norm_tau, "end_effector_pos" : tau["end_effector_pos"], "tree_id" : tau["tree_id"]})
return norm_torques
@@ -714,12 +918,12 @@ def check_joint_limits(self, q : np.ndarray) -> np.ndarray:
return within_limits
- def check_effort_limits(self, tau : np.ndarray, arm : str = None) -> np.ndarray:
+ def check_effort_limits(self, tau : np.ndarray, tree_id : int = None) -> np.ndarray:
"""
Check if the torques vector is within the effort limits of the robot model.
:param tau: Torques vector to check.
- :param arm: Arm name to filter the torques vector, right or left.
+ :param tree_id: Id of tree to filter control in the torques vector.
:return: Array of booleans indicating if each joint is within the effort limits.
"""
if tau is None:
@@ -729,7 +933,7 @@ def check_effort_limits(self, tau : np.ndarray, arm : str = None) -> np.ndarray:
within_limits = np.array([], dtype=bool)
# if arm is not specified, check all joints
- if arm is None:
+ if tree_id is None:
# Check if the torques are within the limits
for i in range(self.model.njoints -1):
if abs(tau[i]) > self.model.effortLimit[i]:
@@ -742,15 +946,21 @@ def check_effort_limits(self, tau : np.ndarray, arm : str = None) -> np.ndarray:
print("All joints are within effort limits. \n")
else:
- # Check if the torques are within the limits
- for i in range(self.model.njoints -1):
- # TODO arm is "left" or right" but the model has gripper with left and right in the same name
- if arm in self.model.names[i+1]:
- if abs(tau[i]) > self.model.effortLimit[i]:
- print(f"\033[91mJoint {i+1} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n")
- within_limits = np.append(within_limits, False)
- else:
- within_limits = np.append(within_limits, True)
+ # Check if the torque of joints inside the tree is within the limits
+ for id in self.subtrees[tree_id]["joint_ids"]:
+ if abs(tau[id-1]) > self.model.effortLimit[id-1]:
+ print(f"\033[91mJoint {id} exceeds effort limit: {tau[id-1]} > {self.model.effortLimit[id-1]} \033[0m\n")
+ within_limits = np.append(within_limits, False)
+ else:
+ within_limits = np.append(within_limits, True)
+
+ # for i in range(self.model.njoints -1):
+ # if arm in self.model.names[i+1]:
+ # if abs(tau[i]) > self.model.effortLimit[i]:
+ # print(f"\033[91mJoint {i+1} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n")
+ # within_limits = np.append(within_limits, False)
+ # else:
+ # within_limits = np.append(within_limits, True)
if np.all(within_limits):
print("All joints are within effort limits. \n")
@@ -759,6 +969,20 @@ def check_effort_limits(self, tau : np.ndarray, arm : str = None) -> np.ndarray:
return within_limits
+ def set_joint_tree_selection(self, tree_id : int, joint_id: int):
+ """
+ Set the joint tree selection for the robot model.
+
+ :param tree_id: ID of the tree to select.
+ :param joint_id: ID of the joint to select.
+ """
+ for tree in self.subtrees:
+ if tree["tree_id"] == tree_id:
+ # Set the selected joint in the tree
+ tree["selected_joint_id"] = joint_id
+ return
+
+
def set_position(self, pos_joints : list[float], name_positions : list[str] ) -> np.ndarray:
"""
@@ -830,12 +1054,13 @@ def get_joints_placements(self, q : np.ndarray) -> np.ndarray | float:
:param q: Joint configuration vector.
:return: Array of joint placements with names of joint, and z offset of base link.
"""
- base_link_id = self.model.getFrameId("base_link")
+ base_link_id = self.model.getFrameId(self.root_name)
offset_z = self.data.oMf[base_link_id].translation[2] # Get the z offset of the base link
self.update_configuration(q)
placements = np.array([({"name" : self.model.names[i],
- "type" : self.model.joints[i].shortname() ,
+ "type" : self.model.joints[i].shortname() ,
+ "id" : i,
"x": self.data.oMi[i].translation[0],
"y": self.data.oMi[i].translation[1],
"z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object)
@@ -843,14 +1068,27 @@ def get_joints_placements(self, q : np.ndarray) -> np.ndarray | float:
return placements, offset_z
- def get_joint_placement(self, joint_id : int) -> dict:
+ def get_joint_name(self, id_joint: int) -> np.ndarray:
+ """
+ Get the name of the joint by its ID.
+
+ :param id_joint: ID of the joint to get the name for.
+ :return: Name of the joint.
+ """
+
+ return self.model.names[id_joint]
+
+ def get_joint_placement(self, joint_id : int, q : np.ndarray) -> dict:
"""
Get the placement of a specific joint in the robot model.
:param joint_id: ID of the joint to get the placement for.
+ :param q: Joint configuration vector.
:return: Dictionary with coordinates x , y , z of the joint.
"""
+ self.update_configuration(q)
+
if joint_id < 0 or joint_id >= self.model.njoints:
raise ValueError(f"Joint ID {joint_id} is out of bounds for the robot model with {self.model.njoints} joints.")
@@ -874,7 +1112,6 @@ def get_mass_matrix(self, q : np.ndarray) -> np.ndarray:
return mass_matrix
-
def get_joints(self) -> np.ndarray:
"""
Get the array joint names of the robot model.
@@ -897,23 +1134,43 @@ def get_frames(self) -> np.ndarray:
- def get_active_frames(self) -> np.ndarray:
+ def get_links(self,all_frames : bool = False) -> np.ndarray:
"""
- Get the array of active joint names in the robot model.
-
- :return: array of active joint names.
+ Get the array of link names for payload menu.
+ :param all_frames: If True, return all frames, otherwise return only current tip frames.
+ :return: array of link names.
"""
# Get frames where joints are parents
frame_names = []
- for i in range(1, self.model.njoints):
- for frame in self.model.frames:
- if frame.parentJoint == i and frame.type == pin.FrameType.BODY:
- frame_names.append(frame.name)
- break
+ # If all_frames is True, get all link names from the subtrees
+ if all_frames:
+ for tree in self.subtrees:
+ if tree["selected_joint_id"] is not None:
+ # insert links in the array
+ for link in tree["link_names"]:
+ frame_names.append(link)
+ else:
+ # if all_frames is False, get only the links connected to the selected joint IDs from the subtrees
+ for tree in self.subtrees:
+ if tree["selected_joint_id"] is not None:
+ link_name = self.get_links_from_tree(tree["selected_joint_id"])
+ frame_names.append(link_name[0])
+
return np.array(frame_names, dtype=str)
+ def get_root_name(self) -> str:
+ """
+ Get the name of the root frame of the robot model.
+
+ :return: Name of the root frame.
+ """
+
+ if self.root_name is None:
+ raise ValueError("Root name is not set")
+
+ return self.root_name
def get_parent_joint_id(self, frame_name : str) -> int:
"""
diff --git a/dynamic_payload_analysis_core/package.xml b/dynamic_payload_analysis_core/package.xml
index 3b5dfae..72f3082 100644
--- a/dynamic_payload_analysis_core/package.xml
+++ b/dynamic_payload_analysis_core/package.xml
@@ -4,7 +4,7 @@
dynamic_payload_analysis_core
0.0.0
Core package with calculation for torques calculator
- morolinux
+ Moro Enrico
Apache License 2.0
ament_copyright
@@ -12,6 +12,12 @@
ament_pep257
python3-pytest
+ python3-numpy
+ urdf_parser_py
+ pinocchio
+ python-pathlib
+ python-typing
+
ament_python
diff --git a/dynamic_payload_analysis_core/setup.py b/dynamic_payload_analysis_core/setup.py
index 70e2a59..86b17bb 100644
--- a/dynamic_payload_analysis_core/setup.py
+++ b/dynamic_payload_analysis_core/setup.py
@@ -11,13 +11,15 @@
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
- install_requires=['setuptools'],
zip_safe=True,
maintainer='Enrico Moro',
maintainer_email='enrimoro003@gmail.com',
description='This package implements core functionalities for dynamic payload analysis in robotics, focusing on torque calculations and external force handling.',
license='Apache License 2.0',
tests_require=['pytest'],
+ install_requires=[
+ 'setuptools',
+ ],
entry_points={
'console_scripts': [
],
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py
old mode 100644
new mode 100755
index 571703b..b0b87dc
--- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py
+++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py
@@ -14,12 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import sys
import numpy as np
from interactive_markers import InteractiveMarkerServer
from interactive_markers import MenuHandler
-import rclpy
from visualization_msgs.msg import InteractiveMarker
from visualization_msgs.msg import InteractiveMarkerControl
from visualization_msgs.msg import Marker
@@ -31,27 +29,44 @@ class TorqueVisualizationType(Enum):
MAX_CURRENT_TORQUE = 2
class MenuPayload():
- def __init__(self, node):
+ def __init__(self, node, root_joint_name : str):
+ """
+ Initialize the menu for payload selection in Rviz.
+
+ param node: Node to create the interactive marker server.
+ param root_joint_name: Name of the root joint for the interactive marker server.
+ This is used to set the frame_id of the interactive marker.
+ """
+
# create server for interactive markers
self.server = InteractiveMarkerServer(node, 'menu_frames')
- # array to store the checked orunchecked frames and payload selection
+ # array to store the checked or unchecked frames for payload selection
self.frames_selection = np.array([],dtype=object)
+ # array to store the selected frame in the subtree menu
+ self.subtree_selection = np.array([], dtype=object)
+
# create handler for menu
self.menu_handler = MenuHandler()
#current managed frame
self.current_frame = None
+ # root joint name for the interactive marker server
+ self.root_joint_name = root_joint_name
+
# flag to compute workspace
self.compute_workspace = False
# variable to store the selected configuration to display in Rviz
self.selected_configuration = None
+ # variable used as identifier for the payload selection menu items
+ self.identifier = "id_payload"
+
# payload mass selection array
- self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5], dtype=float)
+ self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5], dtype=float)
# variable to store selection between torque limits or max torque in the current configuration for color visualization
self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # default is torque limits
@@ -62,12 +77,23 @@ def __init__(self, node):
# insert the reset payload button
self.reset = self.menu_handler.insert('Reset payloads', parent=self.root_frames, callback=self.callback_reset)
+ # insert label for the color menu selection
+ self.label_color_selection = self.menu_handler.insert('Select torque visualization color :',callback=self.callback_label)
+
# insert the checker for visualization color choice between torque limits or max torque in the current configuration
self.torque_limits_checker = self.menu_handler.insert('Torque limits',command=str(TorqueVisualizationType.TORQUE_LIMITS.value) , callback=self.callback_color_selection)
self.max_torque_checker = self.menu_handler.insert('Max torque', command=str(TorqueVisualizationType.MAX_CURRENT_TORQUE.value) , callback=self.callback_color_selection)
+ # set visible false for the torque limits and max torque checkboxes, they will be displayed only when configurations are inserted in the menu
self.menu_handler.setVisible(self.torque_limits_checker, False)
self.menu_handler.setVisible(self.max_torque_checker, False)
+ self.menu_handler.setVisible(self.label_color_selection, False)
+
+ # set the visibility of the workspace button to false, it will be displayed only when the user selects at least one tree
+ self.menu_handler.setVisible(self.workspace_button, False)
+
+ # label for tree selection
+ self.label_tree_selection = self.menu_handler.insert('Select end effector point for each tree :', callback=self.callback_tree_selection)
self.make_menu_marker('menu_frames')
@@ -78,22 +104,172 @@ def __init__(self, node):
def insert_frame(self, name):
"""
- Insert a new item(frame) in the checkbox menu of frames.
+ Insert a new item(frame) in the checkbox menu of frames for payload selection.
Args:
name (str) : name of the frame
"""
- last_item = self.menu_handler.insert(name, parent=self.root_frames, callback=self.callback_selection)
+ last_item = self.menu_handler.insert(name, parent=self.root_frames,command=self.identifier, callback=self.callback_selection)
self.menu_handler.setCheckState(last_item, MenuHandler.UNCHECKED)
self.menu_handler.setVisible(last_item, True)
+ # flag to check if the item is already in the checked frames array
+ flag = False
+
# add the item to the checked frames array in order to keep track of the checked items
- self.frames_selection = np.append(self.frames_selection, {"name": name, "checked" : False, "payload" : 0.0} )
+ for i, item in enumerate(self.frames_selection):
+ if item['name'] == name:
+ # if the item is already in the array, we need to update it as default
+ self.frames_selection[i] = {"name": item['name'], "checked": False, "payload": 0.0}
+ flag = True
+ break
+
+ if flag is False:
+ # if the item is not in the array, we need to add it
+ self.frames_selection = np.append(self.frames_selection, {"name": name, "checked" : False, "payload" : 0.0} )
+
+ # apply changes
+ self.menu_handler.reApply(self.server)
+ self.server.applyChanges()
+
+
+ def remove_frame(self, name):
+ """
+ Remove a item(frame) in the checkbox menu of frames for payload selection.
+
+ Args:
+ name (str) : name of the frame
+ """
+ # iterate through the menu entries to find the item with the specified name and identifier
+ # identifier is used to identify the item in the sub menu of payload selection
+ for i, item in self.menu_handler.entry_contexts_.items():
+ if item.title == name and item.command == self.identifier:
+ self.menu_handler.setCheckState(i, MenuHandler.UNCHECKED)
+ self.menu_handler.setVisible(i, False)
+
+ # update the frames_selection array to set the item as default value
+ for i, item in enumerate(self.frames_selection):
+ if item['name'] == name:
+ self.frames_selection[i] = {"name": item['name'], "checked": False, "payload": 0.0}
+ break
+
+ # apply changes
+ self.menu_handler.reApply(self.server)
+ self.server.applyChanges()
+
+
+ def insert_subtree(self, tree_identifier : int,tip_tree_name : str, joint_names : np.ndarray, joint_ids : np.ndarray, link_names : np.ndarray):
+ """
+ Insert a new item(subtree) in the checkbox menu of frames.
+
+ Args:
+ tree_identifier (int): Identifier of the subtree.
+ tip_tree_name (str): Name of the tip link of the subtree.
+ joint_names (np.ndarray): Names of the joints in the subtree.
+ joint_ids (np.ndarray): IDs of the joints in the subtree.
+ link_names (np.ndarray): Names of the links in the subtree.
+ """
+ last_item = self.menu_handler.insert(f"Tree: [{tip_tree_name}]",command= str(tree_identifier), callback=self.callback_tree_selection)
+ self.menu_handler.setCheckState(last_item, MenuHandler.UNCHECKED)
+ self.menu_handler.setVisible(last_item, True)
+
+ joints_list = np.array([], dtype=object)
+
+ for joint_name,id,link_name in zip(joint_names, joint_ids, link_names):
+ # insert the joint as a sub-menu of the subtree
+ last_entry = self.menu_handler.insert(f"{link_name}", parent=last_item, command=str(last_item), callback=self.callback_joint_tree_selection)
+ self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED)
+ self.menu_handler.setVisible(last_entry, True)
+ joints_list = np.append(joints_list,{"joint_name" : joint_name, "joint_id" : id,"link_name" : link_name})
+
+ self.subtree_selection = np.append(self.subtree_selection, {"tree" : tree_identifier, "joints" : joints_list, "selected_joint_id": None})
# apply changes
self.menu_handler.reApply(self.server)
self.server.applyChanges()
+
+ def callback_tree_selection(self, feedback):
+ """
+ Callback for the tree selection, Made only to avoid the error of missing callback.
+
+ Args:
+ feedback: Feedback from the menu selection.
+ """
+ pass
+
+
+ def callback_label(self, feedback):
+ """
+ Callback for the label of color selection. Made only to avoid the error of missing callback.
+
+ Args:
+ feedback: Feedback from the menu selection.
+ """
+ pass
+
+ def callback_joint_tree_selection(self, feedback):
+ """
+ Callback for the joint selection in the subtree to change the selection of the joints.
+
+ Args:
+ feedback: Feedback from the menu selection.
+ """
+
+ # reset the sub-menu configuration if the user change joint selection
+ self.reset_sub_menu_configuration()
+
+ # get the handle of the selected item (id)
+ handle = feedback.menu_entry_id
+ # get the title of the selected item (it contains joint name)
+ title = self.menu_handler.getTitle(handle)
+
+ # get the entry object from the menu handler with all the informations about the item (command field is used to store the parent id)
+ config_context = self.menu_handler.entry_contexts_[handle]
+
+ # get the parent id of the selected joint stored in the command field (parent == tree identifier)
+ parent_id = int(config_context.command)
+ # get the entry object of the parent
+ parent_context = self.menu_handler.entry_contexts_[parent_id]
+
+ # reset all the selections in the tree sub-menu if the joint was already selected
+ if self.menu_handler.getCheckState(handle) == MenuHandler.CHECKED:
+ parent_context.check_state = MenuHandler.UNCHECKED
+ self.menu_handler.setCheckState(handle, MenuHandler.UNCHECKED)
+
+ self.update_joint_tree_selection(int(parent_context.command), title)
+ else:
+ # set the checkbox as checked
+ parent_context.check_state = MenuHandler.CHECKED
+
+ # reset all the selections in the tree sub-menu
+ # check if a item is already checked, if so remove it and set to unchecked to prevent multiple joint selection in the same menu
+ for item in parent_context.sub_entries:
+ # check if the item is checked
+ if self.menu_handler.getCheckState(item) == MenuHandler.CHECKED:
+ # if the configuration is already checked, we need to uncheck it
+ self.menu_handler.setCheckState(item, MenuHandler.UNCHECKED)
+
+ # set the selected configuration checkbox as checked
+ self.menu_handler.setCheckState(handle, MenuHandler.CHECKED)
+
+ # set the joint as checked
+ self.update_joint_tree_selection(int(parent_context.command), title)
+
+ # check if there is at least one joint selected in the subtree menu
+ if self.get_status_joint_tree():
+ # set the workspace button visible if there is at least one joint selected in the subtree menu
+ self.menu_handler.setVisible(self.workspace_button, True)
+ else:
+ # hide the workspace button if there is no joint selected in the subtree menu
+ self.menu_handler.setVisible(self.workspace_button, False)
+
+ # apply changes
+ self.menu_handler.reApply(self.server)
+ self.server.applyChanges()
+
+
+
def callback_color_selection(self, feedback):
"""
Callback for torque selection type to change the visualization color in Rviz.
@@ -142,15 +318,20 @@ def insert_dropdown_configuration(self, configuration : np.ndarray):
configuration (np.ndarray): Array of configuration to be displayed in the dropdown.
"""
+ if configuration.size == 0:
+ return
+
for i, item in enumerate(configuration):
# insert the parent in the command field to keep track of the parent id
- last_entry = self.menu_handler.insert(f"Configuration {i} | arm: " + item["arm"], parent=self.workspace_button, command=str(self.workspace_button), callback=self.callback_configuration_selection)
+ last_entry = self.menu_handler.insert(f"Configuration {i} | tree: {item['tree_id']} | max payload : " + f"{item['max_payload']:.2f} kg" , parent=self.workspace_button, command=str(self.workspace_button), callback=self.callback_configuration_selection)
self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED)
self.menu_handler.setVisible(last_entry, True)
# set visible true for color selection and put the default check state
self.menu_handler.setVisible(self.torque_limits_checker, True)
self.menu_handler.setVisible(self.max_torque_checker, True)
+ self.menu_handler.setVisible(self.label_color_selection, True)
+
self.menu_handler.setCheckState(self.torque_limits_checker, MenuHandler.CHECKED)
self.menu_handler.setCheckState(self.max_torque_checker, MenuHandler.UNCHECKED)
@@ -208,35 +389,29 @@ def callback_reset(self, feedback):
if item['checked']:
item = {"name": item['name'], "checked": False, "payload": 0.0}
self.frames_selection[i] = item
-
- # reset the frames selection menu (i = number of entry, item = object with name, sub entries, etc.)
- for i, item in self.menu_handler.entry_contexts_.items():
- if i == 1:
- # skip the root item (payload reset)
- continue
-
- # check if the item(frame) has sub entries (payloads selection)
- if item.sub_entries:
- # if the frame has payloads selection, we need to remove it
- for sub_item in item.sub_entries:
- self.menu_handler.setVisible(sub_item, False)
- self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED)
+
+
+ self.reset_sub_menu_configuration()
+
+ for item in self.menu_handler.entry_contexts_[self.root_frames].sub_entries:
+ for sub_item in self.menu_handler.entry_contexts_[item].sub_entries:
+ self.menu_handler.setVisible(sub_item, False)
+ self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED)
- # check if the item is the reset payloads or compute workspace item and skip the unchanging of the check state
- if item.title == "Reset payloads" or item.title == "Compute workspace":
- continue
-
- # set the checked of frame to unchecked
- self.menu_handler.setCheckState(i,MenuHandler.UNCHECKED)
+ if item != self.reset:
+ # set the checked of frame to unchecked
+ self.menu_handler.setCheckState(item,MenuHandler.UNCHECKED)
+
# reset the selected configuration
self.selected_configuration = None
-
- # hide the torque limits and max torque checkboxes when there is no configuration selected
+
+ #hide the torque limits and max torque checkboxes when there is no configuration selected
+ self.menu_handler.setVisible(self.label_color_selection, False)
self.menu_handler.setVisible(self.torque_limits_checker, False)
self.menu_handler.setVisible(self.max_torque_checker, False)
- self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # reset to default torque limits
+ self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # reset to default torque limits#
# reapply the menu handler and server changes
self.menu_handler.reApply(self.server)
@@ -250,6 +425,11 @@ def callback_selection(self, feedback):
Args:
feedback: Feedback from the menu selection.
"""
+
+ # reset the sub-menu configuration if the user change payload selection
+ self.reset_sub_menu_configuration()
+ # -------------
+
# get name of the frame
handle = feedback.menu_entry_id
title = self.menu_handler.getTitle(handle)
@@ -269,8 +449,61 @@ def callback_selection(self, feedback):
# print the current state of the checked frames
print(f"{self.get_item_state()} \n")
-
+ def reset_sub_menu_configuration(self):
+ """
+ Function to reset the sub-menu configuration and related variables.
+ """
+ # reset calculated configurations in the workspace submenu
+ for i, item in self.menu_handler.entry_contexts_.items():
+ # check if the item is the workspace button and has sub entries and remove them from view
+ if item.sub_entries and item.title == self.menu_handler.getTitle(self.workspace_button):
+ # if the frame has payloads selection, we need to remove it
+ for sub_item in item.sub_entries:
+ self.menu_handler.setVisible(sub_item, False)
+ self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED)
+
+ # hide the torque limits and max torque checkboxes when there is no configuration selected
+ self.menu_handler.setVisible(self.torque_limits_checker, False)
+ self.menu_handler.setVisible(self.max_torque_checker, False)
+ self.menu_handler.setVisible(self.label_color_selection, False)
+
+ self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # reset to default torque limits
+
+ # reset the selected configuration
+ self.selected_configuration = None
+
+
+ def get_status_joint_tree(self) -> bool:
+ """
+ Check if there is at least one joint selected in the subtree menu.
+
+ Returns:
+ bool: True if at least one joint is selected, False otherwise.
+ """
+ for item in self.subtree_selection:
+ if item['selected_joint_id'] is not None:
+ return True
+
+ def update_joint_tree_selection(self, tree_identifier: int, frame_name : str):
+ """
+ Update the state of a joint in the subtree menu.
+
+ Args:
+ tree_identifier (int): Identifier of the subtree.
+ frame_name (str): Name of the selected link in the menu.
+ """
+
+ for item in self.subtree_selection:
+ if item['tree'] == tree_identifier:
+ for joint in item['joints']:
+ if joint['link_name'] == frame_name and item['selected_joint_id'] != joint['joint_id']:
+ item['selected_joint_id'] = joint['joint_id']
+ break
+ elif joint['link_name'] == frame_name and item['selected_joint_id'] == joint['joint_id']:
+ item['selected_joint_id'] = None
+ break
+
def update_item(self, name, check: bool):
"""
@@ -328,6 +561,9 @@ def update_payload_selection(self, feedback):
Args:
feedback: Feedback from the menu selection.
"""
+ # reset the sub-menu configuration if the user change payload selection
+ self.reset_sub_menu_configuration()
+
# get the handle of the selected item (id)
handle = feedback.menu_entry_id
# get the title of the selected item (it contains number of kg)
@@ -360,6 +596,14 @@ def update_payload_selection(self, feedback):
self.menu_handler.reApply(self.server)
self.server.applyChanges()
+ def get_joint_tree_selection(self) -> np.ndarray:
+ """
+ Return the selected joint in the subtree menu.
+
+ Returns:
+ np.ndarray: Array of selected joints in the subtree menu.
+ """
+ return self.subtree_selection
def get_torque_color(self) -> TorqueVisualizationType:
"""
@@ -400,6 +644,15 @@ def set_workspace_state(self, state: bool):
self.compute_workspace = state
+ def set_root_joint_name(self, name: str):
+ """
+ Set the root joint name for the interactive marker server.
+
+ Args:
+ name (str): Name of the root joint.
+ """
+ self.root_joint_name = name
+
def get_item_state(self) -> np.ndarray:
"""
Return array of checked frames in the menu list
@@ -442,7 +695,9 @@ def make_empty_marker(self, dummyBox=True):
Create interactive marker
"""
int_marker = InteractiveMarker()
- int_marker.header.frame_id = 'base_link'
+
+ int_marker.header.frame_id = self.root_joint_name
+
int_marker.pose.position.z = 2.0
int_marker.scale = 0.5
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py
old mode 100644
new mode 100755
index a6825dd..0dfe34a
--- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py
+++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py
@@ -16,15 +16,14 @@
import rclpy
from rclpy.node import Node
-from std_msgs.msg import String, ColorRGBA
-from geometry_msgs.msg import WrenchStamped, Point
+from std_msgs.msg import String
from sensor_msgs.msg import JointState
from dynamic_payload_analysis_core.core import TorqueCalculator
import numpy as np
from visualization_msgs.msg import Marker, MarkerArray
from dynamic_payload_analysis_ros.menu_visual import MenuPayload
from dynamic_payload_analysis_ros.menu_visual import TorqueVisualizationType
-
+from builtin_interfaces.msg import Time
@@ -32,6 +31,21 @@
class RobotDescriptionSubscriber(Node):
def __init__(self):
super().__init__('node_robot_description_subscriber')
+
+ # add parameter for the node to set the expert mode or the basic mode
+ self.declare_parameter('advanced_mode', False)
+
+ # add parameter for the node to set the resolution of IK calculations
+ self.declare_parameter('resolution_ik', 0.20)
+
+ # add parameter for the node to set the range of the workspace area
+ self.declare_parameter('workspace_range', 2.0)
+
+ self.range_ik = self.get_parameter('workspace_range').get_parameter_value().double_value
+
+ self.resolution_ik = self.get_parameter('resolution_ik').get_parameter_value().double_value
+
+
self.subscription = self.create_subscription(
String,
'/robot_description',
@@ -39,8 +53,8 @@ def __init__(self):
qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1)
)
- # menu for selecting frames to apply payload
- self.menu = MenuPayload(self)
+ # object to handle interactive markers in RViz
+ self.menu = None
# Publisher for external force
self.publisher_force = self.create_publisher(MarkerArray, '/external_forces', 10)
@@ -51,8 +65,8 @@ def __init__(self):
# Pusblisher for point cloud workspace area
self.publisher_workspace_area = self.create_publisher(MarkerArray, '/workspace_area', 10)
- # Publisher for point names in the workspace area
- self.publisher_workspace_area_names = self.create_publisher(MarkerArray, '/workspace_area_names', 10)
+ # Pusblisher for point cloud of maximum payloads in the workspace area
+ self.publisher_maximum_payloads = self.create_publisher(MarkerArray, '/maximum_payloads', 10)
# subscription to joint states
self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10)
@@ -61,7 +75,7 @@ def __init__(self):
self.publisher_joint_states = self.create_publisher(JointState, '/joint_states', 10)
# variable to store the object of the TorqueCalculator
- self.robot = None
+ self.robot_handler = None
# variable to store external force applied on the robot
self.external_force = None
@@ -72,20 +86,33 @@ def __init__(self):
# variable to store masses applied on the frames
self.masses = None
- # variable to store the currente selected configuration from the workspace menu
+ # variable to store current selected links for payload selection
+ self.links = None
+
+ # variable to store the current selected configuration from the workspace menu
self.valid_configurations = None
+ # id of the valid configurations to detect changes
+ self.id_current_valid_config = None
+ self.prev_id_current_valid_config = None
+
+ # variable to store markers for the workspace area and maximum payloads area
+ self.marker_points = None
+ self.marker_max_payloads = None
+
# variable to store if there is a selected configuration from the workspace menu to visualize
self.selected_configuration = None
# timer to compute the valid workspace area
- self.timer_workspace_calculation = self.create_timer(1, self.workspace_calculation)
+ self.timer_workspace_calculation = self.create_timer(1.0, self.workspace_calculation)
# timer to publish the selected configuration in joint states
- self.timer_publish_configuration = self.create_timer(2, self.publish_selected_configuration)
+ self.timer_publish_configuration = self.create_timer(2.0, self.publish_selected_configuration)
# timer to calculate the external forces based on the checked frames in the menu
- self.timer_update_checked_frames = self.create_timer(0.5, self.update_checked_frames)
+ self.timer_update_checked_items = self.create_timer(0.5, self.update_checked_items)
# timer to publish the external forces as arrows in RViz
- self.timer_publish_force = self.create_timer(1, self.publish_payload_force)
+ self.timer_publish_force = self.create_timer(1.0, self.publish_payload_force)
+ # timer to update items in the menu for payload selection
+ self.timer_update_payload_selection = self.create_timer(0.5, self.update_payload_selection)
self.get_logger().info('Robot description subscriber node started')
@@ -98,62 +125,94 @@ def robot_description_callback(self, msg):
self.get_logger().info('Received robot description')
- self.robot = TorqueCalculator(robot_description = msg.data)
- self.robot.print_active_joint()
- self.robot.print_frames()
+ self.robot_handler = TorqueCalculator(robot_description = msg.data)
+ self.robot_handler.print_active_joint()
+ self.robot_handler.print_frames()
+
+ if self.menu is None:
+ # menu for selecting frames to apply payload
+ self.menu = MenuPayload(self, self.robot_handler.get_root_name())
- # Add the frame to the menu for payload selection
- for frame in self.robot.get_active_frames():
- self.menu.insert_frame(frame)
+ # Add subtree to the menu
+ for i, subtree in enumerate(self.robot_handler.get_subtrees()):
+ self.menu.insert_subtree(i,subtree['tip_link_name'], subtree["joint_names"], subtree["joint_ids"], subtree["link_names"])
+
+
+ def update_payload_selection(self):
+ """
+ Callback function to update the payload selection in the menu.
+ """
+ if self.robot_handler is not None:
+ # Add the frame to the menu for payload selection with the advanced mode parameter
- # self.robot.print_configuration()
+ links = self.robot_handler.get_links(self.get_parameter('advanced_mode').get_parameter_value().bool_value)
+
+ for link in links:
+ if link in self.links:
+ continue # Skip if the link is already in the menu
+ else:
+ self.menu.insert_frame(str(link))
+ # remove other links that are not in the current link selection
+ if self.links is not None:
+ for link in self.links:
+ if link not in links:
+ self.menu.remove_frame(str(link))
+
+ self.links = links # Update the links to the current ones
def publish_payload_force(self):
"""
Publish the gravity force on the frame with id `id_force`.
"""
- external_force_array = MarkerArray()
-
- for frame in self.menu.get_item_state():
+ if self.menu is not None:
+ external_force_array = MarkerArray()
+
+ for frame in self.menu.get_item_state():
- id_force = self.robot.get_parent_joint_id(frame["name"])
- joint_position = self.robot.get_joint_placement(id_force)
- arrow_force = Marker()
+ id_force = self.robot_handler.get_parent_joint_id(frame["name"])
+
+ # use the selected configuration from the menu to get the right joint placement
+ if self.selected_configuration is not None:
+ joint_position = self.robot_handler.get_joint_placement(id_force,self.valid_configurations[self.selected_configuration]["config"])
+ else:
+ joint_position = self.robot_handler.get_joint_placement(id_force,self.robot_handler.get_zero_configuration())
- arrow_force.header.frame_id = "base_link"
- arrow_force.header.stamp = self.get_clock().now().to_msg()
- arrow_force.ns = "external_force"
- arrow_force.id = id_force
- arrow_force.type = Marker.ARROW
+ arrow_force = Marker()
- # add the arrow if the frame is checked or delete it if not
- if frame["checked"]:
- arrow_force.action = Marker.ADD
- else:
- arrow_force.action = Marker.DELETE
-
- arrow_force.scale.x = 0.20 # Length of the arrow
- arrow_force.scale.y = 0.05 # Width of the arrow
- arrow_force.scale.z = 0.05 # Height of the arrow
- arrow_force.color.a = 1.0 # Alpha
- arrow_force.color.r = 0.0
- arrow_force.color.g = 0.0 # Green
- arrow_force.color.b = 1.0
-
- # Set the position of the arrow at the joint placement
- arrow_force.pose.position.x = joint_position["x"]
- arrow_force.pose.position.y = joint_position["y"]
- arrow_force.pose.position.z = joint_position["z"]
- # Set the direction of the arrow downwards
- arrow_force.pose.orientation.x = 0.0
- arrow_force.pose.orientation.y = 0.7071
- arrow_force.pose.orientation.z = 0.0
- arrow_force.pose.orientation.w = 0.7071
+ arrow_force.header.frame_id = self.robot_handler.get_root_name()
+ arrow_force.header.stamp = Time()
+ arrow_force.ns = "external_force"
+ arrow_force.id = id_force
+ arrow_force.type = Marker.ARROW
+
+ # add the arrow if the frame is checked or delete it if not
+ if frame["checked"]:
+ arrow_force.action = Marker.ADD
+ else:
+ arrow_force.action = Marker.DELETE
+
+ arrow_force.scale.x = 0.20 # Length of the arrow
+ arrow_force.scale.y = 0.05 # Width of the arrow
+ arrow_force.scale.z = 0.05 # Height of the arrow
+ arrow_force.color.a = 1.0 # Alpha
+ arrow_force.color.r = 0.0
+ arrow_force.color.g = 0.0 # Green
+ arrow_force.color.b = 1.0
+
+ # Set the position of the arrow at the joint placement
+ arrow_force.pose.position.x = joint_position["x"]
+ arrow_force.pose.position.y = joint_position["y"]
+ arrow_force.pose.position.z = joint_position["z"]
+ # Set the direction of the arrow downwards
+ arrow_force.pose.orientation.x = 0.0
+ arrow_force.pose.orientation.y = 0.7071
+ arrow_force.pose.orientation.z = 0.0
+ arrow_force.pose.orientation.w = 0.7071
+
+ external_force_array.markers.append(arrow_force)
- external_force_array.markers.append(arrow_force)
-
- self.publisher_force.publish(external_force_array)
+ self.publisher_force.publish(external_force_array)
@@ -161,23 +220,28 @@ def workspace_calculation(self):
"""
Callback for timer to compute the valid workspace area.
"""
- # if the user choose to compute the workspace area then compute the valid configurations
- if self.menu.get_workspace_state():
- self.valid_configurations = self.robot.get_valid_workspace(2, 0.20, "gripper_left_finger_joint", "gripper_right_finger_joint", self.masses, self.checked_frames)
-
- # insert the valid configurations in the menu
- self.menu.insert_dropdown_configuration(self.valid_configurations)
+ if self.menu is not None:
+ # if the user choose to compute the workspace area then compute the valid configurations
+ if self.menu.get_workspace_state():
+ self.valid_configurations = self.robot_handler.get_valid_workspace(range = self.range_ik,resolution= self.resolution_ik, masses = self.masses, checked_frames = self.checked_frames)
+
+ # compute the maximum payloads for the valid configurations
+ self.valid_configurations = self.robot_handler.compute_maximum_payloads(self.valid_configurations)
+
+ # insert the valid configurations in the menu
+ self.menu.insert_dropdown_configuration(self.valid_configurations)
- # clear all the workspace area markers
- self.clear_workspace_area_markers()
+ # clear all the workspace area markers
+ self.clear_workspace_area_markers()
- # set the workspace state to False to stop the computation
- self.menu.set_workspace_state(False)
+ # set the workspace state to False to stop the computation
+ self.menu.set_workspace_state(False)
- # if there are valid configurations, publish the workspace area
- if self.valid_configurations is not None:
- # publish the workspace area
- self.publish_workspace_area(self.valid_configurations)
+ # if there are valid configurations, publish the workspace area
+ if self.valid_configurations is not None:
+ # publish the workspace area
+ self.publish_workspace_area_maximum_payload_area()
+
@@ -186,81 +250,89 @@ def publish_selected_configuration(self):
Timer to publish the selected configuration.
This will publish the joint states of the selected configuration in the menu.
"""
- # get the selected configuration from the menu
- self.selected_configuration = self.menu.get_selected_configuration()
-
- # if there is a selected configuration, publish the joint states based on the valid configurations calculated previously
- if self.selected_configuration is not None:
- configs = self.robot.get_position_for_joint_states(self.valid_configurations[self.selected_configuration]["config"])
- joint_state = JointState()
- joint_state.header.stamp = self.get_clock().now().to_msg()
+ if self.menu is not None:
+ # get the selected configuration from the menu
+ self.selected_configuration = self.menu.get_selected_configuration()
- joint_state.name = [joint["joint_name"] for joint in configs]
- joint_state.position = [joint["q"] for joint in configs]
- #joint_state.position =
- self.publisher_joint_states.publish(joint_state)
-
- else:
- if self.robot is not None:
- # if there is no selected configuration, publish the joint states with zero positions
+ # if there is a selected configuration, publish the joint states based on the valid configurations calculated previously
+ if self.selected_configuration is not None:
+ configs = self.robot_handler.get_position_for_joint_states(self.valid_configurations[self.selected_configuration]["config"])
joint_state = JointState()
joint_state.header.stamp = self.get_clock().now().to_msg()
+
+ joint_state.name = [joint["joint_name"] for joint in configs]
+ joint_state.position = [joint["q"] for joint in configs]
+
+ self.publisher_joint_states.publish(joint_state)
- joint_state.name = self.robot.get_joints().tolist()
- zero_config = self.robot.get_position_for_joint_states(self.robot.get_zero_configuration())
- joint_state.position = [joint["q"] for joint in zero_config]
+ else:
+ if self.robot_handler is not None:
+ # if there is no selected configuration, publish the joint states with zero positions
+ joint_state = JointState()
+ joint_state.header.stamp = self.get_clock().now().to_msg()
- self.publisher_joint_states.publish(joint_state)
+ joint_state.name = self.robot_handler.get_joints().tolist()
+ zero_config = self.robot_handler.get_position_for_joint_states(self.robot_handler.get_zero_configuration())
+ joint_state.position = [joint["q"] for joint in zero_config]
+
+ self.publisher_joint_states.publish(joint_state)
- def update_checked_frames(self):
+ def update_checked_items(self):
"""
- Function to update the external forces based on the checked frames in the menu.
+ Function to update the external forces based on the checked frames in the menu and joint selection in the subtree menu.
"""
- # create the array with only the checked frames (with external force applied)
- self.checked_frames = np.array([check_frame["name"] for check_frame in self.menu.get_item_state() if check_frame['checked']])
+ if self.menu is not None:
+ # create the array with only the checked frames (with external force applied)
+ self.checked_frames = np.array([check_frame["name"] for check_frame in self.menu.get_item_state() if check_frame['checked']])
+
+ if len(self.checked_frames) != 0:
+ # create the array with the masses of the checked frames
+ self.masses = np.array([check_frame["payload"] for check_frame in self.menu.get_item_state() if check_frame['checked']])
+ else:
+ # if there are no checked frames, set the masses to None
+ self.masses = None
+
+ selected_joint_ids = [[joint["tree"], joint["selected_joint_id"]] for joint in self.menu.get_joint_tree_selection()]
+ for tree_id, joint_id in selected_joint_ids:
+ # set the joint tree selection in the robot based on the selected joints in the menu
+ self.robot_handler.set_joint_tree_selection(tree_id, joint_id)
- if len(self.checked_frames) != 0:
- # create the array with the masses of the checked frames
- self.masses = np.array([check_frame["payload"] for check_frame in self.menu.get_item_state() if check_frame['checked']])
- else:
- # if there are no checked frames, set the masses to None
- self.masses = None
def joint_states_callback(self, msg):
"""
Callback function for the joint states topic.
"""
- if self.robot is not None:
+ if self.robot_handler is not None:
# if you are not using the calculated configuration from workspace, you can use the joint states to compute the torques because you don't have already the computed torques
if self.selected_configuration is None:
self.positions = list(msg.position)
self.name_positions = list(msg.name)
- v = msg.velocity if msg.velocity else self.robot.get_zero_velocity()
+ v = msg.velocity if msg.velocity else self.robot_handler.get_zero_velocity()
# set the positions based on the joint states
- q = self.robot.set_position(self.positions, self.name_positions)
- a = self.robot.get_zero_acceleration()
+ q = self.robot_handler.set_position(self.positions, self.name_positions)
+ a = self.robot_handler.get_zero_acceleration()
# if there are no checked frames, set the external force to None
if len(self.checked_frames) != 0:
# create the external force with the masses and the checked frames
- self.external_force = self.robot.create_ext_force(masses=self.masses, frame_name=self.checked_frames, q=q)
+ self.external_force = self.robot_handler.create_ext_force(masses=self.masses, frame_name=self.checked_frames, q=q)
else:
self.external_force = None
# compute the inverse dynamics
- tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=self.external_force)
+ tau = self.robot_handler.compute_inverse_dynamics(q, v, a, extForce=self.external_force)
# check the effort limits
- status_efforts = self.robot.check_effort_limits(tau)
+ status_efforts = self.robot_handler.check_effort_limits(tau)
# print the torques
- self.robot.print_torques(tau)
+ self.robot_handler.print_torques(tau)
# get the positions of the joints where the torques are applied based on
- joints_position, offset_z = self.robot.get_joints_placements(q)
+ joints_position, offset_z = self.robot_handler.get_joints_placements(q)
# Publish the torques in RViz
self.publish_label_torques(tau, status_torques=status_efforts ,joints_position=joints_position)
@@ -269,9 +341,9 @@ def joint_states_callback(self, msg):
# if you are using the calculated configuration from workspace, you can use the valid configurations to visualize the torques labels
# get the positions of the joints where the torques are applied based on
- joints_position, offset_z = self.robot.get_joints_placements(self.valid_configurations[self.selected_configuration]["config"])
+ joints_position, offset_z = self.robot_handler.get_joints_placements(self.valid_configurations[self.selected_configuration]["config"])
# get the torques status (if the torques are within the limits)
- status_efforts = self.robot.check_effort_limits(self.valid_configurations[self.selected_configuration]["tau"])
+ status_efforts = self.robot_handler.check_effort_limits(self.valid_configurations[self.selected_configuration]["tau"])
self.publish_label_torques(self.valid_configurations[self.selected_configuration]["tau"], status_torques=status_efforts ,joints_position=joints_position)
@@ -286,13 +358,14 @@ def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray,
status_torques (np.ndarray): The status of the torques, True if the torque is within the limits, False otherwise
joints_position (np.ndarray): The positions of the joints where the torques are applied
"""
+ joint_z_prev = {"pos": 0.0, "labels_count": 0} # Variable to store the previous z position of the joint for label offset
marker_array = MarkerArray()
+
for i, (t, joint) in enumerate(zip(torque, joints_position)):
- # remove the gripper joints from the visualization TODO: make it more general (with MIMIC joints)
if "gripper" not in joint['name']:
marker = Marker()
- marker.header.frame_id = "base_link"
- marker.header.stamp = self.get_clock().now().to_msg()
+ marker.header.frame_id = self.robot_handler.get_root_name()
+ marker.header.stamp = Time()
marker.ns = "torque_visualization"
marker.id = i
marker.type = Marker.TEXT_VIEW_FACING
@@ -303,9 +376,18 @@ def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray,
marker.text = f"{joint['name']}: {t:.2f} Nm"
marker.action = Marker.ADD
+
marker.pose.position.x = joint['x']
marker.pose.position.y = joint['y']
- marker.pose.position.z = joint['z']
+
+ # method to avoid overlaps between labels
+ if joint['z'] == joint_z_prev["pos"]:
+ marker.pose.position.z = joint['z'] + 0.025 * joint_z_prev["labels_count"] # Offset to avoid overlaps between labels
+ joint_z_prev = {"pos" : joint['z'], "labels_count" : joint_z_prev["labels_count"] + 1}
+ else:
+ marker.pose.position.z = joint['z']
+ joint_z_prev = {"pos" : marker.pose.position.z, "labels_count" : 1}
+
marker.pose.orientation.w = 1.0
marker.scale.x = 0.03
marker.scale.y = 0.03
@@ -319,38 +401,39 @@ def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray,
marker.color.r = 1.0 # Red
marker.color.g = 0.0 # Green
marker.color.b = 0.0 # Blue
+
+
marker_array.markers.append(marker)
self.publisher_rviz_torque.publish(marker_array)
-
+
- def publish_workspace_area(self, valid_configs: np.ndarray ):
- """
- Publish the workspace area in RViz using points and labels for the end points.
- Args:
- valid_configs (np.ndarray): Current valid configurations in the workspace of the robot.
+ def generate_workspace_markers(self):
+ """
+ Generate markers for the workspace area in RViz.
+ This function creates markers for the end effector positions of the valid configurations and visualizes them
+ with colors based on the normalized torques for each joint.
+ It also creates labels for the end points of the valid configurations.
"""
- # Create a MarkerArray to visualize the number of configuration of a specific point in the workspace
- marker_point_names = MarkerArray()
- # Create a Marker for the workspace area using points
- marker_points = MarkerArray()
+ # Create a Marker for the workspace area using points and labels
+ marker_ws = MarkerArray()
# calculate the maximum torques for each joint in the current valid configurations for each arm only if the user selected the max current torque visualization
if self.menu.get_torque_color() == TorqueVisualizationType.MAX_CURRENT_TORQUE:
- max_torque_for_joint_left, max_torque_for_joint_right = self.robot.get_maximum_torques(valid_configs)
-
+ max_torque_for_joint = self.robot_handler.get_maximum_torques(self.valid_configurations)
+
cont = 0
# Iterate through the valid configurations and create markers
- for i, valid_config in enumerate(valid_configs):
+ for i, valid_config in enumerate(self.valid_configurations):
# create the label for the end point (end effector position) of the valid configuration
marker_point_name = Marker()
- marker_point_name.header.frame_id = "base_link"
- marker_point_name.header.stamp = self.get_clock().now().to_msg()
+ marker_point_name.header.frame_id = self.robot_handler.get_root_name()
+ marker_point_name.header.stamp = Time()
- marker_point_name.ns = f"workspace_area_{valid_config['arm']}_arm"
+ marker_point_name.ns = f"labels_workspace_area__tree_{valid_config['tree_id']}"
marker_point_name.id = i + 1
marker_point_name.type = Marker.TEXT_VIEW_FACING
marker_point_name.text = f"Config {i}"
@@ -368,25 +451,22 @@ def publish_workspace_area(self, valid_configs: np.ndarray ):
marker_point_name.color.b = 1.0 # Blue
# get the joint positions for the valid configuration
- joint_positions, offset_z = self.robot.get_joints_placements(valid_config["config"])
+ joint_positions, offset_z = self.robot_handler.get_joints_placements(valid_config["config"])
# get the normalized torques for the valid configuration with current target limits for color visualization
if self.menu.get_torque_color() == TorqueVisualizationType.TORQUE_LIMITS:
- norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"])
+ norm_joints_torques = self.robot_handler.get_normalized_torques(valid_config["tau"])
else:
# if the user selected the max torque, use the maximum torques for the joint
- if valid_config["arm"] == "left":
- norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"],max_torque_for_joint_left)
- else:
- norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"],max_torque_for_joint_right)
+ norm_joints_torques = self.robot_handler.get_normalized_torques(valid_config["tau"],max_torque_for_joint, valid_config["tree_id"])
# insert points related to the end effector position in the workspace area and with color based on the normalized torques for each joint
for joint_pos,tau in zip(joint_positions,norm_joints_torques):
- # print only the points for the corrisponding arm, in this way we can visualize the workspace area for each arm separately and avoid confusion
- if valid_config["arm"] in joint_pos["name"] :
+ # print only the points for the corrisponding tree_id of the valid configuration
+ if self.robot_handler.verify_member_tree(valid_config["tree_id"],joint_pos["id"]):
point = Marker()
- point.header.frame_id = "base_link"
- point.header.stamp = self.get_clock().now().to_msg()
+ point.header.frame_id = self.robot_handler.get_root_name()
+ point.header.stamp = Time()
point.ns = joint_pos["name"]
point.id = cont
point.type = Marker.SPHERE
@@ -407,22 +487,21 @@ def publish_workspace_area(self, valid_configs: np.ndarray ):
cont += 1
# Add the point to the points array
- marker_points.markers.append(point)
+ marker_ws.markers.append(point)
- # Add the marker point name to the marker point names array
- marker_point_names.markers.append(marker_point_name)
-
+ # Add the marker point name to the array
+ marker_ws.markers.append(marker_point_name)
# get the unified torque for the valid configurations
- unified_configurations_torque = self.robot.get_unified_configurations_torque(valid_configs)
+ unified_configurations_torque = self.robot_handler.get_unified_configurations_torque(self.valid_configurations)
# insert points related to the end effector position in the workspace area and with color based on the normalized torque for each configuration
# this is used to visualize the workspace area with the unified torques for each configuration
for i, unified_config in enumerate(unified_configurations_torque):
marker_point = Marker()
- marker_point.header.frame_id = "base_link"
- marker_point.header.stamp = self.get_clock().now().to_msg()
- marker_point.ns = f"unified_torque_workspace_{unified_config['arm']}_arm"
+ marker_point.header.frame_id = self.robot_handler.get_root_name()
+ marker_point.header.stamp = Time()
+ marker_point.ns = f"unified_torque_workspace_tree_{unified_config['tree_id']}"
marker_point.id = i
marker_point.type = Marker.SPHERE
marker_point.action = Marker.ADD
@@ -441,13 +520,136 @@ def publish_workspace_area(self, valid_configs: np.ndarray ):
marker_point.color.b = 0.0 # Blue
# Add the marker point to the points array
- marker_points.markers.append(marker_point)
+ marker_ws.markers.append(marker_point)
+
+
+ return marker_ws
+
+
+ def publish_workspace_area_maximum_payload_area(self):
+ """
+ Publish the workspace area and maximum payloads area in RViz using points and labels for the end points.
+ """
+
+ self.id_current_valid_config = self.get_id_current_valid_configurations()
+ # check if the valid configurations are different from the previous one
+ if self.id_current_valid_config != self.prev_id_current_valid_config:
+ # generate the markers for the workspace area
+ self.marker_points = self.generate_workspace_markers()
+ self.marker_max_payloads = self.generate_maximum_payloads_markers()
+ else:
+ # if the valid configurations are the same as the previous one, use the previous markers
+ if self.marker_points is not None:
+ # update the header stamp of the markers to the current time
+ for marker in self.marker_points.markers:
+ marker.header.stamp = Time()
+
+ if self.marker_max_payloads is not None:
+ # update the header stamp of the maximum payloads markers to the current time
+ for marker in self.marker_max_payloads.markers:
+ marker.header.stamp = Time()
- # Publish the marker points and names
- self.publisher_workspace_area.publish(marker_points)
- self.publisher_workspace_area_names.publish(marker_point_names)
+ self.prev_id_current_valid_config = self.id_current_valid_config
+
+ # Publish the marker points
+ self.publisher_workspace_area.publish(self.marker_points)
+ # Publish the maximum payloads markers and labels
+ self.publisher_maximum_payloads.publish(self.marker_max_payloads)
+
+
+ def generate_maximum_payloads_markers(self):
+ """
+ Generate markers for the maximum payloads in the workspace area in RViz.
+ """
+ # Create a MarkerArray to visualize the maximum payloads
+ marker_max_payloads = MarkerArray()
+
+ # get the maximum payloads for each arm based on the valid configurations
+ max_payloads = self.robot_handler.get_maximum_payloads(self.valid_configurations)
+
+ # Iterate through the valid configurations and create markers
+ for i, valid_config in enumerate(self.valid_configurations):
+
+ # create the label for the end point (end effector position)
+ marker_point_name = Marker()
+ marker_point_name.header.frame_id = self.robot_handler.get_root_name()
+ marker_point_name.header.stamp = Time()
+
+ marker_point_name.ns = f"label_payloads_tree_{valid_config['tree_id']}"
+ marker_point_name.id = i
+ marker_point_name.type = Marker.TEXT_VIEW_FACING
+ marker_point_name.text = f"Config {i} \nMax payload : {valid_config['max_payload']:.2f} kg"
+ marker_point_name.action = Marker.ADD
+ marker_point_name.pose.position.x = valid_config["end_effector_pos"][0]
+ marker_point_name.pose.position.y = valid_config["end_effector_pos"][1]
+ marker_point_name.pose.position.z = valid_config["end_effector_pos"][2]
+ marker_point_name.pose.orientation.w = 1.0
+ marker_point_name.scale.x = 0.02
+ marker_point_name.scale.y = 0.02
+ marker_point_name.scale.z = 0.02
+ marker_point_name.color.a = 1.0 # Alpha
+ marker_point_name.color.r = 1.0 # Red
+ marker_point_name.color.g = 1.0 # Green
+ marker_point_name.color.b = 1.0 # Blue
+
+ # get the joint positions for the valid configuration
+ joint_positions, offset_z = self.robot_handler.get_joints_placements(valid_config["config"])
+
+ # get the normalized payload for the valid configuration with target as maximum payloads for each tree
+ max_payload_for_tree = next(payload["max_payload"] for payload in max_payloads if payload["tree_id"] == valid_config["tree_id"])
+ norm_payload = self.robot_handler.get_normalized_payload(valid_config["max_payload"], max_payload_for_tree)
+
+
+ point = Marker()
+ point.header.frame_id = self.robot_handler.get_root_name()
+ point.header.stamp = Time()
+ point.ns = f"max_payloads_tree_{valid_config['tree_id']}"
+ point.id = i
+ point.type = Marker.SPHERE
+ point.action = Marker.ADD
+ point.scale.x = 0.03 # Size of the sphere
+ point.scale.y = 0.03
+ point.scale.z = 0.03
+
+ point.pose.position.x = valid_config["end_effector_pos"][0]
+ point.pose.position.y = valid_config["end_effector_pos"][1]
+ point.pose.position.z = valid_config["end_effector_pos"][2] - offset_z
+ point.pose.orientation.w = 1.0
+ point.color.a = 1.0 # Alpha
+ point.color.r = norm_payload # Red
+ point.color.g = 1 - norm_payload # Green
+ point.color.b = 0.0 # Blue
+
+ # Add the point to the points array
+ marker_max_payloads.markers.append(point)
+
+ # Add the marker point name to the marker point names array
+ marker_max_payloads.markers.append(marker_point_name)
+
+ return marker_max_payloads
+
+
+ def get_id_current_valid_configurations(self):
+ """Generate a hash of the current valid configurations to detect changes."""
+ if self.valid_configurations is None:
+ return None
+
+ # Create a hash based on configuration data and torque color mode
+ config_str = str([
+ (config.get('tree_id', ''),
+ tuple(config.get('config', [])),
+ tuple(config.get('tau', [])),
+ tuple(config.get('end_effector_pos', [])),
+ config.get('max_payload', 0))
+ for config in self.valid_configurations
+ ])
+ torque_mode = str(self.menu.get_torque_color())
+
+ return hash(config_str + torque_mode)
+
+
def clear_workspace_area_markers(self):
"""
@@ -463,8 +665,6 @@ def clear_workspace_area_markers(self):
# Publish the empty MarkerArray to clear the markers
self.publisher_workspace_area.publish(marker_array_msg)
- self.publisher_workspace_area_names.publish(marker_array_msg)
-
diff --git a/dynamic_payload_analysis_ros/package.xml b/dynamic_payload_analysis_ros/package.xml
index f39adf4..756fcdc 100644
--- a/dynamic_payload_analysis_ros/package.xml
+++ b/dynamic_payload_analysis_ros/package.xml
@@ -4,7 +4,7 @@
dynamic_payload_analysis_ros
0.0.0
This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.
- morolinux
+ Moro Enrico
Apache License 2.0
ament_copyright
@@ -12,6 +12,14 @@
ament_pep257
python3-pytest
+ dynamic_payload_analysis_core
+ rclpy
+ std_msgs
+ sensor_msgs
+ builtin_interfaces
+ visualization_msgs
+ interactive_markers
+
ament_python
diff --git a/dynamic_payload_analysis_ros/setup.py b/dynamic_payload_analysis_ros/setup.py
index 901e1da..b8e2ae2 100644
--- a/dynamic_payload_analysis_ros/setup.py
+++ b/dynamic_payload_analysis_ros/setup.py
@@ -11,16 +11,18 @@
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
- install_requires=['setuptools'],
zip_safe=True,
maintainer='Enrico Moro',
maintainer_email='enrimoro003@gmail.com',
description='This package provides graphics tools in Rviz for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.',
license='Apache License 2.0',
tests_require=['pytest'],
+ install_requires=[
+ 'setuptools',
+ 'numpy'
+ ],
entry_points={
'console_scripts': [
- 'node_rviz_visualization = dynamic_payload_analysis_ros.rviz_visualization:main',
'node_rviz_visualization_menu = dynamic_payload_analysis_ros.rviz_visualization_menu:main',
],
},