diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 474be273d7..b6f195d055 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -48,6 +48,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '--qos-profile-overrides-path', type=FileType('r'), help='Path to a yaml file defining overrides of the QoS profile for specific topics.') + parser.add_argument( + '-l', '--loop', action='store_true', + help='enables loop playback when playing a bagfile: it starts back at the beginning ' + 'on reaching the end and plays indefinitely.') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -72,4 +76,5 @@ def main(self, *, args): # noqa: D102 read_ahead_queue_size=args.read_ahead_queue_size, rate=args.rate, topics=args.topics, - qos_profile_overrides=qos_profile_overrides) + qos_profile_overrides=qos_profile_overrides, + loop=args.loop) diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 6c0c6a86cf..762ed7ebbb 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -160,6 +160,13 @@ function(create_tests_for_rmw_implementation) ${SKIP_TEST} LINK_LIBS rosbag2_transport AMENT_DEPS test_msgs rosbag2_test_common) + + rosbag2_transport_add_gmock(test_play_loop + test/rosbag2_transport/test_play_loop.cpp + ${SKIP_TEST} + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + endfunction() if(BUILD_TESTING) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 2ed6e1c0ee..f2d6f0f093 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -38,6 +38,7 @@ struct PlayOptions std::vector topics_to_filter = {}; std::unordered_map topic_qos_profile_overrides = {}; + bool loop = false; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp index 063e22d020..7f6bd512ec 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp @@ -1,4 +1,5 @@ // Copyright 2018, Bosch Software Innovations GmbH. +// Copyright 2020, TNG Technology Consulting GmbH. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -93,12 +94,13 @@ void Rosbag2Transport::play( const StorageOptions & storage_options, const PlayOptions & play_options) { try { - reader_->open(storage_options, {"", rmw_get_serialization_format()}); - auto transport_node = setup_node(play_options.node_prefix); - Player player(reader_, transport_node); - player.play(play_options); + + do { + reader_->open(storage_options, {"", rmw_get_serialization_format()}); + player.play(play_options); + } while (rclcpp::ok() && play_options.loop); } catch (std::runtime_error & e) { ROSBAG2_TRANSPORT_LOG_ERROR("Failed to play: %s", e.what()); } diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp index b14dfc9135..f724abc44e 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp @@ -1,4 +1,5 @@ // Copyright 2018 Open Source Robotics Foundation, Inc. +// Copyright 2020, TNG Technology Consulting GmbH. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -210,6 +211,7 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k "rate", "topics", "qos_profile_overrides", + "loop", nullptr }; @@ -220,15 +222,18 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k float rate; PyObject * topics = nullptr; PyObject * qos_profile_overrides{nullptr}; + bool loop = false; + if (!PyArg_ParseTupleAndKeywords( - args, kwargs, "sss|kfOO", const_cast(kwlist), + args, kwargs, "sss|kfOOb", const_cast(kwlist), &uri, &storage_id, &node_prefix, &read_ahead_queue_size, &rate, &topics, - &qos_profile_overrides)) + &qos_profile_overrides, + &loop)) { return nullptr; } @@ -239,6 +244,7 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k play_options.node_prefix = std::string(node_prefix); play_options.read_ahead_queue_size = read_ahead_queue_size; play_options.rate = rate; + play_options.loop = loop; if (topics) { PyObject * topic_iterator = PyObject_GetIter(topics); diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index 014193ba76..9b1d81a9c6 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -31,6 +31,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn { (void) storage_options; (void) converter_options; + num_read_ = 0; } void reset() override {} diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp new file mode 100644 index 0000000000..b33d3f2267 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp @@ -0,0 +1,40 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TRANSPORT__ROSBAG2_PLAY_TEST_FIXTURE_HPP_ +#define ROSBAG2_TRANSPORT__ROSBAG2_PLAY_TEST_FIXTURE_HPP_ + +#include + +#include "rosbag2_transport_test_fixture.hpp" + +class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture +{ +public: + RosBag2PlayTestFixture() + : Rosbag2TransportTestFixture() + { + rclcpp::init(0, nullptr); + sub_ = std::make_shared(); + } + + ~RosBag2PlayTestFixture() override + { + rclcpp::shutdown(); + } + + std::shared_ptr sub_; +}; + +#endif // ROSBAG2_TRANSPORT__ROSBAG2_PLAY_TEST_FIXTURE_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 6764225413..a2c6599488 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -32,31 +32,13 @@ #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" -#include "rosbag2_transport_test_fixture.hpp" +#include "rosbag2_play_test_fixture.hpp" using namespace ::testing; // NOLINT using namespace rosbag2_transport; // NOLINT using namespace std::chrono_literals; // NOLINT using namespace rosbag2_test_common; // NOLINT -class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture -{ -public: - RosBag2PlayTestFixture() - : Rosbag2TransportTestFixture() - { - rclcpp::init(0, nullptr); - sub_ = std::make_shared(); - } - - ~RosBag2PlayTestFixture() override - { - rclcpp::shutdown(); - } - - std::shared_ptr sub_; -}; - TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) { auto primitive_message1 = get_messages_basic_types()[0]; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp new file mode 100644 index 0000000000..68797e6970 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp @@ -0,0 +1,86 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// Copyright 2020, TNG Technology Consulting GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_test_common/subscription_manager.hpp" + +#include "rosbag2_transport/rosbag2_transport.hpp" + +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" + +TEST_F(RosBag2PlayTestFixture, messages_played_in_loop) { + // test constants + const int test_value = 42; + const size_t num_messages = 3; + const int64_t time_stamp_in_milliseconds = 700; + const size_t expected_number_of_messages = num_messages * 2; + const size_t read_ahead_queue_size = 1000; + const float rate = 1.0; + const bool loop_playback = true; + + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = test_value; + + auto topic_types = std::vector{ + {"loop_test_topic", "test_msgs/BasicTypes", "", ""} + }; + + std::vector> messages(num_messages, + serialize_test_message("loop_test_topic", time_stamp_in_milliseconds, primitive_message1)); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + reader_ = std::make_unique(std::move(prepared_mock_reader)); + + sub_->add_subscription( + "/loop_test_topic", + expected_number_of_messages); + + auto await_received_messages = sub_->spin_subscriptions(); + + auto rosbag2_transport_ptr = std::make_shared( + reader_, + writer_, + info_); + std::thread loop_thread(&rosbag2_transport::Rosbag2Transport::play, rosbag2_transport_ptr, + storage_options_, + rosbag2_transport::PlayOptions{read_ahead_queue_size, "", rate, {}, {}, loop_playback}); + + await_received_messages.get(); + rclcpp::shutdown(); + loop_thread.join(); + + auto replayed_test_primitives = sub_->get_received_messages( + "/loop_test_topic"); + + EXPECT_THAT(replayed_test_primitives.size(), Ge(expected_number_of_messages)); + EXPECT_THAT( + replayed_test_primitives, + Each(Pointee(Field(&test_msgs::msg::BasicTypes::int32_value, test_value)))); +}