Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ def generate_test_description():
arguments=[str(path_to_test / 'executables' / 'talker.py')],
additional_env={'PYTHONUNBUFFERED': '1'},
name='demo_node_1',
output='screen',
),
])


@pytest.mark.launch
@pytest.mark.launch(fixture=generate_test_description)
def test_check_if_msgs_published():
rclpy.init()
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,26 @@
from std_msgs.msg import String


class Listener(Node):
class Talker(Node):

def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String, 'chatter', self.callback, 10
)
super().__init__('talker')
self.count = 0
self.publisher = self.create_publisher(String, 'chatter', 10)
self.timer = self.create_timer(1.0, self.callback)

def callback(self, msg):
self.get_logger().info('I heard: [%s]' % msg.data)
def callback(self):
data = 'Hello World: {0}'.format(self.count)
self.get_logger().info('Publishing: "{0}"'.format(data))
self.publisher.publish(String(data=data))
self.count += 1


def main(args=None):
rclpy.init(args=args)

node = Listener()
node = Talker()

try:
rclpy.spin(node)
except KeyboardInterrupt:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def test_read_stdout(hello_world_proc, launch_context):
def validate_output(output):
# this function can use assertions to validate the output or return a boolean.
# pytest generates easier to understand failures when assertions are used.
assert output == 'hello_world\n', 'process never printed hello_world'
assert output.splitlines() == ['hello_world'], 'process never printed hello_world'
return True
assert process_tools.wait_for_output_sync(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious question, why was this assert removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function always returns None in this case, though I think that's confusing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See a9cc81c

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function always returns None in this case, though I think that's confusing.

It was not only confusing, it was also blocking unnecessarely untile the timeout expired.

Copy link
Contributor

@hidmic hidmic Nov 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivanpauno meta: we clearly need a less terse idiom to write predicates. If we were to assume the predicate passed if it returns None, that'd be practical yet surprising. If we were to simply return the result of evaluating the expression, we'd lose detailed assertion message (most of which are provided pytest rewriting logic). So how about splitting use cases? E.g.:

# Wait for incoming process output with a timeout
output = wait_for_output_sync(process, timeout=10)
output = await wait_for_output(process, timeout=10)

# Wait for incoming process output matching a predicate with a timeout
def predicate(output):
    return 'foo' in output
output = wait_for_output_sync(process, predicate, timeout=10)
output = await wait_for_output(process, predicate, timeout=10)

# Assert a statement on process output with a timeout 
def statement(output):
    assert output == 'foo'
assert_output_sync(process, statement, timeout=10)
await assert_output(process, statement, timeout=10)

In this case, it would read:

def hello_world_found():
     assert output.splitlines() == ['hello_world'], 'process never printed hello_world'
assert_output_sync(launch_context, hello_world_proc, hello_world_found, timeout=5)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like the right approach to me.
I will address it in a follow-up PR so we solve the CI issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, merge!

launch_context, hello_world_proc, validate_output, timeout=5)

Expand Down
17 changes: 17 additions & 0 deletions launch_pytest/test/launch_pytest/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import shutil


def test_launch_fixture_is_not_a_launch_description(testdir):
testdir.makepyfile("""\
Expand Down Expand Up @@ -313,3 +316,17 @@ async def test_case():
result = testdir.runpytest()

result.assert_outcomes(failed=1, skipped=1)


def test_examples(testdir):
examples_dir = Path(__file__).parent / 'examples'
for example in examples_dir.iterdir():
# Ignoring `check_node_msgs.py` because we cannot depend on launch_ros here
# as it creates a circular dependency between repositories.
# We need to move that example elsewhere.
if example.is_file() and example.name != 'check_node_msgs.py':
copied_example = Path(testdir.copy_example(example))
copied_example.rename(copied_example.parent / f'test_{copied_example.name}')
shutil.copytree(examples_dir / 'executables', Path(str(testdir.tmpdir)) / 'executables')
result = testdir.runpytest()
result.assert_outcomes(passed=22)
32 changes: 21 additions & 11 deletions launch_pytest/test/launch_pytest/tools/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,35 @@ def launch_description(dut):

@pytest.mark.launch(fixture=launch_description)
async def test_async_process_tools(dut, launch_context):
await tools.wait_for_start(launch_context, dut, timeout=10)
def check_output(output): assert output.splitlines() == ['hello']
await tools.wait_for_output(
assert await tools.wait_for_start(launch_context, dut, timeout=10)

def check_output(output):
assert output.splitlines() == ['hello']
return True
assert await tools.wait_for_output(
launch_context, dut, check_output, timeout=10)

def check_stderr(err): assert err.splitlines() == ['world']
await tools.wait_for_stderr(
def check_stderr(err):
assert err.splitlines() == ['world']
return True
assert await tools.wait_for_stderr(
launch_context, dut, check_stderr, timeout=10)
await tools.wait_for_exit(launch_context, dut, timeout=10)
assert await tools.wait_for_exit(launch_context, dut, timeout=10)


@pytest.mark.launch(fixture=launch_description)
def test_sync_process_tools(dut, launch_context):
tools.wait_for_start_sync(launch_context, dut, timeout=10)
def check_output(output): assert output.splitlines() == ['hello']
tools.wait_for_output_sync(

def check_output(output):
assert output.splitlines() == ['hello']
return True
assert tools.wait_for_output_sync(
launch_context, dut, check_output, timeout=10)

def check_stderr(err): assert err.splitlines() == ['world']
tools.wait_for_stderr_sync(
def check_stderr(err):
assert err.splitlines() == ['world']
return True
assert tools.wait_for_stderr_sync(
launch_context, dut, check_stderr, timeout=10)
tools.wait_for_exit_sync(launch_context, dut, timeout=10)
assert tools.wait_for_exit_sync(launch_context, dut, timeout=10)