Skip to content

Commit

Permalink
Fix deadlocks in pause(), resume() and toggle_paused() methods
Browse files Browse the repository at this point in the history
- Removed mutex lock in pause(), resume() and toggle_paused() methods
and use only atomic value change for `paused_` variable.
- Renamed `state_transition_mutex_` to the
`start_stop_transition_mutex_`.
- Add unit test for `toggle_paused()`

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Mar 18, 2024
1 parent 4fba7a4 commit d788074
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
26 changes: 10 additions & 16 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class RecorderImpl
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_;
rclcpp::Service<rosbag2_interfaces::srv::SplitBagfile>::SharedPtr srv_split_bagfile_;

std::mutex state_transition_mutex_;
std::mutex start_stop_transition_mutex_;
std::mutex discovery_mutex_;
std::atomic<bool> stop_discovery_ = false;
std::atomic<bool> paused_ = false;
Expand Down Expand Up @@ -223,7 +223,7 @@ RecorderImpl::~RecorderImpl()

void RecorderImpl::stop()
{
std::lock_guard<std::mutex> state_lock(state_transition_mutex_);
std::lock_guard<std::mutex> state_lock(start_stop_transition_mutex_);
if (!in_recording_) {
RCLCPP_DEBUG(node->get_logger(), "Recording has already been stopped or not running.");
return;
Expand All @@ -248,7 +248,7 @@ void RecorderImpl::stop()

void RecorderImpl::record()
{
std::lock_guard<std::mutex> state_lock(state_transition_mutex_);
std::lock_guard<std::mutex> state_lock(start_stop_transition_mutex_);
if (in_recording_.exchange(true)) {
RCLCPP_WARN_STREAM(
node->get_logger(),
Expand Down Expand Up @@ -396,33 +396,28 @@ const rosbag2_cpp::Writer & RecorderImpl::get_writer_handle()

void RecorderImpl::pause()
{
std::lock_guard<std::mutex> state_lock(state_transition_mutex_);
if (paused_) {
RCLCPP_DEBUG(node->get_logger(), "Recorder is alreadyin pause state.");
if (paused_.exchange(true)) {
RCLCPP_DEBUG(node->get_logger(), "Recorder is already in pause state.");
} else {
paused_.store(true);
RCLCPP_INFO_STREAM(node->get_logger(), "Pausing recording.");
}
}

void RecorderImpl::resume()
{
std::lock_guard<std::mutex> state_lock(state_transition_mutex_);
if (!paused_) {
if (paused_.exchange(false)) {
RCLCPP_DEBUG(node->get_logger(), "Already in the recording.");
} else {
paused_.store(false);
RCLCPP_INFO_STREAM(node->get_logger(), "Resuming recording.");
}
}

void RecorderImpl::toggle_paused()
{
std::lock_guard<std::mutex> state_lock(state_transition_mutex_);
if (paused_.load()) {
this->resume();
if (atomic_fetch_xor((std::atomic_uchar *)&paused_, 1)) {
RCLCPP_INFO_STREAM(node->get_logger(), "Resuming recording.");
} else {
this->pause();
RCLCPP_INFO_STREAM(node->get_logger(), "Pausing recording.");
}
}

Expand All @@ -447,8 +442,7 @@ void RecorderImpl::stop_discovery()
std::lock_guard<std::mutex> state_lock(discovery_mutex_);
if (stop_discovery_.exchange(true)) {
RCLCPP_DEBUG(
node->get_logger(),
"Recorder topic discovery has already been stopped or not running.");
node->get_logger(), "Recorder topic discovery has already been stopped or not running.");
} else {
if (discovery_future_.valid()) {
auto status = discovery_future_.wait_for(2 * record_options_.topic_polling_interval);
Expand Down
21 changes: 21 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,24 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called)
EXPECT_EQ(closed_file, "BagFile0");
EXPECT_EQ(opened_file, "BagFile1");
}

TEST_F(RecordIntegrationTestFixture, toggle_paused_do_pause_resume)
{
rosbag2_transport::RecordOptions record_options{};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->pause();
ASSERT_TRUE(recorder->is_paused());

testing::internal::CaptureStderr();
recorder->toggle_paused();
std::string test_output = testing::internal::GetCapturedStderr();
EXPECT_FALSE(recorder->is_paused());
EXPECT_TRUE(test_output.find("Resuming recording.") != std::string::npos);

testing::internal::CaptureStderr();
recorder->toggle_paused();
test_output = testing::internal::GetCapturedStderr();
EXPECT_TRUE(recorder->is_paused());
EXPECT_TRUE(test_output.find("Pausing recording.") != std::string::npos);
}

0 comments on commit d788074

Please sign in to comment.