Skip to content

Commit

Permalink
Add recorder stop() API (backport #1300)
Browse files Browse the repository at this point in the history
* Expose Writer::close() as public API and add Recorder::stop() API

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
Co-authored-by: aditya <aditya@nimble.ai>

* Move routines from recorder's destructor to RecorderImpl::stop()

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Add can_record_again_after_stop unit test in rosbag2_transport

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Update Doxygen comments for stop() and pause() API in recorder.hpp

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Jun 13, 2023
1 parent 7351423 commit 02438aa
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 4 deletions.
9 changes: 8 additions & 1 deletion rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,17 @@ class Recorder : public rclcpp::Node
return subscriptions_;
}

/// @brief Stopping recording.
/// @details The stop() is opposite to the record() operation. It will stop recording, dump
/// all buffers to the disk and close writer. The record() can be called again after stop().
ROSBAG2_TRANSPORT_PUBLIC
void stop();

ROSBAG2_TRANSPORT_PUBLIC
const rosbag2_cpp::Writer & get_writer_handle();

/// Pause the recording.
/// @brief Pause the recording.
/// @details Will keep writer open and skip messages upon arrival on subscriptions.
ROSBAG2_TRANSPORT_PUBLIC
void pause();

Expand Down
10 changes: 9 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,19 @@ Recorder::Recorder(
Recorder::~Recorder()
{
keyboard_handler_->delete_key_press_callback(toggle_paused_key_callback_handle_);
stop();
}


void Recorder::stop()
{
stop_discovery_ = true;
if (discovery_future_.valid()) {
discovery_future_.wait();
}

paused_ = true;
subscriptions_.clear();
writer_->close(); // Call writer->close() to finalize current bag file and write metadata

{
std::lock_guard<std::mutex> lock(event_publisher_thread_mutex_);
Expand All @@ -126,6 +133,7 @@ Recorder::~Recorder()

void Recorder::record()
{
paused_ = record_options_.start_paused;
topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;
if (record_options_.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
snapshot_mode_ = storage_options.snapshot_mode;
(void) storage_options;
(void) converter_options;
writer_close_called_ = false;
}

void close() override {}
void close() override
{
writer_close_called_ = true;
}

void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override
{
Expand Down Expand Up @@ -118,6 +122,11 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
return max_messages_per_file_;
}

bool closed_was_called() const
{
return writer_close_called_;
}

private:
std::unordered_map<std::string, rosbag2_storage::TopicMetadata> topics_;
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages_;
Expand All @@ -128,6 +137,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
rosbag2_cpp::bag_events::EventCallbackManager callback_manager_;
size_t file_number_ = 0;
size_t max_messages_per_file_ = 0;
bool writer_close_called_{false};
};

#endif // ROSBAG2_TRANSPORT__MOCK_SEQUENTIAL_WRITER_HPP_
54 changes: 53 additions & 1 deletion rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
pub_manager.setup_publisher(string_topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{false, false, {string_topic, array_topic}, "rmw_format", 100ms};
{false, false, {string_topic, array_topic}, "rmw_format", 50ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -88,6 +88,58 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
EXPECT_THAT(array_messages[0]->float32_values, Eq(array_message->float32_values));
}

TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop)
{
auto string_message = get_messages_strings()[1];
std::string string_topic = "/string_topic";

rosbag2_test_common::PublicationManager pub_manager;
pub_manager.setup_publisher(string_topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{false, false, {string_topic}, "rmw_format", 50ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();

auto & writer = recorder->get_writer_handle();
auto & mock_writer = dynamic_cast<MockSequentialWriter &>(writer.get_implementation_handle());

start_async_spin(recorder);
ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str()));

pub_manager.run_publishers();

EXPECT_FALSE(mock_writer.closed_was_called());
recorder->stop();
EXPECT_TRUE(mock_writer.closed_was_called());

// Record one more time after stop()
recorder->record();

ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str()));
pub_manager.run_publishers();

size_t expected_messages = 4; // 4 because was running recorder-record() and publishers twice
auto ret = rosbag2_test_common::wait_until_shutdown(
std::chrono::seconds(5),
[&mock_writer, &expected_messages]() {
return mock_writer.get_messages().size() >= expected_messages;
});
auto recorded_messages = mock_writer.get_messages();
EXPECT_TRUE(ret) << "failed to capture expected messages in time";
EXPECT_THAT(recorded_messages, SizeIs(expected_messages));

auto recorded_topics = mock_writer.get_topics();
ASSERT_THAT(recorded_topics, SizeIs(1)) << "size=" << recorded_topics.size();
EXPECT_THAT(recorded_topics.at(string_topic).serialization_format, Eq("rmw_format"));
ASSERT_THAT(recorded_messages, SizeIs(expected_messages));
auto string_messages = filter_messages<test_msgs::msg::Strings>(
recorded_messages, string_topic);
ASSERT_THAT(string_messages, SizeIs(4));
EXPECT_THAT(string_messages[0]->string_value, Eq(string_message->string_value));
}

TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)
{
auto string_message = get_messages_strings()[1];
Expand Down

0 comments on commit 02438aa

Please sign in to comment.