diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py
index 21b5298..f619b72 100644
--- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py
+++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py
@@ -1,4 +1,18 @@
-# Library to handle calculations for inverse and forward dynamics
+
+# Copyright (c) 2025 PAL Robotics S.L. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
import pinocchio as pin
import numpy as np
@@ -47,17 +61,18 @@ def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : n
return tau
- def create_ext_force(self, mass : float, frame_name : str, q : np.ndarray) -> np.ndarray[pin.Force]:
+ def create_ext_force(self, masses : Union[float, np.ndarray] , frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray[pin.Force]:
"""
Create external forces vector based on the mass and frame ID.
The resulting vector will contain the force applied to the specified frame and with the local orientation of the parent joint.
- :param mass: Mass of the object to apply the force to.
- :param frame_id: Frame ID where the force is applied.
+ :param masses (float, np.ndarray) : Mass of the object to apply the force to or vector with masses related to frames names.
+ :param frame_name(str , np.ndarray) : Frame name where the force is applied or vector of frame names where the forces is applied.
:return: External force vector.
"""
- if mass < 0:
- raise ValueError("Mass must be a positive value")
+ if isinstance(masses, float):
+ if masses < 0:
+ raise ValueError("Mass must be a positive value")
if frame_name is None:
raise ValueError("Frame name must be provided")
@@ -70,23 +85,39 @@ def create_ext_force(self, mass : float, frame_name : str, q : np.ndarray) -> np
# Initialize external forces array
fext = [pin.Force(np.zeros(6)) for _ in range(self.model.njoints)]
- self.update_configuration(q)
+ self.update_configuration(q)
- # Get the frame ID and joint ID from the frame name
- frame_id = self.model.getFrameId(frame_name)
- joint_id = self.model.frames[frame_id].parentJoint
+ # Check if frame_name is a single string or an array of strings
+ if isinstance(frame_name, str):
+ # Get the frame ID and joint ID from the frame name
+ frame_id = self.model.getFrameId(frame_name)
+ joint_id = self.model.frames[frame_id].parentJoint
- # force expressed in the world frame (gravity force in z axis)
- ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0]))
+ # force expressed in the world frame (gravity force in z axis)
+ ext_force_world = pin.Force(np.array([0.0, 0.0, masses * -9.81]), np.array([0.0, 0.0, 0.0]))
- # placement of the frame in the world frame
- frame_placement = self.data.oMf[frame_id]
- #print(f"Frame placement: {frame_placement}")
-
- # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied
- fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world)
- # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied
- fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient
+ # placement of the frame in the world frame
+ #frame_placement = self.data.oMf[frame_id]
+ #print(f"Frame placement: {frame_placement}")
+
+ # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied
+ fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world)
+ # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied
+ fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient
+
+ else:
+ for mass, frame in zip(masses,frame_name):
+ frame_id = self.model.getFrameId(frame)
+ joint_id = self.model.frames[frame_id].parentJoint
+
+ # force expressed in the world frame (gravity force in z axis)
+ ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0]))
+ # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied
+ fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world)
+ # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied
+ fext[joint_id].angular = np.zeros(3)
+
+
return fext
@@ -400,9 +431,26 @@ def get_joints_placements(self, q : np.ndarray) -> np.ndarray:
"""
self.update_configuration(q)
- placements = np.array([({"name" : self.model.names[i], "x": self.data.oMi[i].translation[0], "y": self.data.oMi[i].translation[1], "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object)
+ placements = np.array([({"name" : self.model.names[i],"type" : self.model.joints[i].shortname() , "x": self.data.oMi[i].translation[0], "y": self.data.oMi[i].translation[1], "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object)
return placements
+
+
+ def get_joint_placement(self, joint_id : int) -> dict:
+ """
+ Get the placement of a specific joint in the robot model.
+
+ :param joint_id: ID of the joint to get the placement for.
+ :return: Dictionary with coordinates x , y , z of the joint.
+ """
+
+ if joint_id < 0 or joint_id >= self.model.njoints:
+ raise ValueError(f"Joint ID {joint_id} is out of bounds for the robot model with {self.model.njoints} joints.")
+
+ placement = self.data.oMi[joint_id].translation
+
+ return {"x" : placement[0], "y": placement[1], "z": placement[2]}
+
def print_torques(self, tau : np.ndarray):
@@ -413,15 +461,20 @@ def print_torques(self, tau : np.ndarray):
"""
if tau is None:
raise ValueError("Torques vector is None")
-
+
print("Torques vector:")
for i, torque in enumerate(tau):
- # TODO Extract type of joint in order to print right measure unit ([N] or [Nm])
- print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} Nm")
-
+ # check if the joint is a prismatic joint
+ if self.model.joints[i+1].shortname() == "JointModelPZ":
+ print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} N")
+
+ # for revolute joints
+ else:
+ print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} Nm")
print("\n")
+
def print_frames(self):
"""
Print the frames of the robot model.
diff --git a/dynamic_payload_analysis_core/package.xml b/dynamic_payload_analysis_core/package.xml
index 9b6674c..3b5dfae 100644
--- a/dynamic_payload_analysis_core/package.xml
+++ b/dynamic_payload_analysis_core/package.xml
@@ -5,7 +5,7 @@
0.0.0
Core package with calculation for torques calculator
morolinux
- TODO: License declaration
+ Apache License 2.0
ament_copyright
ament_flake8
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py
new file mode 100644
index 0000000..ce20421
--- /dev/null
+++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py
@@ -0,0 +1,292 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2025 PAL Robotics S.L. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import numpy as np
+
+from interactive_markers import InteractiveMarkerServer
+from interactive_markers import MenuHandler
+import rclpy
+from visualization_msgs.msg import InteractiveMarker
+from visualization_msgs.msg import InteractiveMarkerControl
+from visualization_msgs.msg import Marker
+
+
+class MenuPayload():
+ def __init__(self, node):
+ # create server for interactive markers
+ self.server = InteractiveMarkerServer(node, 'menu_frames')
+
+ # array to store the checked orunchecked frames and payload selection
+ self.frames_selection = np.array([],dtype=object)
+
+ # create handler for menu
+ self.menu_handler = MenuHandler()
+
+ #current managed frame
+ self.current_frame = None
+
+ # payload mass selection array
+ self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5], dtype=float)
+
+ # insert the root menu items
+ self.root_frames = self.menu_handler.insert('Select where to apply payload')
+ # insert the reset payload button
+ self.reset = self.menu_handler.insert('Reset payloads', parent=self.root_frames, callback=self.callback_reset)
+ self.menu_handler.setCheckState(self.reset, MenuHandler.NO_CHECKBOX)
+
+ self.make_menu_marker('menu_frames')
+ # add server to menu and apply changes
+ self.menu_handler.apply(self.server, 'menu_frames')
+ self.server.applyChanges()
+
+ def insert_frame(self, name):
+ """
+ Insert a new item(frame) in the checkbox menu of frames.
+
+ Args:
+ name (str) : name of the frame
+ """
+ last_item = self.menu_handler.insert(name, parent=self.root_frames, callback=self.callback_selection)
+ self.menu_handler.setCheckState(last_item, MenuHandler.UNCHECKED)
+ self.menu_handler.setVisible(last_item, True)
+
+ # add the item to the checked frames array in order to keep track of the checked items
+ self.frames_selection = np.append(self.frames_selection, {"name": name, "checked" : False, "payload" : 0.0} )
+
+ # apply changes
+ self.menu_handler.reApply(self.server)
+ self.server.applyChanges()
+
+
+ def callback_reset(self, feedback):
+ """
+ Callback for reset menu selection to remove all checked items from the menu.
+ """
+ # remove all checked items from the array
+ for i, item in enumerate(self.frames_selection):
+ if item['checked']:
+ item = {"name": item['name'], "checked": False, "payload": 0.0}
+ self.frames_selection[i] = item
+
+ # reset the frames selection menu (i = number of entry, item = object with name, sub entries, etc.)
+ for i, item in self.menu_handler.entry_contexts_.items():
+ if i == 1:
+ # skip the root item
+ continue
+
+ # check if the item(frame) has sub entries (payloads selection)
+ if item.sub_entries:
+ # if the frame has payloads selection, we need to remove it
+ for sub_item in item.sub_entries:
+ self.menu_handler.setVisible(sub_item, False)
+ self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED)
+
+ # skip the reset payloads item
+ if item.title != 'Reset payloads':
+ # set the checked of frame to unchecked
+ self.menu_handler.setCheckState(i,MenuHandler.UNCHECKED)
+
+ # reapply the menu handler and server changes
+ self.menu_handler.reApply(self.server)
+ self.server.applyChanges()
+
+
+ def callback_selection(self, feedback):
+ """
+ Callback for menu selection to change the check state of the selected item(frame).
+ """
+ # get name of the frame
+ handle = feedback.menu_entry_id
+ title = self.menu_handler.getTitle(handle)
+
+ # add payloads selection as a submenu
+ self.manage_payload_menu(handle)
+ # update the frames_selection array to set the item as checked
+ self.update_item(title, True)
+
+ # set the checkbox as checked
+ self.menu_handler.setCheckState(handle, MenuHandler.CHECKED)
+
+ # update the menu state
+ self.menu_handler.reApply(self.server)
+ self.server.applyChanges()
+
+ # print the current state of the checked frames
+ print(f"{self.get_item_state()} \n")
+
+
+ def update_item(self, name, check: bool):
+ """
+ Update the state of an item(frame) in the array.
+
+ Args:
+ name (str): Name of the item(frame) to update.
+ check (bool): New state of the item.
+ """
+
+ for item in self.frames_selection:
+ if item['name'] == name:
+ item['checked'] = check
+ break
+
+
+ def update_payload(self, name, payload: float):
+ """
+ Update the payload mass of an item in the array.
+
+ Args:
+ name (str): Name of the item(frame) to update.
+ payload (float): New payload mass that act on the frame.
+ """
+
+ for item in self.frames_selection:
+ if item['name'] == name:
+ item['payload'] = payload
+ break
+
+
+ def manage_payload_menu(self, menu_entry_id):
+ """
+ Add payload selection items as a sub-menu for the specified menu entry.
+
+ Args:
+ menu_entry_id : ID of the menu entry.
+ """
+ # insert as a sub menu all the payload stored in the array
+ for payload in self.payload_selection:
+ # insert the payload mass selection items (command = str(menu_entry_id) is used to identify the parent menu entry)
+ last_entry = self.menu_handler.insert(f"{payload} kg", parent=menu_entry_id, command = str(menu_entry_id), callback=self.update_payload_selection)
+ self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED)
+ self.menu_handler.setVisible(last_entry, True)
+
+ # apply changes
+ self.menu_handler.reApply(self.server)
+ self.server.applyChanges()
+
+
+ def update_payload_selection(self, feedback):
+ """
+ Callback for payload selection to change the current payload.
+
+ Args:
+ feedback: Feedback from the menu selection.
+ """
+ # get the handle of the selected item (id)
+ handle = feedback.menu_entry_id
+ # get the title of the selected item (it contains number of kg)
+ title = self.menu_handler.getTitle(handle)
+ # get the entry object from the menu handler with all the informations about the item (command field is used to store the parent id)
+ payload_context = self.menu_handler.entry_contexts_[handle]
+
+ # get the parent id of the selected payload stored in the command field
+ parent_id = int(payload_context.command)
+ # get the entry object of the parent
+ parent_context = self.menu_handler.entry_contexts_[parent_id]
+ # get the name of the parent item (frame name)
+ name_parent = self.menu_handler.getTitle(parent_id)
+
+ # reset all the selections in the payload sub-menu
+ # check if a item is already checked, if so remove it and set to unchecked to prevent multiple payload selection in the same menu
+ for item in parent_context.sub_entries:
+ # check if the item is checked
+ if self.menu_handler.getCheckState(item) == MenuHandler.CHECKED:
+ # if the item is already checked, we need to uncheck it and set the payload to 0.0
+ self.menu_handler.setCheckState(item, MenuHandler.UNCHECKED)
+ self.update_payload(name_parent, 0.0)
+
+ # set the selected payload checkbox as checked
+ self.menu_handler.setCheckState(handle, MenuHandler.CHECKED)
+ # uppate the array with the new payload
+ self.update_payload(name_parent, float(title.split()[0])) # Extract the payload mass from the title
+
+ # apply changes
+ self.menu_handler.reApply(self.server)
+ self.server.applyChanges()
+
+
+ def get_item_state(self) -> np.ndarray:
+ """
+ Return array of checked frames in the menu list
+
+ Returns:
+ np.ndarray: array of checked frames
+ """
+ return self.frames_selection
+
+
+ def make_label(self, msg):
+ """
+ Create the label used to click over in rviz
+ """
+ marker = Marker()
+
+ marker.type = Marker.TEXT_VIEW_FACING
+ marker.text = "Click here to select frame"
+ marker.scale.x = msg.scale * 0.45
+ marker.scale.y = msg.scale * 0.45
+ marker.scale.z = msg.scale * 0.45
+ marker.color.r = 0.1
+ marker.color.g = 0.0
+ marker.color.b = 0.5
+ marker.color.a = 1.0
+
+ return marker
+
+ def make_label_control(self, msg):
+ control = InteractiveMarkerControl()
+ control.always_visible = True
+ control.markers.append(self.make_label(msg))
+ msg.controls.append(control)
+
+ return control
+
+
+ def make_empty_marker(self, dummyBox=True):
+ """
+ Create interactive marker
+ """
+ int_marker = InteractiveMarker()
+ int_marker.header.frame_id = 'base_link'
+ int_marker.pose.position.z = 2.0
+ int_marker.scale = 0.5
+
+ return int_marker
+
+
+ def make_menu_marker(self, name):
+ """
+ Create interactive menu
+ """
+ int_marker = self.make_empty_marker()
+ int_marker.name = name
+
+ control = InteractiveMarkerControl()
+
+ control.interaction_mode = InteractiveMarkerControl.BUTTON
+ control.always_visible = True
+
+ control.markers.append(self.make_label(int_marker))
+ int_marker.controls.append(control)
+
+ self.server.insert(int_marker)
+
+
+ def shutdown(self):
+ """
+ Shutdown the interactive marker server.
+ """
+ self.server.shutdown()
\ No newline at end of file
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py
new file mode 100644
index 0000000..75106ad
--- /dev/null
+++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2025 PAL Robotics S.L. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import rclpy
+from rclpy.node import Node
+from std_msgs.msg import String
+from geometry_msgs.msg import WrenchStamped
+from sensor_msgs.msg import JointState
+from dynamic_payload_analysis_core.core import TorqueCalculator
+import numpy as np
+from visualization_msgs.msg import Marker, MarkerArray
+
+
+
+class RobotDescriptionSubscriber(Node):
+ def __init__(self):
+ super().__init__('node_robot_description_subscriber')
+ self.subscription = self.create_subscription(
+ String,
+ '/robot_description',
+ self.robot_description_callback,
+ qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1)
+ )
+
+ # Publisher for external force
+ self.publisher_force = self.create_publisher(WrenchStamped, '/external_force', 10)
+
+ # Publisher for RViz visualization of torques
+ self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10)
+
+ # subscription to joint states
+ self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10)
+
+ self.robot = None
+
+ # frame where the external force is applied
+ self.frame_id = "arm_left_7_link"
+
+ self.get_logger().info('Robot description subscriber node started')
+
+
+ def robot_description_callback(self, msg):
+ self.get_logger().info('Received robot description')
+
+ self.robot = TorqueCalculator(robot_description = msg.data)
+ self.robot.print_active_joint()
+ self.robot.print_frames()
+
+ # self.robot.print_configuration()
+
+
+ def joint_states_callback(self, msg):
+ if self.robot is not None:
+ positions = list(msg.position)
+ name_position = list(msg.name)
+ v = msg.velocity if msg.velocity else self.robot.get_zero_velocity()
+
+ q = self.robot.set_position(positions, name_position)
+ a = self.robot.get_zero_acceleration()
+
+
+ # Compute inverse dynamics with external force
+ external_force = self.robot.create_ext_force(mass=4.0, frame_name=self.frame_id, q=q)
+ tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=external_force)
+
+ status_efforts = self.robot.check_effort_limits(tau)
+ self.robot.print_torques(tau)
+ frames = self.robot.get_active_frames()
+ joints_position = self.robot.get_joints_placements(q)
+
+ # Publish the torques in RViz
+ self.publish_rviz_torque(tau, status_torques=status_efforts ,joints_position=joints_position)
+
+ # publish the external force
+ self.publish_payload_force(self.frame_id, external_force)
+
+
+ def publish_rviz_torque(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray):
+ """
+ Publish the torque on the robot in RViz.
+
+ Args:
+ torque (np.ndarray): The torque to be published
+ status_torques (np.ndarray): The status of the torques, True if the torque is within the limits, False otherwise
+ joints_position (np.ndarray): The positions of the joints where the torques are applied
+ """
+ marker_array = MarkerArray()
+ for i, (t, joint) in enumerate(zip(torque, joints_position)):
+ marker = Marker()
+ marker.header.frame_id = "base_link"
+ marker.header.stamp = self.get_clock().now().to_msg()
+ marker.ns = "torque_visualization"
+ marker.id = i
+ marker.type = Marker.TEXT_VIEW_FACING
+ marker.text = f"{joint['name']}: {t:.2f} Nm"
+ marker.action = Marker.ADD
+ marker.pose.position.x = joint['x']
+ marker.pose.position.y = joint['y']
+ marker.pose.position.z = joint['z']
+ marker.pose.orientation.w = 1.0
+ marker.scale.x = 0.03
+ marker.scale.y = 0.03
+ marker.scale.z = 0.03
+ marker.color.a = 1.0 # Alpha
+ if status_torques[i]:
+ marker.color.r = 0.0 # Red
+ marker.color.g = 1.0 # Green
+ marker.color.b = 0.0 # Blue
+ else:
+ marker.color.r = 1.0 # Red
+ marker.color.g = 0.0 # Green
+ marker.color.b = 0.0 # Blue
+ marker_array.markers.append(marker)
+
+ self.publisher_rviz_torque.publish(marker_array)
+
+
+
+ def publish_payload_force(self, frame_id : str, external_force: np.ndarray):
+ """
+ Publish the gravity force on the frame with id `id_force`.
+
+ Args:
+ frame_id (str): The frame where the external force is applied.
+ external_force (np.ndarray): The external force to be published
+ """
+ id_force = self.robot.get_parent_joint_id(frame_id)
+
+ force_msg = WrenchStamped()
+ force_msg.header.stamp = self.get_clock().now().to_msg()
+ force_msg.header.frame_id = frame_id
+
+ force_msg.wrench.force.x = external_force[id_force].linear[0]
+ force_msg.wrench.force.y = external_force[id_force].linear[1]
+ force_msg.wrench.force.z = external_force[id_force].linear[2]
+ force_msg.wrench.torque.x = external_force[id_force].angular[0]
+ force_msg.wrench.torque.y = external_force[id_force].angular[1]
+ force_msg.wrench.torque.z = external_force[id_force].angular[2]
+
+ self.publisher_force.publish(force_msg)
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = RobotDescriptionSubscriber()
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py
new file mode 100644
index 0000000..fd2eb84
--- /dev/null
+++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py
@@ -0,0 +1,233 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2025 PAL Robotics S.L. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import rclpy
+from rclpy.node import Node
+from std_msgs.msg import String
+from geometry_msgs.msg import WrenchStamped
+from sensor_msgs.msg import JointState
+from dynamic_payload_analysis_core.core import TorqueCalculator
+import numpy as np
+from visualization_msgs.msg import Marker, MarkerArray
+from dynamic_payload_analysis_ros.menu_visual import MenuPayload
+
+
+
+
+
+class RobotDescriptionSubscriber(Node):
+ def __init__(self):
+ super().__init__('node_robot_description_subscriber')
+ self.subscription = self.create_subscription(
+ String,
+ '/robot_description',
+ self.robot_description_callback,
+ qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1)
+ )
+
+ # menu for selecting frames to apply payload
+ self.menu = MenuPayload(self)
+
+ # Publisher for external force
+ self.publisher_force = self.create_publisher(MarkerArray, '/external_forces', 10)
+
+ # Publisher for RViz visualization of torques
+ self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10)
+
+ # subscription to joint states
+ self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10)
+
+ self.robot = None
+
+ # frame where the external force is applied
+ self.frame_id = None
+
+ self.external_force = None
+
+ self.get_logger().info('Robot description subscriber node started')
+
+
+ def robot_description_callback(self, msg):
+ self.get_logger().info('Received robot description')
+
+ self.robot = TorqueCalculator(robot_description = msg.data)
+ self.robot.print_active_joint()
+ self.robot.print_frames()
+
+ # Add the frame to the menu for payload selection
+ for frame in self.robot.get_active_frames():
+ self.menu.insert_frame(frame)
+
+ # self.robot.print_configuration()
+
+
+ def joint_states_callback(self, msg):
+ if self.robot is not None:
+ positions = list(msg.position)
+ name_position = list(msg.name)
+ v = msg.velocity if msg.velocity else self.robot.get_zero_velocity()
+
+ q = self.robot.set_position(positions, name_position)
+ a = self.robot.get_zero_acceleration()
+
+
+ # create the array with only the checked frames (with external force applied)
+ self.checked_frames = np.array([check_frame["name"] for check_frame in self.menu.get_item_state() if check_frame['checked']])
+
+ # if there are no checked frames, set the external force to None
+ if len(self.checked_frames) != 0:
+ # create the array with the masses of the checked frames
+ self.masses = np.array([check_frame["payload"] for check_frame in self.menu.get_item_state() if check_frame['checked']])
+ # create the external force with the masses and the checked frames
+ self.external_force = self.robot.create_ext_force(masses=self.masses, frame_name=self.checked_frames, q=q)
+ else:
+ self.external_force = None
+
+ # compute the inverse dynamics
+ tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=self.external_force)
+
+ # check the effort limits
+ status_efforts = self.robot.check_effort_limits(tau)
+ # print the torques
+ self.robot.print_torques(tau)
+
+ # get the active frames
+ frames = self.robot.get_active_frames()
+
+ # get the positions of the joints where the torques are applied
+ joints_position = self.robot.get_joints_placements(q)
+
+ # Publish the torques in RViz
+ self.publish_label_torques(tau, status_torques=status_efforts ,joints_position=joints_position)
+
+ # publish the external force as arrows in RViz
+ self.publish_payload_force(self.menu.get_item_state())
+
+
+ def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray):
+ """
+ Publish the torque with labels on the robot in RViz.
+
+ Args:
+ torque (np.ndarray): The torque to be published
+ status_torques (np.ndarray): The status of the torques, True if the torque is within the limits, False otherwise
+ joints_position (np.ndarray): The positions of the joints where the torques are applied
+ """
+ marker_array = MarkerArray()
+ for i, (t, joint) in enumerate(zip(torque, joints_position)):
+ # remove the gripper joints from the visualization TODO: make it more general (with MIMIC joints)
+ if "gripper" not in joint['name']:
+ marker = Marker()
+ marker.header.frame_id = "base_link"
+ marker.header.stamp = self.get_clock().now().to_msg()
+ marker.ns = "torque_visualization"
+ marker.id = i
+ marker.type = Marker.TEXT_VIEW_FACING
+ # Set the text based on the joint type
+ if joint['type'] == 'JointModelPZ':
+ marker.text = f"{joint['name']}: {t:.2f} N"
+ else:
+ marker.text = f"{joint['name']}: {t:.2f} Nm"
+
+ marker.action = Marker.ADD
+ marker.pose.position.x = joint['x']
+ marker.pose.position.y = joint['y']
+ marker.pose.position.z = joint['z']
+ marker.pose.orientation.w = 1.0
+ marker.scale.x = 0.03
+ marker.scale.y = 0.03
+ marker.scale.z = 0.03
+ marker.color.a = 1.0 # Alpha
+ if status_torques[i]:
+ marker.color.r = 0.0 # Red
+ marker.color.g = 1.0 # Green
+ marker.color.b = 0.0 # Blue
+ else:
+ marker.color.r = 1.0 # Red
+ marker.color.g = 0.0 # Green
+ marker.color.b = 0.0 # Blue
+ marker_array.markers.append(marker)
+
+ self.publisher_rviz_torque.publish(marker_array)
+
+
+
+ def publish_payload_force(self, frames_names : np.ndarray[str]):
+ """
+ Publish the gravity force on the frame with id `id_force`.
+
+ Args:
+ frame_names : The frames where the external forces is applied.
+ """
+ external_force_array = MarkerArray()
+
+ for frame in frames_names:
+
+ id_force = self.robot.get_parent_joint_id(frame["name"])
+ joint_position = self.robot.get_joint_placement(id_force)
+ arrow_force = Marker()
+
+ arrow_force.header.frame_id = "base_link"
+ arrow_force.header.stamp = self.get_clock().now().to_msg()
+ arrow_force.ns = "external_force"
+ arrow_force.id = id_force
+ arrow_force.type = Marker.ARROW
+
+ # add the arrow if the frame is checked or delete it if not
+ if frame["checked"]:
+ arrow_force.action = Marker.ADD
+ else:
+ arrow_force.action = Marker.DELETE
+
+ arrow_force.scale.x = 0.20 # Length of the arrow
+ arrow_force.scale.y = 0.05 # Width of the arrow
+ arrow_force.scale.z = 0.05 # Height of the arrow
+ arrow_force.color.a = 1.0 # Alpha
+ arrow_force.color.r = 0.0
+ arrow_force.color.g = 0.0 # Green
+ arrow_force.color.b = 1.0
+
+ # Set the position of the arrow at the joint placement
+ arrow_force.pose.position.x = joint_position["x"]
+ arrow_force.pose.position.y = joint_position["y"]
+ arrow_force.pose.position.z = joint_position["z"]
+ # Set the direction of the arrow downwards
+ arrow_force.pose.orientation.x = 0.0
+ arrow_force.pose.orientation.y = 0.7071
+ arrow_force.pose.orientation.z = 0.0
+ arrow_force.pose.orientation.w = 0.7071
+
+ external_force_array.markers.append(arrow_force)
+
+ self.publisher_force.publish(external_force_array)
+
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = RobotDescriptionSubscriber()
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/dynamic_payload_analysis_ros/package.xml b/dynamic_payload_analysis_ros/package.xml
new file mode 100644
index 0000000..f39adf4
--- /dev/null
+++ b/dynamic_payload_analysis_ros/package.xml
@@ -0,0 +1,18 @@
+
+
+
+ dynamic_payload_analysis_ros
+ 0.0.0
+ This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.
+ morolinux
+ Apache License 2.0
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+
+
+ ament_python
+
+
diff --git a/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros b/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros
new file mode 100644
index 0000000..e69de29
diff --git a/dynamic_payload_analysis_ros/setup.cfg b/dynamic_payload_analysis_ros/setup.cfg
new file mode 100644
index 0000000..5a64333
--- /dev/null
+++ b/dynamic_payload_analysis_ros/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/dynamic_payload_analysis_ros
+[install]
+install_scripts=$base/lib/dynamic_payload_analysis_ros
diff --git a/dynamic_payload_analysis_ros/setup.py b/dynamic_payload_analysis_ros/setup.py
new file mode 100644
index 0000000..bb6057e
--- /dev/null
+++ b/dynamic_payload_analysis_ros/setup.py
@@ -0,0 +1,27 @@
+from setuptools import find_packages, setup
+
+package_name = 'dynamic_payload_analysis_ros'
+
+setup(
+ name=package_name,
+ version='0.0.0',
+ packages=find_packages(exclude=['test']),
+ data_files=[
+ ('share/ament_index/resource_index/packages',
+ ['resource/' + package_name]),
+ ('share/' + package_name, ['package.xml']),
+ ],
+ install_requires=['setuptools'],
+ zip_safe=True,
+ maintainer='morolinux',
+ maintainer_email='enrimoro003@gmail.com',
+ description='This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.',
+ license='TODO: License declaration',
+ tests_require=['pytest'],
+ entry_points={
+ 'console_scripts': [
+ 'node_rviz_visualization = dynamic_payload_analysis_ros.rviz_visualization:main',
+ 'node_rviz_visualization_menu = dynamic_payload_analysis_ros.rviz_visualization_menu:main',
+ ],
+ },
+)
diff --git a/dynamic_payload_analysis_ros/test/test_copyright.py b/dynamic_payload_analysis_ros/test/test_copyright.py
new file mode 100644
index 0000000..97a3919
--- /dev/null
+++ b/dynamic_payload_analysis_ros/test/test_copyright.py
@@ -0,0 +1,25 @@
+# Copyright 2015 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_copyright.main import main
+import pytest
+
+
+# Remove the `skip` decorator once the source file(s) have a copyright header
+@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
+@pytest.mark.copyright
+@pytest.mark.linter
+def test_copyright():
+ rc = main(argv=['.', 'test'])
+ assert rc == 0, 'Found errors'
diff --git a/dynamic_payload_analysis_ros/test/test_flake8.py b/dynamic_payload_analysis_ros/test/test_flake8.py
new file mode 100644
index 0000000..27ee107
--- /dev/null
+++ b/dynamic_payload_analysis_ros/test/test_flake8.py
@@ -0,0 +1,25 @@
+# Copyright 2017 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_flake8.main import main_with_errors
+import pytest
+
+
+@pytest.mark.flake8
+@pytest.mark.linter
+def test_flake8():
+ rc, errors = main_with_errors(argv=[])
+ assert rc == 0, \
+ 'Found %d code style errors / warnings:\n' % len(errors) + \
+ '\n'.join(errors)
diff --git a/dynamic_payload_analysis_ros/test/test_pep257.py b/dynamic_payload_analysis_ros/test/test_pep257.py
new file mode 100644
index 0000000..b234a38
--- /dev/null
+++ b/dynamic_payload_analysis_ros/test/test_pep257.py
@@ -0,0 +1,23 @@
+# Copyright 2015 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_pep257.main import main
+import pytest
+
+
+@pytest.mark.linter
+@pytest.mark.pep257
+def test_pep257():
+ rc = main(argv=['.', 'test'])
+ assert rc == 0, 'Found code style errors / warnings'