Skip to content

Commit

Permalink
Include test cases waypoint follwer (ros-navigation#3288)
Browse files Browse the repository at this point in the history
* WIP

* included missed_waypoing check

* finished inclding test

* fix format

* return default sleep value
  • Loading branch information
stevedanomodolor authored and Joshua Wallace committed Dec 14, 2022
1 parent a13d642 commit 95c951d
Showing 1 changed file with 47 additions and 8 deletions.
55 changes: 47 additions & 8 deletions nav2_system_tests/src/waypoint_follower/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped
from nav2_msgs.action import FollowWaypoints
from nav2_msgs.srv import ManageLifecycleNodes
from rcl_interfaces.srv import SetParameters

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
from rclpy.qos import QoSProfile

Expand All @@ -38,6 +40,7 @@ def __init__(self):
'initialpose', 10)
self.initial_pose_received = False
self.goal_handle = None
self.action_result = None

pose_qos = QoSProfile(
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
Expand All @@ -47,6 +50,8 @@ def __init__(self):

self.model_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
'amcl_pose', self.poseCallback, pose_qos)
self.param_cli = self.create_client(SetParameters,
'/waypoint_follower/set_parameters')

def setInitialPose(self, pose):
self.init_pose = PoseWithCovarianceStamped()
Expand All @@ -70,10 +75,10 @@ def setWaypoints(self, waypoints):
msg.pose.orientation.w = 1.0
self.waypoints.append(msg)

def run(self, block):
if not self.waypoints:
rclpy.error_msg('Did not set valid waypoints before running test!')
return False
def run(self, block, cancel):
# if not self.waypoints:
# rclpy.error_msg('Did not set valid waypoints before running test!')
# return False

while not self.action_client.wait_for_server(timeout_sec=1.0):
self.info_msg("'follow_waypoints' action server not available, waiting...")
Expand All @@ -98,12 +103,16 @@ def run(self, block):
return True

get_result_future = self.goal_handle.get_result_async()
if cancel:
time.sleep(2)
self.cancel_goal()

self.info_msg("Waiting for 'follow_waypoints' action to complete")
try:
rclpy.spin_until_future_complete(self, get_result_future)
status = get_result_future.result().status
result = get_result_future.result().result
self.action_result = result
except Exception as e: # noqa: B902
self.error_msg(f'Service call failed {e!r}')

Expand All @@ -121,6 +130,13 @@ def run(self, block):
def publishInitialPose(self):
self.initial_pose_pub.publish(self.init_pose)

def setStopFailureParam(self, value):
req = SetParameters.Request()
req.parameters = [Parameter('stop_on_failure',
Parameter.Type.BOOL, value).to_parameter_msg()]
future = self.param_cli.call_async(req)
rclpy.spin_until_future_complete(self, future)

def shutdown(self):
self.info_msg('Shutting down')

Expand Down Expand Up @@ -194,15 +210,15 @@ def main(argv=sys.argv[1:]):
test.info_msg('Waiting for amcl_pose to be received')
rclpy.spin_once(test, timeout_sec=1.0) # wait for poseCallback

result = test.run(True)
result = test.run(True, False)
assert result

# preempt with new point
test.setWaypoints([starting_pose])
result = test.run(False)
result = test.run(False, False)
time.sleep(2)
test.setWaypoints([wps[1]])
result = test.run(False)
result = test.run(False, False)

# cancel
time.sleep(2)
Expand All @@ -211,7 +227,30 @@ def main(argv=sys.argv[1:]):
# a failure case
time.sleep(2)
test.setWaypoints([[100.0, 100.0]])
result = test.run(True)
result = test.run(True, False)
assert not result
result = not result

# stop on failure test with bogous waypoint
test.setStopFailureParam(True)
bwps = [[-0.52, -0.54], [100.0, 100.0], [0.58, 0.52]]
starting_pose = [-2.0, -0.5]
test.setWaypoints(bwps)
test.setInitialPose(starting_pose)
result = test.run(True, False)
assert not result
result = not result
mwps = test.action_result.missed_waypoints
result = (len(mwps) == 1) & (mwps[0] == 1)
test.setStopFailureParam(False)

# Zero goal test
test.setWaypoints([])
result = test.run(True, False)

# Cancel test
test.setWaypoints(wps)
result = test.run(True, True)
assert not result
result = not result

Expand Down

0 comments on commit 95c951d

Please sign in to comment.