In [None]:
import subprocess
from geometry_msgs.msg import PoseStamped
from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult
import rclpy
from rclpy.duration import Duration

# Launch the Gazebo Simulation
Note that you have 5 seconds to press play when the simulation pops up and you then have to change the namespace to **/namespace**/cmd_vel.

In this case: */a200_0000/cmd_vel*

Ensure that you have selected the correct python interpreter, the one with all the docker dependencies installed. Type `which python3` in terminator to double check the path. Then on VSCode press _(Ctrl+Shift+P)_, type _Select Interpreter_ and type in the correct path.

In [None]:
# Launch the simulation from the clearpath_gz package
launch_sim = "ros2 launch clearpath_gz simulation.launch.py"

# Execute the command
process = subprocess.Popen(launch_sim, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Print the output in real time
for line in process.stdout:
    print(line.decode('utf-8'), end="")

# Wait for the process to complete
process.wait()

# Check for errors
if process.returncode != 0:
    error_output = process.stderr.read().decode('utf-8')
    print(f"Error: {error_output}")

[INFO] [launch]: All log files can be found below /home/marita/.ros/log/2024-12-02-14-34-28-056174-marita-ubuntu-15589
[INFO] [launch]: Default logging verbosity is set to INFO
[INFO] [ruby $(which ign) gazebo-1]: process started with pid [15590]
[INFO] [parameter_bridge-2]: process started with pid [15592]
[INFO] [generate_description-3]: process started with pid [15595]
[generate_description-3] Generated /home/marita/clearpath/robot.urdf.xacro
[parameter_bridge-2] [INFO] [1733150068.347250435] [clock_bridge]: Creating GZ->ROS Bridge: [/clock (ignition.msgs.Clock) -> /clock (rosgraph_msgs/msg/Clock)] (Lazy 0)
[INFO] [generate_description-3]: process has finished cleanly [pid 15595]
[INFO] [generate_semantic_description-4]: process started with pid [15613]
[generate_semantic_description-4] Generated /home/marita/clearpath/robot.srdf.xacro
[ruby $(which ign) gazebo-1] QStandardPaths: XDG_RUNTIME_DIR not set, defaulting to '/tmp/runtime-marita'
[ruby $(which ign) gazebo-1] [1;33m[GUI] [

# Initialise the NAV2 Simple Commander API

The Robot Navigator Class which defines the API is defined [here](https://github.com/ros-navigation/navigation2/blob/main/nav2_simple_commander/nav2_simple_commander/robot_navigator.py) and is a great example of OOP

In [None]:
rclpy.init()
navigator = BasicNavigator()

# Simple Husky Operations
By using the basic operations: go forward, go back, and spin, the Husky can perform any maneuver.

In [None]:
# Forward

navigator.driveOnHeading(dist=0.15, speed=0.025, time_allowance=10)
i = 0
while not navigator.isTaskComplete():
    i += 1
    feedback = navigator.getFeedback()
    if feedback and i % 5 == 0:
        print(f'Distance traveled: {feedback.angular_distance_traveled}')

In [None]:
# Back Up 

navigator.backup(backup_dist=0.5, backup_speed=0.1)

i = 0
while not navigator.isTaskComplete():
    i += 1
    feedback = navigator.getFeedback()
    if feedback and i % 5 == 0:
        print(f'Distance traveled: {feedback.distance_traveled}')

In [None]:
# Spin around Z-axis

navigator.spin(spin_dist=3.14) # spin dist in radians (3.14 = pi = 180 degrees)

i = 0
while not navigator.isTaskComplete():
    i += 1
    feedback = navigator.getFeedback()
    if feedback and i % 5 == 0:
        print(f'Spin angle traveled: {feedback.angular_distance_traveled}')

# Localisation
Set initial pose

In [None]:
start_point_x = 3.45
start_point_y = 2.15
start_orientation_z = 1.0

In [None]:
initial_pose = PoseStamped()
initial_pose.header.frame_id = 'map'
initial_pose.header.stamp = navigator.get_clock().now().to_msg()
initial_pose.pose.position.x = start_point_x
initial_pose.pose.position.y = start_point_y
initial_pose.pose.orientation.z = start_orientation_z
initial_pose.pose.orientation.w = 0.0

navigator.setInitialPose(initial_pose)

navigator.waitUntilNav2Active()

# Managing the Map
After initialising your pose, you can also also change your map and/or clear or obtain the map's costmaps

In [None]:
# change map
# navigator.changeMap('/path/to/map.yaml')

# Clear cost maps
# navigator.clearAllCostmaps() # OR
# clearLocalCostmap() # OR
# clearGlobalCostmap()

# Obtain costmaps
# global_costmap = navigator.getGlobalCostmap()
# local_costmap = navigator.getLocalCostmap()


# Navigate to Goal Pose

In [None]:
# First define the goal pose
goal_pose = PoseStamped()
goal_pose.header.frame_id = 'map'
goal_pose.header.stamp = navigator.get_clock().now().to_msg()
goal_pose.pose.position.x = 17.86
goal_pose.pose.position.y = -0.77
goal_pose.pose.orientation.w = 1.0
goal_pose.pose.orientation.z = 0.0

# Ensure that goal pose is accessible
path = navigator.getPath(initial_pose, goal_pose)

In [None]:
# Navigate to goal pose from initial pose
navigator.goToPose(goal_pose)

i = 0
while not navigator.isTaskComplete():
    i = i + 1
    feedback = navigator.getFeedback()
    if feedback and i % 5 == 0:
        print(
            'Estimated time of arrival: '
            + '{0:.0f}'.format(
                Duration.from_msg(feedback.estimated_time_remaining).nanoseconds
                / 1e9
            )
            + ' seconds.'
        )

        # Time out for cancelation
        if Duration.from_msg(feedback.navigation_time) > Duration(seconds=600.0):
            navigator.cancelTask()

        # Reset if too long
        if Duration.from_msg(feedback.navigation_time) > Duration(seconds=18.0):
            goal_pose.pose.position.x = 0.0
            goal_pose.pose.position.y = 0.0
            navigator.goToPose(goal_pose)

# Get status
result = navigator.getResult()
if result == TaskResult.SUCCEEDED:
    print('Goal succeeded!')
elif result == TaskResult.CANCELED:
    print('Goal was canceled!')
elif result == TaskResult.FAILED:
    print('Goal failed!')
else:
    print('Goal has an invalid return status!')

navigator.lifecycleShutdown()

exit(0)

# Go Through Multiple Poses

In [None]:
# Initialise a list to append all poses you want to visit
goal_poses = []

In [None]:
# Define all poses you want to visit
goal_pose1 = PoseStamped()
goal_pose1.header.frame_id = 'map'
goal_pose1.header.stamp = navigator.get_clock().now().to_msg()
goal_pose1.pose.position.x = 10.15
goal_pose1.pose.position.y = -0.77
goal_pose1.pose.orientation.w = 1.0
goal_pose1.pose.orientation.z = 0.0


goal_pose2 = PoseStamped()
goal_pose2.header.frame_id = 'map'
goal_pose2.header.stamp = navigator.get_clock().now().to_msg()
goal_pose2.pose.position.x = 17.86
goal_pose2.pose.position.y = -0.77
goal_pose2.pose.orientation.w = 1.0
goal_pose2.pose.orientation.z = 0.0

# ...
# ...

In [None]:
# Append all goal poses to your list

goal_poses.append(goal_pose1)
goal_poses.append(goal_pose2)
# ...

# Check if the path is feasible
path = navigator.getPathThroughPoses(initial_pose, goal_poses)

In [None]:
# Execute the path and visit all goal poses
navigator.goThroughPoses(goal_poses)

i = 0
while not navigator.isTaskComplete():
    i = i + 1
    feedback = navigator.getFeedback()
    if feedback and i % 5 == 0:
        print(
            'Estimated time of arrival: '
            + '{0:.0f}'.format(
                Duration.from_msg(feedback.estimated_time_remaining).nanoseconds
                / 1e9
            )
            + ' seconds.'
        )

        # Some navigation timeout to demo cancellation
        if Duration.from_msg(feedback.navigation_time) > Duration(seconds=600.0):
            navigator.cancelTask()

        # Some navigation request change to demo preemption
        if Duration.from_msg(feedback.navigation_time) > Duration(seconds=35.0):
            goal_pose4 = PoseStamped()
            goal_pose4.header.frame_id = 'map'
            goal_pose4.header.stamp = navigator.get_clock().now().to_msg()
            goal_pose4.pose.position.x = 0.0
            goal_pose4.pose.position.y = 0.0
            goal_pose4.pose.orientation.w = 1.0
            goal_pose4.pose.orientation.z = 0.0
            navigator.goThroughPoses([goal_pose4])

# Do something depending on the return code
result = navigator.getResult()
if result == TaskResult.SUCCEEDED:
    print('Goal succeeded!')
elif result == TaskResult.CANCELED:
    print('Goal was canceled!')
elif result == TaskResult.FAILED:
    print('Goal failed!')
else:
    print('Goal has an invalid return status!')

navigator.lifecycleShutdown()

exit(0)