-
Notifications
You must be signed in to change notification settings - Fork 326
/
test_executables_tutorial.py.in
69 lines (57 loc) · 2.6 KB
/
test_executables_tutorial.py.in
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# generated from demo_nodes_cpp/test/test_executables_tutorial.py.in
# generated code does not contain a copyright notice
import os
import unittest
from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction
import launch_testing
import launch_testing.asserts
import launch_testing.util
import launch_testing_ros
def generate_test_description(ready_fn):
    """
    Build the launch description that starts every executable under test.

    One ``ExecuteProcess`` action is created for each entry of the
    ``;``-separated ``@DEMO_NODES_CPP_EXECUTABLE@`` list, followed by a
    keep-alive process and, last, an action that calls ``ready_fn``.

    :param ready_fn: callable invoked without arguments from the final
        ``OpaqueFunction`` action of the description.
    :return: a tuple of the launch description and ``locals()``.
    """
    # NOTE: locals() is returned below and the test methods in this file take
    # `processes_under_test` as a parameter, so do not rename, add or remove
    # local variables in this function without checking those tests.
    os.environ.update({
        'OSPL_VERBOSITY': '8',  # 8 = OS_NONE
        # bare minimum formatting for console output matching
        'RCUTILS_CONSOLE_OUTPUT_FORMAT': '{message}',
    })

    launch_description = LaunchDescription()

    # Kept as a comprehension so that its loop variables do not end up in
    # the locals() mapping returned at the end.
    processes_under_test = [
        ExecuteProcess(
            cmd=[executable],
            name=f'test_executable_{i}',
            output='screen',
        )
        for i, executable in enumerate('@DEMO_NODES_CPP_EXECUTABLE@'.split(';'))
    ]
    for process in processes_under_test:
        launch_description.add_action(process)

    launch_description.add_action(launch_testing.util.KeepAliveProc())
    # The launch context handed to the callback is not needed; just signal readiness.
    launch_description.add_action(
        OpaqueFunction(function=lambda _context: ready_fn())
    )

    return launch_description, locals()
class TestExecutablesTutorial(unittest.TestCase):
    """Output checks for the processes listed in ``processes_under_test``."""

    def test_processes_output(self, proc_output, processes_under_test):
        """Wait for each process to produce the content of its expected-output file."""
        from launch_testing.tools.output import get_default_filtered_prefixes

        # Default filtered prefixes plus the service-wait message.
        ignored_prefixes = get_default_filtered_prefixes() + [
            'service not available, waiting again...'
        ]
        output_filter = launch_testing_ros.tools.basic_output_filter(
            filtered_prefixes=ignored_prefixes,
            filtered_rmw_implementation='@rmw_implementation@',
        )

        expected_output_paths = '@DEMO_NODES_CPP_EXPECTED_OUTPUT@'.split(';')
        # NOTE: zip() stops at the shorter sequence, so a process without a
        # matching expected-output path is silently left unchecked.
        for process, expected_output_path in zip(
            processes_under_test, expected_output_paths
        ):
            expected_output = launch_testing.tools.expected_output_from_file(
                path=expected_output_path
            )
            proc_output.assertWaitFor(
                expected_output=expected_output,
                process=process,
                output_filter=output_filter,
                timeout=30,
            )

        # TODO(hidmic): either make the underlying executables resilient to
        # interruptions close to/during shutdown OR adapt the testing suite
        # to better cope with it.
        import time
        time.sleep(5)
@launch_testing.post_shutdown_test()
class TestExecutablesTutorialAfterShutdown(unittest.TestCase):
    """Exit-code check, registered through ``launch_testing.post_shutdown_test``."""

    def test_last_process_exit_code(self, proc_info, processes_under_test):
        """Assert the exit code of the last entry in ``processes_under_test``."""
        # Only the final process of the list is checked here.
        last_process = processes_under_test[-1]
        launch_testing.asserts.assertExitCodes(proc_info, process=last_process)