-
Notifications
You must be signed in to change notification settings - Fork 153
/
direct.py
99 lines (75 loc) · 3.42 KB
/
direct.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import rclpy
import rclpy.action
from rclpy.parameter import Parameter
from ros2cli.node import NODE_NAME_PREFIX
DEFAULT_TIMEOUT = 0.5
class DirectNode:
def __init__(self, args, *, node_name=None):
timeout_reached = False
def timer_callback():
nonlocal timeout_reached
timeout_reached = True
argv = getattr(args, 'argv', [])
rclpy.init(args=argv)
node_name_suffix = getattr(
args, 'node_name_suffix', '_%d' % os.getpid())
start_parameter_services = getattr(
args, 'start_parameter_services', False)
use_sim_time = getattr(args, 'use_sim_time', False)
start_type_description_service = getattr(args, 'start_type_description_service', True)
if node_name is None:
node_name = NODE_NAME_PREFIX + node_name_suffix
self.node = rclpy.create_node(
node_name,
start_parameter_services=start_parameter_services,
parameter_overrides=[
Parameter('use_sim_time', value=use_sim_time),
Parameter('start_type_description_service', value=start_type_description_service),
], automatically_declare_parameters_from_overrides=True)
timeout = getattr(args, 'spin_time', DEFAULT_TIMEOUT)
timer = self.node.create_timer(timeout, timer_callback)
while not timeout_reached:
rclpy.spin_once(self.node)
self.node.destroy_timer(timer)
def __enter__(self):
return self
# TODO(hidmic): generalize/standardize rclpy graph API
# to not have to make a special case for
# rclpy.action
def get_action_names_and_types(self):
return rclpy.action.get_action_names_and_types(self.node)
def get_action_client_names_and_types_by_node(self, remote_node_name, remote_node_namespace):
return rclpy.action.get_action_client_names_and_types_by_node(
self.node, remote_node_name, remote_node_namespace)
def get_action_server_names_and_types_by_node(self, remote_node_name, remote_node_namespace):
return rclpy.action.get_action_server_names_and_types_by_node(
self.node, remote_node_name, remote_node_namespace)
def __getattr__(self, name):
if not rclpy.ok():
raise RuntimeError('!rclpy.ok()')
return getattr(self.node, name)
def __exit__(self, exc_type, exc_value, traceback):
self.node.destroy_node()
rclpy.try_shutdown()
def add_arguments(parser):
parser.add_argument(
'--spin-time', type=float, default=DEFAULT_TIMEOUT,
help='Spin time in seconds to wait for discovery (only applies when '
'not using an already running daemon)')
parser.add_argument(
'-s', '--use-sim-time', action='store_true',
help='Enable ROS simulation time')