From 178d1815b462e9603432102bbbc4858ee9d4a0cf Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Wed, 11 Jun 2025 15:36:04 +0200 Subject: [PATCH 01/16] First version --- .../dynamic_payload_analysis_ros/__init__.py | 0 .../menu_visual.py | 143 ++++++++++++++++ .../rviz_visualization.py | 155 ++++++++++++++++++ .../rviz_visualization_menu.py | 155 ++++++++++++++++++ dynamic_payload_analysis_ros/package.xml | 18 ++ .../resource/dynamic_payload_analysis_ros | 0 dynamic_payload_analysis_ros/setup.cfg | 4 + dynamic_payload_analysis_ros/setup.py | 26 +++ .../test/test_copyright.py | 25 +++ .../test/test_flake8.py | 25 +++ .../test/test_pep257.py | 23 +++ 11 files changed, 574 insertions(+) create mode 100644 dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py create mode 100644 dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py create mode 100644 dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py create mode 100644 dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py create mode 100644 dynamic_payload_analysis_ros/package.xml create mode 100644 dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros create mode 100644 dynamic_payload_analysis_ros/setup.cfg create mode 100644 dynamic_payload_analysis_ros/setup.py create mode 100644 dynamic_payload_analysis_ros/test/test_copyright.py create mode 100644 dynamic_payload_analysis_ros/test/test_flake8.py create mode 100644 dynamic_payload_analysis_ros/test/test_pep257.py diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py new file mode 100644 index 0000000..b9b406e --- /dev/null +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 + + +import sys + + +from interactive_markers import InteractiveMarkerServer +from interactive_markers import MenuHandler +import rclpy +from visualization_msgs.msg import InteractiveMarker +from visualization_msgs.msg import InteractiveMarkerControl +from visualization_msgs.msg import Marker + +node = None +server = None +marker_pos = 0 + +menu_handler = MenuHandler() + +h_first_entry = 0 +h_mode_last = 0 + + +def enableCb(feedback): + handle = feedback.menu_entry_id + state = menu_handler.getCheckState(handle) + + if state == MenuHandler.CHECKED: + menu_handler.setCheckState(handle, MenuHandler.UNCHECKED) + node.get_logger().info('Hiding first menu entry') + menu_handler.setVisible(h_first_entry, False) + else: + menu_handler.setCheckState(handle, MenuHandler.CHECKED) + node.get_logger().info('Showing first menu entry') + menu_handler.setVisible(h_first_entry, True) + + menu_handler.reApply(server) + node.get_logger().info('update') + server.applyChanges() + + +def modeCb(feedback): + global h_mode_last + menu_handler.setCheckState(h_mode_last, MenuHandler.UNCHECKED) + h_mode_last = feedback.menu_entry_id + menu_handler.setCheckState(h_mode_last, MenuHandler.CHECKED) + + node.get_logger().info('Switching to menu entry #' + str(h_mode_last)) + menu_handler.reApply(server) + print('DONE') + server.applyChanges() + + +def makeBox(msg): + marker = Marker() + + marker.type = Marker.CUBE + marker.scale.x = msg.scale * 0.45 + marker.scale.y = msg.scale * 0.45 + marker.scale.z = msg.scale * 0.45 + marker.color.r = 0.5 + marker.color.g = 0.5 + marker.color.b = 0.5 + marker.color.a = 1.0 + + return marker + + +def makeBoxControl(msg): + control = InteractiveMarkerControl() + control.always_visible = True + control.markers.append(makeBox(msg)) + msg.controls.append(control) + return control + + +def makeEmptyMarker(dummyBox=True): + global marker_pos + int_marker = InteractiveMarker() + int_marker.header.frame_id = 'base_link' + int_marker.pose.position.y = -3.0 * marker_pos + marker_pos += 1 + int_marker.scale = 1.0 + return int_marker + + +def makeMenuMarker(name): + int_marker = makeEmptyMarker() + int_marker.name = name + + control = InteractiveMarkerControl() + + control.interaction_mode = InteractiveMarkerControl.BUTTON + control.always_visible = True + + control.markers.append(makeBox(int_marker)) + int_marker.controls.append(control) + + server.insert(int_marker) + + +def deepCb(feedback): + node.get_logger().info('The deep sub-menu has been found.') + + +def initMenu(): + global h_first_entry, h_mode_last + h_first_entry = menu_handler.insert('First Entry') + entry = menu_handler.insert('deep', parent=h_first_entry) + entry = menu_handler.insert('sub', parent=entry) + entry = menu_handler.insert('menu', parent=entry, callback=deepCb) + + menu_handler.setCheckState( + menu_handler.insert('Show First Entry', callback=enableCb), + MenuHandler.CHECKED + ) + + sub_menu_handle = menu_handler.insert('Switch') + for i in range(5): + s = 'Mode ' + str(i) + h_mode_last = menu_handler.insert(s, parent=sub_menu_handle, callback=modeCb) + menu_handler.setCheckState(h_mode_last, MenuHandler.UNCHECKED) + # check the very last entry + menu_handler.setCheckState(h_mode_last, MenuHandler.CHECKED) + + +if __name__ == '__main__': + rclpy.init(args=sys.argv) + node = rclpy.create_node('menu') + + server = InteractiveMarkerServer(node, 'menu') + + initMenu() + + makeMenuMarker('marker1') + + + menu_handler.apply(server, 'marker1') + + server.applyChanges() + + rclpy.spin(node) + server.shutdown() \ No newline at end of file diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py new file mode 100644 index 0000000..e17af92 --- /dev/null +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String +from geometry_msgs.msg import WrenchStamped +from sensor_msgs.msg import JointState +from dynamic_payload_analysis_core.core import TorqueCalculator +import numpy as np +from visualization_msgs.msg import Marker, MarkerArray + + + +class RobotDescriptionSubscriber(Node): + def __init__(self): + super().__init__('node_robot_description_subscriber') + self.subscription = self.create_subscription( + String, + '/robot_description', + self.robot_description_callback, + qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1) + ) + + # Publisher for external force + self.publisher_force = self.create_publisher(WrenchStamped, '/external_force', 10) + + # Publisher for RViz visualization of torques + self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10) + + # subscription to joint states + self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10) + + self.robot = None + + # frame where the external force is applied + self.frame_id = "arm_left_7_link" + + self.get_logger().info('Robot description subscriber node started') + + + def robot_description_callback(self, msg): + self.get_logger().info('Received robot description') + + self.robot = TorqueCalculator(robot_description = msg.data) + self.robot.print_active_joint() + self.robot.print_frames() + + # self.robot.print_configuration() + + + def joint_states_callback(self, msg): + if self.robot is not None: + positions = list(msg.position) + name_position = list(msg.name) + v = msg.velocity if msg.velocity else self.robot.get_zero_velocity() + + q = self.robot.set_position(positions, name_position) + a = self.robot.get_zero_acceleration() + + + # Compute inverse dynamics with external force + external_force = self.robot.create_ext_force(mass=4.0, frame_name=self.frame_id, q=q) + tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=external_force) + + status_efforts = self.robot.check_effort_limits(tau) + self.robot.print_torques(tau) + frames = self.robot.get_active_frames() + joints_position = self.robot.get_joints_placements(q) + + # Publish the torques in RViz + self.publish_rviz_torque(tau, status_torques=status_efforts ,joints_position=joints_position) + + # publish the external force + self.publish_payload_force(self.frame_id, external_force) + + + def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray): + """ + Publish the torque on the robot in RViz. + + Args: + torque (np.ndarray): The torque to be published + status_torques (np.ndarray): The status of the torques, True if the torque is within the limits, False otherwise + joints_position (np.ndarray): The positions of the joints where the torques are applied + """ + marker_array = MarkerArray() + for i, (t, joint) in enumerate(zip(torque, joints_position)): + marker = Marker() + marker.header.frame_id = "base_link" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = "torque_visualization" + marker.id = i + marker.type = Marker.TEXT_VIEW_FACING + marker.text = f"{joint['name']}: {t:.2f} Nm" + marker.action = Marker.ADD + marker.pose.position.x = joint['x'] + marker.pose.position.y = joint['y'] + marker.pose.position.z = joint['z'] + marker.pose.orientation.w = 1.0 + marker.scale.x = 0.03 + marker.scale.y = 0.03 + marker.scale.z = 0.03 + marker.color.a = 1.0 # Alpha + if status_torques[i]: + marker.color.r = 0.0 # Red + marker.color.g = 1.0 # Green + marker.color.b = 0.0 # Blue + else: + marker.color.r = 1.0 # Red + marker.color.g = 0.0 # Green + marker.color.b = 0.0 # Blue + marker_array.markers.append(marker) + + self.publisher_rviz_torque.publish(marker_array) + + + + def publish_payload_force(self, frame_id : str, external_force: np.ndarray): + """ + Publish the gravity force on the frame with id `id_force`. + + Args: + frame_id (str): The frame where the external force is applied. + external_force (np.ndarray): The external force to be published + """ + id_force = self.robot.get_parent_joint_id(frame_id) + + force_msg = WrenchStamped() + force_msg.header.stamp = self.get_clock().now().to_msg() + force_msg.header.frame_id = frame_id + + force_msg.wrench.force.x = external_force[id_force].linear[0] + force_msg.wrench.force.y = external_force[id_force].linear[1] + force_msg.wrench.force.z = external_force[id_force].linear[2] + force_msg.wrench.torque.x = external_force[id_force].angular[0] + force_msg.wrench.torque.y = external_force[id_force].angular[1] + force_msg.wrench.torque.z = external_force[id_force].angular[2] + + self.publisher_force.publish(force_msg) + +def main(args=None): + rclpy.init(args=args) + node = RobotDescriptionSubscriber() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py new file mode 100644 index 0000000..e17af92 --- /dev/null +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String +from geometry_msgs.msg import WrenchStamped +from sensor_msgs.msg import JointState +from dynamic_payload_analysis_core.core import TorqueCalculator +import numpy as np +from visualization_msgs.msg import Marker, MarkerArray + + + +class RobotDescriptionSubscriber(Node): + def __init__(self): + super().__init__('node_robot_description_subscriber') + self.subscription = self.create_subscription( + String, + '/robot_description', + self.robot_description_callback, + qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1) + ) + + # Publisher for external force + self.publisher_force = self.create_publisher(WrenchStamped, '/external_force', 10) + + # Publisher for RViz visualization of torques + self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10) + + # subscription to joint states + self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10) + + self.robot = None + + # frame where the external force is applied + self.frame_id = "arm_left_7_link" + + self.get_logger().info('Robot description subscriber node started') + + + def robot_description_callback(self, msg): + self.get_logger().info('Received robot description') + + self.robot = TorqueCalculator(robot_description = msg.data) + self.robot.print_active_joint() + self.robot.print_frames() + + # self.robot.print_configuration() + + + def joint_states_callback(self, msg): + if self.robot is not None: + positions = list(msg.position) + name_position = list(msg.name) + v = msg.velocity if msg.velocity else self.robot.get_zero_velocity() + + q = self.robot.set_position(positions, name_position) + a = self.robot.get_zero_acceleration() + + + # Compute inverse dynamics with external force + external_force = self.robot.create_ext_force(mass=4.0, frame_name=self.frame_id, q=q) + tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=external_force) + + status_efforts = self.robot.check_effort_limits(tau) + self.robot.print_torques(tau) + frames = self.robot.get_active_frames() + joints_position = self.robot.get_joints_placements(q) + + # Publish the torques in RViz + self.publish_rviz_torque(tau, status_torques=status_efforts ,joints_position=joints_position) + + # publish the external force + self.publish_payload_force(self.frame_id, external_force) + + + def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray): + """ + Publish the torque on the robot in RViz. + + Args: + torque (np.ndarray): The torque to be published + status_torques (np.ndarray): The status of the torques, True if the torque is within the limits, False otherwise + joints_position (np.ndarray): The positions of the joints where the torques are applied + """ + marker_array = MarkerArray() + for i, (t, joint) in enumerate(zip(torque, joints_position)): + marker = Marker() + marker.header.frame_id = "base_link" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = "torque_visualization" + marker.id = i + marker.type = Marker.TEXT_VIEW_FACING + marker.text = f"{joint['name']}: {t:.2f} Nm" + marker.action = Marker.ADD + marker.pose.position.x = joint['x'] + marker.pose.position.y = joint['y'] + marker.pose.position.z = joint['z'] + marker.pose.orientation.w = 1.0 + marker.scale.x = 0.03 + marker.scale.y = 0.03 + marker.scale.z = 0.03 + marker.color.a = 1.0 # Alpha + if status_torques[i]: + marker.color.r = 0.0 # Red + marker.color.g = 1.0 # Green + marker.color.b = 0.0 # Blue + else: + marker.color.r = 1.0 # Red + marker.color.g = 0.0 # Green + marker.color.b = 0.0 # Blue + marker_array.markers.append(marker) + + self.publisher_rviz_torque.publish(marker_array) + + + + def publish_payload_force(self, frame_id : str, external_force: np.ndarray): + """ + Publish the gravity force on the frame with id `id_force`. + + Args: + frame_id (str): The frame where the external force is applied. + external_force (np.ndarray): The external force to be published + """ + id_force = self.robot.get_parent_joint_id(frame_id) + + force_msg = WrenchStamped() + force_msg.header.stamp = self.get_clock().now().to_msg() + force_msg.header.frame_id = frame_id + + force_msg.wrench.force.x = external_force[id_force].linear[0] + force_msg.wrench.force.y = external_force[id_force].linear[1] + force_msg.wrench.force.z = external_force[id_force].linear[2] + force_msg.wrench.torque.x = external_force[id_force].angular[0] + force_msg.wrench.torque.y = external_force[id_force].angular[1] + force_msg.wrench.torque.z = external_force[id_force].angular[2] + + self.publisher_force.publish(force_msg) + +def main(args=None): + rclpy.init(args=args) + node = RobotDescriptionSubscriber() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/dynamic_payload_analysis_ros/package.xml b/dynamic_payload_analysis_ros/package.xml new file mode 100644 index 0000000..c0fba99 --- /dev/null +++ b/dynamic_payload_analysis_ros/package.xml @@ -0,0 +1,18 @@ + + + + dynamic_payload_analysis_ros + 0.0.0 + This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling. + morolinux + TODO: License declaration + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros b/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros new file mode 100644 index 0000000..e69de29 diff --git a/dynamic_payload_analysis_ros/setup.cfg b/dynamic_payload_analysis_ros/setup.cfg new file mode 100644 index 0000000..5a64333 --- /dev/null +++ b/dynamic_payload_analysis_ros/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/dynamic_payload_analysis_ros +[install] +install_scripts=$base/lib/dynamic_payload_analysis_ros diff --git a/dynamic_payload_analysis_ros/setup.py b/dynamic_payload_analysis_ros/setup.py new file mode 100644 index 0000000..41972de --- /dev/null +++ b/dynamic_payload_analysis_ros/setup.py @@ -0,0 +1,26 @@ +from setuptools import find_packages, setup + +package_name = 'dynamic_payload_analysis_ros' + +setup( + name=package_name, + version='0.0.0', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='morolinux', + maintainer_email='enrimoro003@gmail.com', + description='This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.', + license='TODO: License declaration', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'node_rviz_visualization = dynamic_payload_analysis_ros.rviz_visualization:main', + ], + }, +) diff --git a/dynamic_payload_analysis_ros/test/test_copyright.py b/dynamic_payload_analysis_ros/test/test_copyright.py new file mode 100644 index 0000000..97a3919 --- /dev/null +++ b/dynamic_payload_analysis_ros/test/test_copyright.py @@ -0,0 +1,25 @@ +# Copyright 2015 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_copyright.main import main +import pytest + + +# Remove the `skip` decorator once the source file(s) have a copyright header +@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') +@pytest.mark.copyright +@pytest.mark.linter +def test_copyright(): + rc = main(argv=['.', 'test']) + assert rc == 0, 'Found errors' diff --git a/dynamic_payload_analysis_ros/test/test_flake8.py b/dynamic_payload_analysis_ros/test/test_flake8.py new file mode 100644 index 0000000..27ee107 --- /dev/null +++ b/dynamic_payload_analysis_ros/test/test_flake8.py @@ -0,0 +1,25 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_flake8.main import main_with_errors +import pytest + + +@pytest.mark.flake8 +@pytest.mark.linter +def test_flake8(): + rc, errors = main_with_errors(argv=[]) + assert rc == 0, \ + 'Found %d code style errors / warnings:\n' % len(errors) + \ + '\n'.join(errors) diff --git a/dynamic_payload_analysis_ros/test/test_pep257.py b/dynamic_payload_analysis_ros/test/test_pep257.py new file mode 100644 index 0000000..b234a38 --- /dev/null +++ b/dynamic_payload_analysis_ros/test/test_pep257.py @@ -0,0 +1,23 @@ +# Copyright 2015 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_pep257.main import main +import pytest + + +@pytest.mark.linter +@pytest.mark.pep257 +def test_pep257(): + rc = main(argv=['.', 'test']) + assert rc == 0, 'Found code style errors / warnings' From df00c9fa11388cdef56cbefc9d2edfac915bff7d Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Wed, 11 Jun 2025 20:59:30 +0200 Subject: [PATCH 02/16] Fix problem print N or Nm with prismatic and revolute joints --- .../dynamic_payload_analysis_core/core.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index 21b5298..6a7f984 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -413,15 +413,20 @@ def print_torques(self, tau : np.ndarray): """ if tau is None: raise ValueError("Torques vector is None") - + print("Torques vector:") for i, torque in enumerate(tau): - # TODO Extract type of joint in order to print right measure unit ([N] or [Nm]) - print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} Nm") - + # check if the joint is a prismatic joint + if self.model.joints[i+1].shortname() == "JointModelPZ": + print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} N") + + # for revolute joints + else: + print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} Nm") print("\n") + def print_frames(self): """ Print the frames of the robot model. From 78ff4f03c374326420923b1ba3ef011fa2df5e36 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Wed, 11 Jun 2025 21:05:48 +0200 Subject: [PATCH 03/16] Small change in function to return type of joint --- .../dynamic_payload_analysis_core/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index 6a7f984..fa46235 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -400,7 +400,7 @@ def get_joints_placements(self, q : np.ndarray) -> np.ndarray: """ self.update_configuration(q) - placements = np.array([({"name" : self.model.names[i], "x": self.data.oMi[i].translation[0], "y": self.data.oMi[i].translation[1], "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object) + placements = np.array([({"name" : self.model.names[i],"type" : self.model.joints[i].shortname() , "x": self.data.oMi[i].translation[0], "y": self.data.oMi[i].translation[1], "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object) return placements From d2011c0c643234eeba2e825f79359758622065f4 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Wed, 11 Jun 2025 21:38:58 +0200 Subject: [PATCH 04/16] Add function to get single joint placement --- .../dynamic_payload_analysis_core/core.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index fa46235..a688dfa 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -403,6 +403,21 @@ def get_joints_placements(self, q : np.ndarray) -> np.ndarray: placements = np.array([({"name" : self.model.names[i],"type" : self.model.joints[i].shortname() , "x": self.data.oMi[i].translation[0], "y": self.data.oMi[i].translation[1], "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object) return placements + + + def get_joint_placement(self, joint_id : int) -> np.ndarray: + """ + Get the placement of a specific joint in the robot model. + + :param joint_id: ID of the joint to get the placement for. + :return: Placement of the joint as a numpy array. + """ + + if joint_id < 0 or joint_id >= self.model.njoints: + raise ValueError(f"Joint ID {joint_id} is out of bounds for the robot model with {self.model.njoints} joints.") + + placement = self.data.oMi[joint_id].translation + return np.array(placement) def print_torques(self, tau : np.ndarray): From 857ed84f61befee16b7e34fb224885d4b9adcb47 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Thu, 12 Jun 2025 12:46:04 +0200 Subject: [PATCH 05/16] Small changes --- .../menu_visual.py | 234 +++++++++--------- .../rviz_visualization_menu.py | 62 ++++- dynamic_payload_analysis_ros/setup.py | 1 + 3 files changed, 174 insertions(+), 123 deletions(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py index b9b406e..116c788 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -2,7 +2,7 @@ import sys - +import numpy as np from interactive_markers import InteractiveMarkerServer from interactive_markers import MenuHandler @@ -11,133 +11,137 @@ from visualization_msgs.msg import InteractiveMarkerControl from visualization_msgs.msg import Marker -node = None -server = None -marker_pos = 0 - -menu_handler = MenuHandler() - -h_first_entry = 0 -h_mode_last = 0 - - -def enableCb(feedback): - handle = feedback.menu_entry_id - state = menu_handler.getCheckState(handle) - - if state == MenuHandler.CHECKED: - menu_handler.setCheckState(handle, MenuHandler.UNCHECKED) - node.get_logger().info('Hiding first menu entry') - menu_handler.setVisible(h_first_entry, False) - else: - menu_handler.setCheckState(handle, MenuHandler.CHECKED) - node.get_logger().info('Showing first menu entry') - menu_handler.setVisible(h_first_entry, True) - - menu_handler.reApply(server) - node.get_logger().info('update') - server.applyChanges() - - -def modeCb(feedback): - global h_mode_last - menu_handler.setCheckState(h_mode_last, MenuHandler.UNCHECKED) - h_mode_last = feedback.menu_entry_id - menu_handler.setCheckState(h_mode_last, MenuHandler.CHECKED) - - node.get_logger().info('Switching to menu entry #' + str(h_mode_last)) - menu_handler.reApply(server) - print('DONE') - server.applyChanges() - - -def makeBox(msg): - marker = Marker() - - marker.type = Marker.CUBE - marker.scale.x = msg.scale * 0.45 - marker.scale.y = msg.scale * 0.45 - marker.scale.z = msg.scale * 0.45 - marker.color.r = 0.5 - marker.color.g = 0.5 - marker.color.b = 0.5 - marker.color.a = 1.0 - - return marker +class MenuPayload(): + def __init__(self, node): + # create server for interactive markers + self.server = InteractiveMarkerServer(node, 'menu_frames') + # array to store the checked frames + self.checked_frames = np.array([],dtype=object) + # position of the marker in rviz + self.marker_pos = 2 + # create handler for menu + self.menu_handler = MenuHandler() + # insert the root menu item + self.root = self.menu_handler.insert('Select where to apply payload') + + self.make_menu_marker('menu_frames') + # add server to menu and apply changes + self.menu_handler.apply(self.server, 'menu_frames') + self.server.applyChanges() + + def insert(self, name): + """ + Insert a new item in the checkbox menu. + """ + last_item = self.menu_handler.insert(name, parent=self.root, callback=self.callback_selection) + self.menu_handler.setCheckState(last_item, MenuHandler.UNCHECKED) + self.menu_handler.setVisible(last_item, True) + + self.checked_frames = np.append(self.checked_frames, {"name": name, "checked" : False} ) + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + + def callback_selection(self, feedback): + """ + Callback for menu selection + """ + # change the check state of the selected item + handle = feedback.menu_entry_id + title = self.menu_handler.getTitle(handle) + if self.menu_handler.getCheckState(handle) == MenuHandler.CHECKED: + self.menu_handler.setCheckState(handle, MenuHandler.UNCHECKED) + self.update_item(title, False) + else: + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + self.update_item(title, True) + + # update the menu state + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + # print the current state of the checked frames + print(f"{self.get_item_state()} \n") + -def makeBoxControl(msg): - control = InteractiveMarkerControl() - control.always_visible = True - control.markers.append(makeBox(msg)) - msg.controls.append(control) - return control - - -def makeEmptyMarker(dummyBox=True): - global marker_pos - int_marker = InteractiveMarker() - int_marker.header.frame_id = 'base_link' - int_marker.pose.position.y = -3.0 * marker_pos - marker_pos += 1 - int_marker.scale = 1.0 - return int_marker - - -def makeMenuMarker(name): - int_marker = makeEmptyMarker() - int_marker.name = name - - control = InteractiveMarkerControl() - - control.interaction_mode = InteractiveMarkerControl.BUTTON - control.always_visible = True - - control.markers.append(makeBox(int_marker)) - int_marker.controls.append(control) + def update_item(self, name, status: bool): + """ + Update the state of an item in the array. + + Args: + name (str): Name of the item to update. + status (bool): New checked state of the item. + """ + + for item in self.checked_frames: + if item['name'] == name: + item['checked'] = status + break + - server.insert(int_marker) + def get_item_state(self) -> np.ndarray: + """ + Return array of checked frames in the menu list + + Returns: + np.ndarray: array of checked frames + """ + return self.checked_frames + + def make_box(self, msg): + marker = Marker() -def deepCb(feedback): - node.get_logger().info('The deep sub-menu has been found.') + marker.type = Marker.TEXT_VIEW_FACING + marker.text = "Click here to select frame" + marker.scale.x = msg.scale * 0.45 + marker.scale.y = msg.scale * 0.45 + marker.scale.z = msg.scale * 0.45 + marker.color.r = 0.1 + marker.color.g = 0.0 + marker.color.b = 0.5 + marker.color.a = 1.0 + return marker + + def make_box_control(self, msg): + control = InteractiveMarkerControl() + control.always_visible = True + control.markers.append(self.make_box(msg)) + msg.controls.append(control) + + return control -def initMenu(): - global h_first_entry, h_mode_last - h_first_entry = menu_handler.insert('First Entry') - entry = menu_handler.insert('deep', parent=h_first_entry) - entry = menu_handler.insert('sub', parent=entry) - entry = menu_handler.insert('menu', parent=entry, callback=deepCb) - menu_handler.setCheckState( - menu_handler.insert('Show First Entry', callback=enableCb), - MenuHandler.CHECKED - ) + def make_empty_marker(self, dummyBox=True): + + int_marker = InteractiveMarker() + int_marker.header.frame_id = 'base_link' + int_marker.pose.position.z = 1.0 * self.marker_pos + int_marker.scale = 0.5 + + + return int_marker - sub_menu_handle = menu_handler.insert('Switch') - for i in range(5): - s = 'Mode ' + str(i) - h_mode_last = menu_handler.insert(s, parent=sub_menu_handle, callback=modeCb) - menu_handler.setCheckState(h_mode_last, MenuHandler.UNCHECKED) - # check the very last entry - menu_handler.setCheckState(h_mode_last, MenuHandler.CHECKED) + def make_menu_marker(self, name): + int_marker = self.make_empty_marker() + int_marker.name = name -if __name__ == '__main__': - rclpy.init(args=sys.argv) - node = rclpy.create_node('menu') + control = InteractiveMarkerControl() - server = InteractiveMarkerServer(node, 'menu') + control.interaction_mode = InteractiveMarkerControl.BUTTON + control.always_visible = True - initMenu() + control.markers.append(self.make_box(int_marker)) + int_marker.controls.append(control) - makeMenuMarker('marker1') - + self.server.insert(int_marker) - menu_handler.apply(server, 'marker1') - - server.applyChanges() - rclpy.spin(node) - server.shutdown() \ No newline at end of file + def shutdown(self): + """ + Shutdown the interactive marker server. + """ + self.server.shutdown() \ No newline at end of file diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py index e17af92..2abc4e1 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -8,7 +8,7 @@ from dynamic_payload_analysis_core.core import TorqueCalculator import numpy as np from visualization_msgs.msg import Marker, MarkerArray - +from dynamic_payload_analysis_ros.menu_visual import MenuPayload class RobotDescriptionSubscriber(Node): @@ -21,6 +21,9 @@ def __init__(self): qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1) ) + # menu for selecting frames to apply payload + self.menu = MenuPayload(self) + # Publisher for external force self.publisher_force = self.create_publisher(WrenchStamped, '/external_force', 10) @@ -35,6 +38,8 @@ def __init__(self): # frame where the external force is applied self.frame_id = "arm_left_7_link" + self.external_force = None + self.get_logger().info('Robot description subscriber node started') @@ -44,6 +49,10 @@ def robot_description_callback(self, msg): self.robot = TorqueCalculator(robot_description = msg.data) self.robot.print_active_joint() self.robot.print_frames() + + # Add the frame to the menu for selecting where to apply the payload + for frame in self.robot.get_active_frames(): + self.menu.insert(frame) # self.robot.print_configuration() @@ -59,8 +68,13 @@ def joint_states_callback(self, msg): # Compute inverse dynamics with external force - external_force = self.robot.create_ext_force(mass=4.0, frame_name=self.frame_id, q=q) - tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=external_force) + for frame in self.menu.get_item_state(): + if frame['checked']: + self.frame_id = frame['name'] + self.external_force = self.robot.create_ext_force(mass=4.0, frame_name=self.frame_id, q=q) + + + tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=self.external_force) status_efforts = self.robot.check_effort_limits(tau) self.robot.print_torques(tau) @@ -71,7 +85,8 @@ def joint_states_callback(self, msg): self.publish_rviz_torque(tau, status_torques=status_efforts ,joints_position=joints_position) # publish the external force - self.publish_payload_force(self.frame_id, external_force) + if self.external_force is not None: + self.publish_payload_force(self.frame_id, self.external_force) def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray): @@ -91,7 +106,12 @@ def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, j marker.ns = "torque_visualization" marker.id = i marker.type = Marker.TEXT_VIEW_FACING - marker.text = f"{joint['name']}: {t:.2f} Nm" + # Set the text based on the joint type + if joint['type'] == 'JointModelPZ': + marker.text = f"{joint['name']}: {t:.2f} N" + else: + marker.text = f"{joint['name']}: {t:.2f} Nm" + marker.action = Marker.ADD marker.pose.position.x = joint['x'] marker.pose.position.y = joint['y'] @@ -124,23 +144,49 @@ def publish_payload_force(self, frame_id : str, external_force: np.ndarray): external_force (np.ndarray): The external force to be published """ id_force = self.robot.get_parent_joint_id(frame_id) + position = self.robot.get_joint_placement(id_force) + + # force = Marker() + # force.header.frame_id = frame_id + # force.header.stamp = self.get_clock().now().to_msg() + # force.ns = "external_force" + # force.id = id_force + # force.type = Marker.ARROW + # force.action = Marker.ADD + # force.scale.x = 0.20 + # force.scale.y = 0.05 + # force.scale.z = 0.05 + # force.color.a = 1.0 # Alpha + # force.color.r = 0.0 # Red + # force.color.g = 0.0 + # force.color.b = 1.0 # Blue + # # Set the direction of the arrow based on the external force + # force.pose.orientation.x = external_force[id_force].linear[0] + # force.pose.orientation.y = external_force[id_force].linear[1] + # force.pose.orientation.z = external_force[id_force].linear[2] + # force.pose.orientation.w = 0.0 # Set to 1.0 for a straight arrow + # self.publisher_force.publish(force) force_msg = WrenchStamped() + force_msg.header.stamp = self.get_clock().now().to_msg() force_msg.header.frame_id = frame_id - force_msg.wrench.force.x = external_force[id_force].linear[0] - force_msg.wrench.force.y = external_force[id_force].linear[1] - force_msg.wrench.force.z = external_force[id_force].linear[2] + force_msg.wrench.force.x = external_force[id_force].linear[0] * 0.01 + force_msg.wrench.force.y = external_force[id_force].linear[1] * 0.01 + force_msg.wrench.force.z = external_force[id_force].linear[2] * 0.01 force_msg.wrench.torque.x = external_force[id_force].angular[0] force_msg.wrench.torque.y = external_force[id_force].angular[1] force_msg.wrench.torque.z = external_force[id_force].angular[2] + + self.publisher_force.publish(force_msg) def main(args=None): rclpy.init(args=args) node = RobotDescriptionSubscriber() + try: rclpy.spin(node) diff --git a/dynamic_payload_analysis_ros/setup.py b/dynamic_payload_analysis_ros/setup.py index 41972de..bb6057e 100644 --- a/dynamic_payload_analysis_ros/setup.py +++ b/dynamic_payload_analysis_ros/setup.py @@ -21,6 +21,7 @@ entry_points={ 'console_scripts': [ 'node_rviz_visualization = dynamic_payload_analysis_ros.rviz_visualization:main', + 'node_rviz_visualization_menu = dynamic_payload_analysis_ros.rviz_visualization_menu:main', ], }, ) From 2c2f47abfc1df700d41d808888c99593aed3a96d Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Thu, 12 Jun 2025 12:57:01 +0200 Subject: [PATCH 06/16] Add license --- .../dynamic_payload_analysis_ros/menu_visual.py | 13 +++++++++++++ .../rviz_visualization.py | 14 ++++++++++++++ .../rviz_visualization_menu.py | 14 ++++++++++++++ dynamic_payload_analysis_ros/package.xml | 2 +- 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py index 116c788..23bd5e7 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -1,5 +1,18 @@ #!/usr/bin/env python3 +# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import sys import numpy as np diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py index e17af92..75106ad 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py @@ -1,5 +1,19 @@ #!/usr/bin/env python3 +# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import rclpy from rclpy.node import Node from std_msgs.msg import String diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py index 2abc4e1..401b73b 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -1,5 +1,19 @@ #!/usr/bin/env python3 +# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import rclpy from rclpy.node import Node from std_msgs.msg import String diff --git a/dynamic_payload_analysis_ros/package.xml b/dynamic_payload_analysis_ros/package.xml index c0fba99..f39adf4 100644 --- a/dynamic_payload_analysis_ros/package.xml +++ b/dynamic_payload_analysis_ros/package.xml @@ -5,7 +5,7 @@ 0.0.0 This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling. morolinux - TODO: License declaration + Apache License 2.0 ament_copyright ament_flake8 From 29341d0d3daca3feb69d91674358b3935c8a3402 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Thu, 12 Jun 2025 13:01:41 +0200 Subject: [PATCH 07/16] Fix license --- .../dynamic_payload_analysis_core/core.py | 16 +++++++++++++++- dynamic_payload_analysis_core/package.xml | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index a688dfa..b230415 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -1,4 +1,18 @@ -# Library to handle calculations for inverse and forward dynamics + +# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import pinocchio as pin import numpy as np diff --git a/dynamic_payload_analysis_core/package.xml b/dynamic_payload_analysis_core/package.xml index 9b6674c..3b5dfae 100644 --- a/dynamic_payload_analysis_core/package.xml +++ b/dynamic_payload_analysis_core/package.xml @@ -5,7 +5,7 @@ 0.0.0 Core package with calculation for torques calculator morolinux - TODO: License declaration + Apache License 2.0 ament_copyright ament_flake8 From 2f0b2224c726e01238843728688e1e5ed5d5d14c Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Thu, 12 Jun 2025 17:39:01 +0200 Subject: [PATCH 08/16] Change function to create external force to support multiple forces --- .../dynamic_payload_analysis_core/core.py | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index b230415..78e6c65 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -61,13 +61,13 @@ def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : n return tau - def create_ext_force(self, mass : float, frame_name : str, q : np.ndarray) -> np.ndarray[pin.Force]: + def create_ext_force(self, mass : float, frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray[pin.Force]: """ Create external forces vector based on the mass and frame ID. The resulting vector will contain the force applied to the specified frame and with the local orientation of the parent joint. :param mass: Mass of the object to apply the force to. - :param frame_id: Frame ID where the force is applied. + :param frame_name: Frame name where the force is applied or vector of frame names where the forces is applied. :return: External force vector. """ if mass < 0: @@ -84,23 +84,39 @@ def create_ext_force(self, mass : float, frame_name : str, q : np.ndarray) -> np # Initialize external forces array fext = [pin.Force(np.zeros(6)) for _ in range(self.model.njoints)] - self.update_configuration(q) + self.update_configuration(q) - # Get the frame ID and joint ID from the frame name - frame_id = self.model.getFrameId(frame_name) - joint_id = self.model.frames[frame_id].parentJoint + # Check if frame_name is a single string or an array of strings + if isinstance(frame_name, str): + # Get the frame ID and joint ID from the frame name + frame_id = self.model.getFrameId(frame_name) + joint_id = self.model.frames[frame_id].parentJoint - # force expressed in the world frame (gravity force in z axis) - ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0])) + # force expressed in the world frame (gravity force in z axis) + ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0])) - # placement of the frame in the world frame - frame_placement = self.data.oMf[frame_id] - #print(f"Frame placement: {frame_placement}") - - # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied - fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world) - # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied - fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient + # placement of the frame in the world frame + #frame_placement = self.data.oMf[frame_id] + #print(f"Frame placement: {frame_placement}") + + # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied + fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world) + # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied + fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient + + else: + for frame in frame_name: + frame_id = self.model.getFrameId(frame) + joint_id = self.model.frames[frame_id].parentJoint + + # force expressed in the world frame (gravity force in z axis) + ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0])) + # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied + fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world) + # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied + fext[joint_id].angular = np.zeros(3) + + return fext From 7dbda98c7cce85d684c5e9074c8b699f42874cf4 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Thu, 12 Jun 2025 17:40:31 +0200 Subject: [PATCH 09/16] Add possibility to select multiple payloads --- .../rviz_visualization_menu.py | 107 ++++++++++-------- 1 file changed, 60 insertions(+), 47 deletions(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py index 401b73b..3f78a47 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -25,6 +25,9 @@ from dynamic_payload_analysis_ros.menu_visual import MenuPayload +#TODO : Add payload mass selection in the menu +#TODO : Add possibility to select multiple frames with payload + class RobotDescriptionSubscriber(Node): def __init__(self): super().__init__('node_robot_description_subscriber') @@ -39,7 +42,7 @@ def __init__(self): self.menu = MenuPayload(self) # Publisher for external force - self.publisher_force = self.create_publisher(WrenchStamped, '/external_force', 10) + self.publisher_force = self.create_publisher(MarkerArray, '/external_forces', 10) # Publisher for RViz visualization of torques self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10) @@ -50,7 +53,7 @@ def __init__(self): self.robot = None # frame where the external force is applied - self.frame_id = "arm_left_7_link" + self.frame_id = None self.external_force = None @@ -80,18 +83,25 @@ def joint_states_callback(self, msg): q = self.robot.set_position(positions, name_position) a = self.robot.get_zero_acceleration() + + # create the array with only the checked frames (with external force applied) + self.checked_frames = np.array([check_frame["name"] for check_frame in self.menu.get_item_state() if check_frame['checked']]) - # Compute inverse dynamics with external force - for frame in self.menu.get_item_state(): - if frame['checked']: - self.frame_id = frame['name'] - self.external_force = self.robot.create_ext_force(mass=4.0, frame_name=self.frame_id, q=q) - + # if there are no checked frames, set the external force to None + if len(self.checked_frames) != 0: + self.external_force = self.robot.create_ext_force(mass=4.0, frame_name=self.checked_frames, q=q) + else: + self.external_force = None + # compute the inverse dynamics tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=self.external_force) + # check the effort limits status_efforts = self.robot.check_effort_limits(tau) + # print the torques self.robot.print_torques(tau) + + # get the active frames and joints positions frames = self.robot.get_active_frames() joints_position = self.robot.get_joints_placements(q) @@ -99,8 +109,7 @@ def joint_states_callback(self, msg): self.publish_rviz_torque(tau, status_torques=status_efforts ,joints_position=joints_position) # publish the external force - if self.external_force is not None: - self.publish_payload_force(self.frame_id, self.external_force) + self.publish_payload_force(self.menu.get_item_state()) def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray): @@ -149,7 +158,7 @@ def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, j - def publish_payload_force(self, frame_id : str, external_force: np.ndarray): + def publish_payload_force(self, frames_names : np.ndarray[str]): """ Publish the gravity force on the frame with id `id_force`. @@ -157,45 +166,49 @@ def publish_payload_force(self, frame_id : str, external_force: np.ndarray): frame_id (str): The frame where the external force is applied. external_force (np.ndarray): The external force to be published """ - id_force = self.robot.get_parent_joint_id(frame_id) - position = self.robot.get_joint_placement(id_force) - - # force = Marker() - # force.header.frame_id = frame_id - # force.header.stamp = self.get_clock().now().to_msg() - # force.ns = "external_force" - # force.id = id_force - # force.type = Marker.ARROW - # force.action = Marker.ADD - # force.scale.x = 0.20 - # force.scale.y = 0.05 - # force.scale.z = 0.05 - # force.color.a = 1.0 # Alpha - # force.color.r = 0.0 # Red - # force.color.g = 0.0 - # force.color.b = 1.0 # Blue - # # Set the direction of the arrow based on the external force - # force.pose.orientation.x = external_force[id_force].linear[0] - # force.pose.orientation.y = external_force[id_force].linear[1] - # force.pose.orientation.z = external_force[id_force].linear[2] - # force.pose.orientation.w = 0.0 # Set to 1.0 for a straight arrow - # self.publisher_force.publish(force) - - force_msg = WrenchStamped() - - force_msg.header.stamp = self.get_clock().now().to_msg() - force_msg.header.frame_id = frame_id - - force_msg.wrench.force.x = external_force[id_force].linear[0] * 0.01 - force_msg.wrench.force.y = external_force[id_force].linear[1] * 0.01 - force_msg.wrench.force.z = external_force[id_force].linear[2] * 0.01 - force_msg.wrench.torque.x = external_force[id_force].angular[0] - force_msg.wrench.torque.y = external_force[id_force].angular[1] - force_msg.wrench.torque.z = external_force[id_force].angular[2] + external_force_array = MarkerArray() + + for frame in frames_names: + + id_force = self.robot.get_parent_joint_id(frame["name"]) + position = self.robot.get_joint_placement(id_force) + arrow_force = Marker() + + arrow_force.header.frame_id = "base_link" + arrow_force.header.stamp = self.get_clock().now().to_msg() + arrow_force.ns = "external_force" + arrow_force.id = id_force + arrow_force.type = Marker.ARROW + + # add the arrow if the frame is checked or delete it if not + if frame["checked"]: + arrow_force.action = Marker.ADD + else: + arrow_force.action = Marker.DELETE + + arrow_force.scale.x = 0.20 # Length of the arrow + arrow_force.scale.y = 0.05 # Width of the arrow + arrow_force.scale.z = 0.05 # Height of the arrow + arrow_force.color.a = 1.0 # Alpha + arrow_force.color.r = 0.0 + arrow_force.color.g = 0.0 # Green + arrow_force.color.b = 1.0 + + # Set the position of the arrow at the joint placement + arrow_force.pose.position.x = position[0] + arrow_force.pose.position.y = position[1] + arrow_force.pose.position.z = position[2] + # Set the direction of the arrow downwards + arrow_force.pose.orientation.x = 0.0 + arrow_force.pose.orientation.y = 0.7071 + arrow_force.pose.orientation.z = 0.0 + arrow_force.pose.orientation.w = 0.7071 + + external_force_array.markers.append(arrow_force) + self.publisher_force.publish(external_force_array) - self.publisher_force.publish(force_msg) def main(args=None): rclpy.init(args=args) From 1e1b7aec1f0f8cacf707e7858f2dd33cedb90f06 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Fri, 13 Jun 2025 20:06:01 +0200 Subject: [PATCH 10/16] Change return type in function get_joint_placement --- .../dynamic_payload_analysis_core/core.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index 78e6c65..e7f3c3a 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -435,19 +435,21 @@ def get_joints_placements(self, q : np.ndarray) -> np.ndarray: return placements - def get_joint_placement(self, joint_id : int) -> np.ndarray: + def get_joint_placement(self, joint_id : int) -> dict: """ Get the placement of a specific joint in the robot model. :param joint_id: ID of the joint to get the placement for. - :return: Placement of the joint as a numpy array. + :return: Dictionary with coordinates x , y , z of the joint. """ if joint_id < 0 or joint_id >= self.model.njoints: raise ValueError(f"Joint ID {joint_id} is out of bounds for the robot model with {self.model.njoints} joints.") placement = self.data.oMi[joint_id].translation - return np.array(placement) + + return {"x" : placement[0], "y": placement[1], "z": placement[2]} + def print_torques(self, tau : np.ndarray): From c06f566b81a060e8925bf2a13c56ca05a4740567 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Fri, 13 Jun 2025 20:07:12 +0200 Subject: [PATCH 11/16] Remove labels of gripper joints in the rviz visualization --- .../rviz_visualization_menu.py | 88 ++++++++++--------- 1 file changed, 46 insertions(+), 42 deletions(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py index 3f78a47..cbe2cac 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -26,7 +26,8 @@ #TODO : Add payload mass selection in the menu -#TODO : Add possibility to select multiple frames with payload +#TODO : Remove mimim joints in label visualization + class RobotDescriptionSubscriber(Node): def __init__(self): @@ -67,7 +68,7 @@ def robot_description_callback(self, msg): self.robot.print_active_joint() self.robot.print_frames() - # Add the frame to the menu for selecting where to apply the payload + # Add the frame to the menu for payload selection for frame in self.robot.get_active_frames(): self.menu.insert(frame) @@ -103,18 +104,20 @@ def joint_states_callback(self, msg): # get the active frames and joints positions frames = self.robot.get_active_frames() + + # get the positions of the joints where the torques are applied joints_position = self.robot.get_joints_placements(q) # Publish the torques in RViz - self.publish_rviz_torque(tau, status_torques=status_efforts ,joints_position=joints_position) + self.publish_label_torques(tau, status_torques=status_efforts ,joints_position=joints_position) - # publish the external force + # publish the external force as arrows in RViz self.publish_payload_force(self.menu.get_item_state()) - def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray): + def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray): """ - Publish the torque on the robot in RViz. + Publish the torque with labels on the robot in RViz. Args: torque (np.ndarray): The torque to be published @@ -123,36 +126,38 @@ def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, j """ marker_array = MarkerArray() for i, (t, joint) in enumerate(zip(torque, joints_position)): - marker = Marker() - marker.header.frame_id = "base_link" - marker.header.stamp = self.get_clock().now().to_msg() - marker.ns = "torque_visualization" - marker.id = i - marker.type = Marker.TEXT_VIEW_FACING - # Set the text based on the joint type - if joint['type'] == 'JointModelPZ': - marker.text = f"{joint['name']}: {t:.2f} N" - else: - marker.text = f"{joint['name']}: {t:.2f} Nm" - - marker.action = Marker.ADD - marker.pose.position.x = joint['x'] - marker.pose.position.y = joint['y'] - marker.pose.position.z = joint['z'] - marker.pose.orientation.w = 1.0 - marker.scale.x = 0.03 - marker.scale.y = 0.03 - marker.scale.z = 0.03 - marker.color.a = 1.0 # Alpha - if status_torques[i]: - marker.color.r = 0.0 # Red - marker.color.g = 1.0 # Green - marker.color.b = 0.0 # Blue - else: - marker.color.r = 1.0 # Red - marker.color.g = 0.0 # Green - marker.color.b = 0.0 # Blue - marker_array.markers.append(marker) + # remove the gripper joints from the visualization TODO: make it more general (with MIMIC joints) + if "gripper" not in joint['name']: + marker = Marker() + marker.header.frame_id = "base_link" + marker.header.stamp = self.get_clock().now().to_msg() + marker.ns = "torque_visualization" + marker.id = i + marker.type = Marker.TEXT_VIEW_FACING + # Set the text based on the joint type + if joint['type'] == 'JointModelPZ': + marker.text = f"{joint['name']}: {t:.2f} N" + else: + marker.text = f"{joint['name']}: {t:.2f} Nm" + + marker.action = Marker.ADD + marker.pose.position.x = joint['x'] + marker.pose.position.y = joint['y'] + marker.pose.position.z = joint['z'] + marker.pose.orientation.w = 1.0 + marker.scale.x = 0.03 + marker.scale.y = 0.03 + marker.scale.z = 0.03 + marker.color.a = 1.0 # Alpha + if status_torques[i]: + marker.color.r = 0.0 # Red + marker.color.g = 1.0 # Green + marker.color.b = 0.0 # Blue + else: + marker.color.r = 1.0 # Red + marker.color.g = 0.0 # Green + marker.color.b = 0.0 # Blue + marker_array.markers.append(marker) self.publisher_rviz_torque.publish(marker_array) @@ -163,15 +168,14 @@ def publish_payload_force(self, frames_names : np.ndarray[str]): Publish the gravity force on the frame with id `id_force`. Args: - frame_id (str): The frame where the external force is applied. - external_force (np.ndarray): The external force to be published + frame_names : The frames where the external forces is applied. """ external_force_array = MarkerArray() for frame in frames_names: id_force = self.robot.get_parent_joint_id(frame["name"]) - position = self.robot.get_joint_placement(id_force) + joint_position = self.robot.get_joint_placement(id_force) arrow_force = Marker() arrow_force.header.frame_id = "base_link" @@ -195,9 +199,9 @@ def publish_payload_force(self, frames_names : np.ndarray[str]): arrow_force.color.b = 1.0 # Set the position of the arrow at the joint placement - arrow_force.pose.position.x = position[0] - arrow_force.pose.position.y = position[1] - arrow_force.pose.position.z = position[2] + arrow_force.pose.position.x = joint_position["x"] + arrow_force.pose.position.y = joint_position["y"] + arrow_force.pose.position.z = joint_position["z"] # Set the direction of the arrow downwards arrow_force.pose.orientation.x = 0.0 arrow_force.pose.orientation.y = 0.7071 From 022efe402171e87bdf6f40aa41fc27f8d2efce39 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Fri, 13 Jun 2025 21:34:17 +0200 Subject: [PATCH 12/16] First version with payload --- .../menu_visual.py | 117 +++++++++++++++--- 1 file changed, 103 insertions(+), 14 deletions(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py index 23bd5e7..6d8b35c 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -29,14 +29,21 @@ class MenuPayload(): def __init__(self, node): # create server for interactive markers self.server = InteractiveMarkerServer(node, 'menu_frames') - # array to store the checked frames - self.checked_frames = np.array([],dtype=object) - # position of the marker in rviz - self.marker_pos = 2 + + # array to store the checked and unchecked frames + self.frames_selection = np.array([],dtype=object) + # create handler for menu self.menu_handler = MenuHandler() - # insert the root menu item - self.root = self.menu_handler.insert('Select where to apply payload') + + #current managed frame + self.current_frame = None + + # payload mass selection array + self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5], dtype=float) + + # insert the root menu items + self.root_frames = self.menu_handler.insert('Select where to apply payload') self.make_menu_marker('menu_frames') # add server to menu and apply changes @@ -47,28 +54,39 @@ def insert(self, name): """ Insert a new item in the checkbox menu. """ - last_item = self.menu_handler.insert(name, parent=self.root, callback=self.callback_selection) + last_item = self.menu_handler.insert(name, parent=self.root_frames, callback=self.callback_selection) self.menu_handler.setCheckState(last_item, MenuHandler.UNCHECKED) self.menu_handler.setVisible(last_item, True) - self.checked_frames = np.append(self.checked_frames, {"name": name, "checked" : False} ) + # add the item to the checked frames array in order to keep track of the checked items + self.frames_selection = np.append(self.frames_selection, {"name": name, "checked" : False, "payload" : 0.0} ) + self.menu_handler.reApply(self.server) self.server.applyChanges() def callback_selection(self, feedback): """ - Callback for menu selection + Callback for menu selection to change the check state of the selected item. """ # change the check state of the selected item handle = feedback.menu_entry_id title = self.menu_handler.getTitle(handle) + if self.menu_handler.getCheckState(handle) == MenuHandler.CHECKED: - self.menu_handler.setCheckState(handle, MenuHandler.UNCHECKED) + # remove the item from the menu of payload selection + self.manage_payload_menu(handle, False) + # update the frames_selection array to set the item as unchecked self.update_item(title, False) + + self.menu_handler.setCheckState(handle, MenuHandler.UNCHECKED) else: - self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + # add the item to the menu of payload selection + self.manage_payload_menu(handle, True) + # update the frames_selection array to set the item as checked self.update_item(title, True) + + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) # update the menu state self.menu_handler.reApply(self.server) @@ -87,11 +105,82 @@ def update_item(self, name, status: bool): status (bool): New checked state of the item. """ - for item in self.checked_frames: + for item in self.frames_selection: if item['name'] == name: item['checked'] = status break + + def update_payload(self, name, payload: float): + """ + Update the payload mass of an item in the array. + + Args: + name (str): Name of the item to update. + payload (float): New payload mass of the item. + """ + + for item in self.frames_selection: + if item['name'] == name: + item['payload'] = payload + break + + def remove_selection_payload(self, handler): + """ + Remove the payload selection from the array. + + Args: + handler (str): frame handler. + """ + + for item in handler.sub_entries: + self.menu_handler.setVisible(item, False) + + + def manage_payload_menu(self, menu_entry_id, action: bool): + """ + Add or remove a new checked frame to the payload menu. + + Args: + menu_entry_id : ID of the menu entry to add or remove. + action (bool): True to add the item, False to remove it. + """ + name = self.menu_handler.getTitle(menu_entry_id) + # if the action is to add the item, we need to insert the payload mass selection items + if action: + for payload in self.payload_selection: + # insert the payload mass selection items + last_entry = self.menu_handler.insert(f"{payload} kg", parent=menu_entry_id, callback=self.update_payload_selection) + self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED) + self.menu_handler.setVisible(last_entry, True) + + # if the action is to remove the item, we need to remove the payload mass selection items + else: + # remove the payload mass selection items + self.remove_selection_payload(menu_entry_id) + + self.menu_handler.reApply(self.server) + self.server.applyChanges() + def update_payload_selection(self, feedback): + """ + Callback for payload selection to change the current payload. + + Args: + feedback: Feedback from the menu selection. + """ + handle = feedback.menu_entry_id + title = self.menu_handler.getTitle(handle) + + if self.menu_handler.getCheckState(handle) == MenuHandler.CHECKED: + self.menu_handler.setCheckState(handle, MenuHandler.UNCHECKED) + #self.update_payload(name_frame, 0.0) + else: + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + #self.update_payload(name_frame, float(title.split()[0])) # Extract the payload mass from the title + + self.menu_handler.reApply(self.server) + self.server.applyChanges() + def get_item_state(self) -> np.ndarray: """ @@ -100,7 +189,7 @@ def get_item_state(self) -> np.ndarray: Returns: np.ndarray: array of checked frames """ - return self.checked_frames + return self.frames_selection def make_box(self, msg): @@ -131,7 +220,7 @@ def make_empty_marker(self, dummyBox=True): int_marker = InteractiveMarker() int_marker.header.frame_id = 'base_link' - int_marker.pose.position.z = 1.0 * self.marker_pos + int_marker.pose.position.z = 2.0 int_marker.scale = 0.5 From 48080c7b7fb72e42b4c27a9d9580976cdc6e74c4 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Sun, 15 Jun 2025 15:44:45 +0200 Subject: [PATCH 13/16] Add payload selection as sub-menu --- .../menu_visual.py | 178 +++++++++++------- .../rviz_visualization_menu.py | 6 +- 2 files changed, 111 insertions(+), 73 deletions(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py index 6d8b35c..a580b43 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -30,7 +30,7 @@ def __init__(self, node): # create server for interactive markers self.server = InteractiveMarkerServer(node, 'menu_frames') - # array to store the checked and unchecked frames + # array to store the checked orunchecked frames and payload selection self.frames_selection = np.array([],dtype=object) # create handler for menu @@ -44,15 +44,20 @@ def __init__(self, node): # insert the root menu items self.root_frames = self.menu_handler.insert('Select where to apply payload') + # insert the reset payload button + self.reset = self.menu_handler.insert('Reset payloads', parent=self.root_frames, callback=self.callback_reset) self.make_menu_marker('menu_frames') # add server to menu and apply changes self.menu_handler.apply(self.server, 'menu_frames') self.server.applyChanges() - def insert(self, name): + def insert_frame(self, name): """ - Insert a new item in the checkbox menu. + Insert a new item(frame) in the checkbox menu of frames. + + Args: + name (str) : name of the frame """ last_item = self.menu_handler.insert(name, parent=self.root_frames, callback=self.callback_selection) self.menu_handler.setCheckState(last_item, MenuHandler.UNCHECKED) @@ -61,32 +66,57 @@ def insert(self, name): # add the item to the checked frames array in order to keep track of the checked items self.frames_selection = np.append(self.frames_selection, {"name": name, "checked" : False, "payload" : 0.0} ) + # apply changes self.menu_handler.reApply(self.server) self.server.applyChanges() - + + def callback_reset(self, feedback): + """ + Callback for reset menu selection to remove all checked items from the menu. + """ + # remove all checked items from the array + for i, item in enumerate(self.frames_selection): + if item['checked']: + item = {"name": item['name'], "checked": False, "payload": 0.0} + self.frames_selection[i] = item + + # reset the frames selection menu (i = number of entry, item = object with name, sub entries, etc.) + for i, item in self.menu_handler.entry_contexts_.items(): + if i == 1: + # skip the root item (payload reset) + continue + + # check if the item(frame) has sub entries (payloads selection) + if item.sub_entries: + # if the frame has payloads selection, we need to remove it + for sub_item in item.sub_entries: + self.menu_handler.setVisible(sub_item, False) + self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED) + + # set the checked of frame to unchecked + self.menu_handler.setCheckState(i,MenuHandler.UNCHECKED) + + # reapply the menu handler and server changes + self.menu_handler.reApply(self.server) + self.server.applyChanges() + + def callback_selection(self, feedback): """ - Callback for menu selection to change the check state of the selected item. + Callback for menu selection to change the check state of the selected item(frame). """ - # change the check state of the selected item + # get name of the frame handle = feedback.menu_entry_id title = self.menu_handler.getTitle(handle) - if self.menu_handler.getCheckState(handle) == MenuHandler.CHECKED: - # remove the item from the menu of payload selection - self.manage_payload_menu(handle, False) - # update the frames_selection array to set the item as unchecked - self.update_item(title, False) - - self.menu_handler.setCheckState(handle, MenuHandler.UNCHECKED) - else: - # add the item to the menu of payload selection - self.manage_payload_menu(handle, True) - # update the frames_selection array to set the item as checked - self.update_item(title, True) - - self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + # add payloads selection as a submenu + self.manage_payload_menu(handle) + # update the frames_selection array to set the item as checked + self.update_item(title, True) + + # set the checkbox as checked + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) # update the menu state self.menu_handler.reApply(self.server) @@ -96,27 +126,28 @@ def callback_selection(self, feedback): print(f"{self.get_item_state()} \n") - def update_item(self, name, status: bool): + def update_item(self, name, check: bool): """ - Update the state of an item in the array. + Update the state of an item(frame) in the array. Args: - name (str): Name of the item to update. - status (bool): New checked state of the item. + name (str): Name of the item(frame) to update. + check (bool): New state of the item. """ for item in self.frames_selection: if item['name'] == name: - item['checked'] = status + item['checked'] = check break + def update_payload(self, name, payload: float): """ Update the payload mass of an item in the array. Args: - name (str): Name of the item to update. - payload (float): New payload mass of the item. + name (str): Name of the item(frame) to update. + payload (float): New payload mass that act on the frame. """ for item in self.frames_selection: @@ -124,43 +155,26 @@ def update_payload(self, name, payload: float): item['payload'] = payload break - def remove_selection_payload(self, handler): - """ - Remove the payload selection from the array. - - Args: - handler (str): frame handler. - """ - - for item in handler.sub_entries: - self.menu_handler.setVisible(item, False) - - def manage_payload_menu(self, menu_entry_id, action: bool): + def manage_payload_menu(self, menu_entry_id): """ - Add or remove a new checked frame to the payload menu. - + Add payload selection items as a sub-menu for the specified menu entry. + Args: - menu_entry_id : ID of the menu entry to add or remove. - action (bool): True to add the item, False to remove it. - """ - name = self.menu_handler.getTitle(menu_entry_id) - # if the action is to add the item, we need to insert the payload mass selection items - if action: - for payload in self.payload_selection: - # insert the payload mass selection items - last_entry = self.menu_handler.insert(f"{payload} kg", parent=menu_entry_id, callback=self.update_payload_selection) - self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED) - self.menu_handler.setVisible(last_entry, True) - - # if the action is to remove the item, we need to remove the payload mass selection items - else: - # remove the payload mass selection items - self.remove_selection_payload(menu_entry_id) + menu_entry_id : ID of the menu entry. + """ + # insert as a sub menu all the payload stored in the array + for payload in self.payload_selection: + # insert the payload mass selection items (command = str(menu_entry_id) is used to identify the parent menu entry) + last_entry = self.menu_handler.insert(f"{payload} kg", parent=menu_entry_id, command = str(menu_entry_id), callback=self.update_payload_selection) + self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED) + self.menu_handler.setVisible(last_entry, True) + # apply changes self.menu_handler.reApply(self.server) self.server.applyChanges() + def update_payload_selection(self, feedback): """ Callback for payload selection to change the current payload. @@ -168,16 +182,35 @@ def update_payload_selection(self, feedback): Args: feedback: Feedback from the menu selection. """ + # get the handle of the selected item (id) handle = feedback.menu_entry_id + # get the title of the selected item (it contains number of kg) title = self.menu_handler.getTitle(handle) + # get the entry object from the menu handler with all the informations about the item (command field is used to store the parent id) + payload_context = self.menu_handler.entry_contexts_[handle] + + # get the parent id of the selected payload stored in the command field + parent_id = int(payload_context.command) + # get the entry object of the parent + parent_context = self.menu_handler.entry_contexts_[parent_id] + # get the name of the parent item (frame name) + name_parent = self.menu_handler.getTitle(parent_id) - if self.menu_handler.getCheckState(handle) == MenuHandler.CHECKED: - self.menu_handler.setCheckState(handle, MenuHandler.UNCHECKED) - #self.update_payload(name_frame, 0.0) - else: - self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) - #self.update_payload(name_frame, float(title.split()[0])) # Extract the payload mass from the title + # reset all the selections in the payload sub-menu + # check if a item is already checked, if so remove it and set to unchecked to prevent multiple payload selection in the same menu + for item in parent_context.sub_entries: + # check if the item is checked + if self.menu_handler.getCheckState(item) == MenuHandler.CHECKED: + # if the item is already checked, we need to uncheck it and set the payload to 0.0 + self.menu_handler.setCheckState(item, MenuHandler.UNCHECKED) + self.update_payload(name_parent, 0.0) + + # set the selected payload checkbox as checked + self.menu_handler.setCheckState(handle, MenuHandler.CHECKED) + # uppate the array with the new payload + self.update_payload(name_parent, float(title.split()[0])) # Extract the payload mass from the title + # apply changes self.menu_handler.reApply(self.server) self.server.applyChanges() @@ -192,7 +225,10 @@ def get_item_state(self) -> np.ndarray: return self.frames_selection - def make_box(self, msg): + def make_label(self, msg): + """ + Create the label used to click over in rviz + """ marker = Marker() marker.type = Marker.TEXT_VIEW_FACING @@ -207,27 +243,31 @@ def make_box(self, msg): return marker - def make_box_control(self, msg): + def make_label_control(self, msg): control = InteractiveMarkerControl() control.always_visible = True - control.markers.append(self.make_box(msg)) + control.markers.append(self.make_label(msg)) msg.controls.append(control) return control def make_empty_marker(self, dummyBox=True): - + """ + Create interactive marker + """ int_marker = InteractiveMarker() int_marker.header.frame_id = 'base_link' int_marker.pose.position.z = 2.0 int_marker.scale = 0.5 - return int_marker def make_menu_marker(self, name): + """ + Create interactive menu + """ int_marker = self.make_empty_marker() int_marker.name = name @@ -236,7 +276,7 @@ def make_menu_marker(self, name): control.interaction_mode = InteractiveMarkerControl.BUTTON control.always_visible = True - control.markers.append(self.make_box(int_marker)) + control.markers.append(self.make_label(int_marker)) int_marker.controls.append(control) self.server.insert(int_marker) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py index cbe2cac..08ae542 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -26,7 +26,6 @@ #TODO : Add payload mass selection in the menu -#TODO : Remove mimim joints in label visualization class RobotDescriptionSubscriber(Node): @@ -70,7 +69,7 @@ def robot_description_callback(self, msg): # Add the frame to the menu for payload selection for frame in self.robot.get_active_frames(): - self.menu.insert(frame) + self.menu.insert_frame(frame) # self.robot.print_configuration() @@ -102,7 +101,7 @@ def joint_states_callback(self, msg): # print the torques self.robot.print_torques(tau) - # get the active frames and joints positions + # get the active frames frames = self.robot.get_active_frames() # get the positions of the joints where the torques are applied @@ -218,7 +217,6 @@ def main(args=None): rclpy.init(args=args) node = RobotDescriptionSubscriber() - try: rclpy.spin(node) except KeyboardInterrupt: From 91bd87178ca82e75474295acff4c290383a41dce Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Sun, 15 Jun 2025 16:11:30 +0200 Subject: [PATCH 14/16] Small change in create_ext_force, add possibility to use array of masses --- .../dynamic_payload_analysis_core/core.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index e7f3c3a..f619b72 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -61,17 +61,18 @@ def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : n return tau - def create_ext_force(self, mass : float, frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray[pin.Force]: + def create_ext_force(self, masses : Union[float, np.ndarray] , frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray[pin.Force]: """ Create external forces vector based on the mass and frame ID. The resulting vector will contain the force applied to the specified frame and with the local orientation of the parent joint. - :param mass: Mass of the object to apply the force to. - :param frame_name: Frame name where the force is applied or vector of frame names where the forces is applied. + :param masses (float, np.ndarray) : Mass of the object to apply the force to or vector with masses related to frames names. + :param frame_name(str , np.ndarray) : Frame name where the force is applied or vector of frame names where the forces is applied. :return: External force vector. """ - if mass < 0: - raise ValueError("Mass must be a positive value") + if isinstance(masses, float): + if masses < 0: + raise ValueError("Mass must be a positive value") if frame_name is None: raise ValueError("Frame name must be provided") @@ -93,7 +94,7 @@ def create_ext_force(self, mass : float, frame_name : Union[str | np.ndarray], q joint_id = self.model.frames[frame_id].parentJoint # force expressed in the world frame (gravity force in z axis) - ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0])) + ext_force_world = pin.Force(np.array([0.0, 0.0, masses * -9.81]), np.array([0.0, 0.0, 0.0])) # placement of the frame in the world frame #frame_placement = self.data.oMf[frame_id] @@ -105,7 +106,7 @@ def create_ext_force(self, mass : float, frame_name : Union[str | np.ndarray], q fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient else: - for frame in frame_name: + for mass, frame in zip(masses,frame_name): frame_id = self.model.getFrameId(frame) joint_id = self.model.frames[frame_id].parentJoint From 99918d9617ab3350d1d6a2085fb61b665bea5403 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Sun, 15 Jun 2025 16:12:36 +0200 Subject: [PATCH 15/16] Add possibility to select multiple frames with different paylaods --- .../rviz_visualization_menu.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py index 08ae542..d394382 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -86,10 +86,13 @@ def joint_states_callback(self, msg): # create the array with only the checked frames (with external force applied) self.checked_frames = np.array([check_frame["name"] for check_frame in self.menu.get_item_state() if check_frame['checked']]) - + # if there are no checked frames, set the external force to None if len(self.checked_frames) != 0: - self.external_force = self.robot.create_ext_force(mass=4.0, frame_name=self.checked_frames, q=q) + # create the array with the masses of the checked frames + self.masses = np.array([check_frame["payload"] for check_frame in self.menu.get_item_state() if check_frame['checked']]) + # create the external force with the masses and the checked frames + self.external_force = self.robot.create_ext_force(masses=self.masses, frame_name=self.checked_frames, q=q) else: self.external_force = None From e9337b71dae118ed804bf7cfcfb38c7fd39167be Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Thu, 19 Jun 2025 11:22:07 +0200 Subject: [PATCH 16/16] Small fix in reset function --- .../dynamic_payload_analysis_ros/menu_visual.py | 11 +++++++---- .../rviz_visualization_menu.py | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py index a580b43..ce20421 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -46,6 +46,7 @@ def __init__(self, node): self.root_frames = self.menu_handler.insert('Select where to apply payload') # insert the reset payload button self.reset = self.menu_handler.insert('Reset payloads', parent=self.root_frames, callback=self.callback_reset) + self.menu_handler.setCheckState(self.reset, MenuHandler.NO_CHECKBOX) self.make_menu_marker('menu_frames') # add server to menu and apply changes @@ -84,7 +85,7 @@ def callback_reset(self, feedback): # reset the frames selection menu (i = number of entry, item = object with name, sub entries, etc.) for i, item in self.menu_handler.entry_contexts_.items(): if i == 1: - # skip the root item (payload reset) + # skip the root item continue # check if the item(frame) has sub entries (payloads selection) @@ -93,9 +94,11 @@ def callback_reset(self, feedback): for sub_item in item.sub_entries: self.menu_handler.setVisible(sub_item, False) self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED) - - # set the checked of frame to unchecked - self.menu_handler.setCheckState(i,MenuHandler.UNCHECKED) + + # skip the reset payloads item + if item.title != 'Reset payloads': + # set the checked of frame to unchecked + self.menu_handler.setCheckState(i,MenuHandler.UNCHECKED) # reapply the menu handler and server changes self.menu_handler.reApply(self.server) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py index d394382..fd2eb84 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -25,7 +25,7 @@ from dynamic_payload_analysis_ros.menu_visual import MenuPayload -#TODO : Add payload mass selection in the menu + class RobotDescriptionSubscriber(Node):