-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #856 from tue-robotics/feature/guiding-state
Feature/guiding state
- Loading branch information
Showing
4 changed files
with
283 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
213 changes: 213 additions & 0 deletions
213
robot_smach_states/src/robot_smach_states/navigation/guidance.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
""" | ||
Module contains states to guide an operator to a designated location. | ||
""" | ||
|
||
# ROS | ||
import rospy | ||
import smach | ||
import PyKDL as kdl | ||
|
||
# Robot skills | ||
from robot_skills.util.kdl_conversions import VectorStamped | ||
from robot_smach_states.util.designators import EdEntityDesignator | ||
|
||
# robot_smach_states.navigation | ||
import navigation | ||
from robot_smach_states.navigation.navigate_to_symbolic import NavigateToSymbolic | ||
|
||
|
||
def _detect_operator_behind_robot(robot, distance=1.0, radius=0.5): | ||
# type: (Robot, float, float) -> bool | ||
""" | ||
Checks if a person is within <radius> of the position <distance> behind the <robot> | ||
:param robot: (Robot) api object | ||
:param distance: (float) follow distance | ||
:param radius: (float) radius around the position at <distance> behind the robot | ||
:return: (bool) whether an operator was detected | ||
""" | ||
image_data = robot.perception.get_rgb_depth_caminfo() | ||
success, found_people_ids = robot.ed.detect_people(*image_data) | ||
found_people = [robot.ed.get_entity(id_) for id_ in found_people_ids] | ||
|
||
# Assume the operator is around 1.0 m behind the robot | ||
base_pose = robot.base.get_location() | ||
expected_person_pos = base_pose.frame * kdl.Vector(-distance, 0.0, 0.0) | ||
|
||
for person in found_people: | ||
if (person.pose.frame.p - expected_person_pos).Norm() < radius: | ||
return True | ||
return False | ||
|
||
|
||
class ExecutePlanGuidance(smach.State): | ||
""" | ||
Similar to the "executePlan" smach state. The only difference is that after driving for x meters, "check for | ||
operator" is returned. | ||
""" | ||
def __init__(self, robot): | ||
# type: (Robot) -> None | ||
smach.State.__init__(self, outcomes=["arrived", "blocked", "preempted", "lost_operator"]) | ||
self.robot = robot | ||
self._distance_threshold = 1.0 # Only check if the operator is there once we've drived for this distance | ||
# self._follow_distance = 1.0 # Operator is expected to follow the robot approximately this distance | ||
# self._operator_radius_threshold = 0.5 # Operator is expected to be within this radius around the position | ||
# defined by the follow distance | ||
|
||
def execute(self, userdata=None): | ||
|
||
# Look backwards to have the operator in view | ||
self.robot.head.look_at_point(VectorStamped(-1.0, 0.0, 1.75, self.robot.base_link_frame)) | ||
|
||
rate = rospy.Rate(10.0) # Loop at 10 Hz | ||
distance = 0.0 | ||
old_position = self._get_base_position() | ||
operator_stamp = rospy.Time.now() # Assume the operator is near if we start here | ||
while not rospy.is_shutdown(): | ||
|
||
if self.preempt_requested(): | ||
self.robot.base.local_planner.cancelCurrentPlan() | ||
rospy.loginfo("execute: preempt_requested") | ||
return "preempted" | ||
|
||
status = self.robot.base.local_planner.getStatus() | ||
|
||
if status == "arrived": | ||
return "arrived" | ||
elif status == "blocked": | ||
return "blocked" | ||
|
||
new_position = self._get_base_position() | ||
distance += (new_position - old_position).Norm() | ||
old_position = new_position | ||
if distance > self._distance_threshold: | ||
rospy.logdebug( | ||
"Distance {} exceeds threshold {}, check for operator".format(distance, self._distance_threshold)) | ||
if not self._check_operator(): | ||
rospy.loginfo("Lost operator while guiding, cancelling plan") | ||
self.robot.base.local_planner.cancelCurrentPlan() | ||
return "lost_operator" | ||
|
||
rate.sleep() | ||
|
||
def _check_operator(self): | ||
""" | ||
Checks if the operator is still sufficiently close | ||
:return: (bool) | ||
""" | ||
# ToDo: make robust (use time stamp?) | ||
return _detect_operator_behind_robot(self.robot) # , self._follow_distance, self._operator_radius_threshold) | ||
|
||
def _get_base_position(self): | ||
# type: () -> kdl.Vector | ||
""" | ||
Gets the base position as a kdl Vector | ||
:return: (kdl Vector) with current base position | ||
""" | ||
frame_stamped = self.robot.base.get_location() | ||
return frame_stamped.frame.p | ||
|
||
|
||
class WaitForOperator(smach.State): | ||
def __init__(self, robot, timeout=10.0): | ||
# type: (Robot, float) -> None | ||
""" | ||
Smach state to check if the operator is still following the robot. | ||
:param robot: (Robot) robot api object | ||
:param timeout: (float) if the operator has not been detected for this period, "is_lost" will be returned | ||
""" | ||
smach.State.__init__(self, outcomes=["is_following", "is_lost", "preempted"]) | ||
self._robot = robot | ||
self._timeout = timeout | ||
|
||
def execute(self, ud): | ||
|
||
self._robot.speech.speak("It seems that we have lost each other. Please stand one meter behind me.") | ||
|
||
rate = rospy.Rate(2.0) | ||
t_start = rospy.Time.now() | ||
while not rospy.is_shutdown(): | ||
|
||
# Check if the operator is there | ||
if _detect_operator_behind_robot(self._robot): | ||
self._robot.speech.speak("There you are", block=False) | ||
return "is_following" | ||
|
||
# Check timeout | ||
if (rospy.Time.now() - t_start).to_sec() > self._timeout: | ||
rospy.loginfo("Guide - Wait for operator: timeout {} exceeded".format(self._timeout)) | ||
return "is_lost" | ||
|
||
rate.sleep() | ||
|
||
return "preempted" | ||
|
||
|
||
class Guide(smach.StateMachine): | ||
def __init__(self, robot): | ||
# type: (robot) -> None | ||
""" | ||
Base Smach state to guide an operator to a designated position | ||
:param robot: (Robot) robot api object | ||
""" | ||
smach.StateMachine.__init__( | ||
self, outcomes=["arrived", "unreachable", "goal_not_defined", "lost_operator", "preempted"]) | ||
self.robot = robot | ||
|
||
with self: | ||
smach.StateMachine.add("GET_PLAN", navigation.getPlan(self.robot, self.generate_constraint), | ||
transitions={"unreachable": "unreachable", | ||
"goal_not_defined": "goal_not_defined", | ||
"goal_ok": "EXECUTE_PLAN"}) | ||
|
||
smach.StateMachine.add("EXECUTE_PLAN", ExecutePlanGuidance(self.robot), | ||
transitions={"arrived": "arrived", | ||
"blocked": "PLAN_BLOCKED", | ||
"preempted": "preempted", | ||
"lost_operator": "WAIT_FOR_OPERATOR"}) | ||
|
||
smach.StateMachine.add("WAIT_FOR_OPERATOR", WaitForOperator(self.robot), | ||
transitions={"is_following": "GET_PLAN", | ||
"is_lost": "lost_operator"}) | ||
|
||
smach.StateMachine.add("PLAN_BLOCKED", navigation.planBlocked(self.robot), | ||
transitions={"blocked": "GET_PLAN", | ||
"free": "EXECUTE_PLAN"}) | ||
|
||
@staticmethod | ||
def generate_constraint(): | ||
raise NotImplementedError("Inheriting Guide states must implement a generate constraint method, preferably" | ||
"by re-using it from a navigation state.") | ||
|
||
|
||
class GuideToSymbolic(Guide): | ||
""" Guidance class to navigate to a semantically annotated goal, e.g., in front of the dinner table. | ||
""" | ||
def __init__(self, robot, entity_designator_area_name_map, entity_lookat_designator): | ||
# type: (Robot, dict, EdEntityDesignator) -> None | ||
""" Constructor | ||
:param robot: robot object | ||
:param entity_designator_area_name_map: dictionary mapping EdEntityDesignators to a string or designator | ||
resolving to a string, representing the area, e.g., entity_designator_area_name_map[<EdEntity>] = 'in_front_of'. | ||
:param entity_lookat_designator: EdEntityDesignator defining the entity the robot should look at. This is used | ||
to compute the orientation constraint. | ||
""" | ||
super(GuideToSymbolic, self).__init__(robot) | ||
self._entity_designator_area_name_map = entity_designator_area_name_map | ||
self._entity_lookat_designator = entity_lookat_designator | ||
|
||
def generate_constraint(self): | ||
# type: () -> tuple | ||
""" | ||
Generates the constraint using the generate constraint method of NavigateToSymbolic | ||
:return: (tuple(PositionConstraint, OrientationConstraint)). If one of the entities does not resolve, | ||
None is returned. | ||
""" | ||
return NavigateToSymbolic.generate_constraint( | ||
self.robot, self._entity_designator_area_name_map, self._entity_lookat_designator) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import os | ||
import rospy | ||
import sys | ||
import std_srvs.srv | ||
import robot_smach_states.util.designators as ds | ||
|
||
from robot_skills.get_robot import get_robot_from_argv | ||
from robot_smach_states.navigation import guidance | ||
|
||
OPERATOR_AVAILABLE = True | ||
|
||
|
||
def toggle_operator(_): | ||
""" | ||
Toggles if the operator is following | ||
:param _: | ||
:return: | ||
""" | ||
global OPERATOR_AVAILABLE | ||
OPERATOR_AVAILABLE = not OPERATOR_AVAILABLE | ||
rospy.loginfo("Operator available: {}".format(OPERATOR_AVAILABLE)) | ||
return std_srvs.srv.EmptyResponse() | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
assert len(sys.argv) == 3, "Please provide the robot name and the entity id of the object to guide to," \ | ||
"e.g., 'python guidance.py amigo bed'" | ||
|
||
# Create node, robot and toggle interface | ||
rospy.init_node("test_guidance") | ||
r = get_robot_from_argv(1) | ||
e_id = sys.argv[2] | ||
|
||
# Instantiate GuideToSymbolic machine | ||
s = guidance.GuideToSymbolic(r, | ||
{ds.EntityByIdDesignator(r, id=e_id): "in_front_of"}, | ||
ds.EntityByIdDesignator(r, id=e_id) | ||
) | ||
|
||
# In simulation, mock the _check_operator method | ||
if os.getenv("ROBOT_REAL", "false").lower() != "true": | ||
# noinspection PyUnusedLocal | ||
def _mock_detect_operator(robot, distance=1.0, radius=0.5): | ||
return OPERATOR_AVAILABLE | ||
guidance._detect_operator_behind_robot = _mock_detect_operator | ||
rospy.Service("toggle_operator", std_srvs.srv.Empty, toggle_operator) | ||
rospy.loginfo('\n\nCall:\nrosservice call /toggle_operator "{}"\n\nto toggle operator availability\n'.format('{}')) | ||
|
||
# Execute the state | ||
s.execute() |