Add option to prevent message loss while converting #1058

Conversation

DensoADAS
Contributor

Because the SequentialCompressionWriter uses threads for compression, messages can be dropped if the writer thread is slower than the reader. To prevent this (without needing a very large queue), messages are written directly when compression_queue_size is 0.

Use case:
offline conversion (compression), where no messages should be dropped and execution time is not that relevant.
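A minimal sketch of the dispatch rule described above, assuming a hypothetical `SketchCompressionWriter` class and `Message` struct (neither is rosbag2 API): with a queue size of 0, `write()` compresses and stores the message in the calling thread, so nothing can be dropped; otherwise it only enqueues. The worker threads and the queue-size limit are deliberately omitted, and `drain()` stands in for the workers.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for a serialized bag message (not the rosbag2 type).
struct Message {
  std::string topic_name;
  std::string payload;
};

// Hypothetical writer showing only the dispatch rule:
// queue_size == 0 -> compress and store synchronously in the caller's thread.
class SketchCompressionWriter {
public:
  explicit SketchCompressionWriter(std::size_t queue_size)
  : queue_size_(queue_size) {}

  void write(const Message & msg) {
    if (queue_size_ == 0) {
      // Direct path: the caller pays for compression, nothing is queued or dropped.
      compress_and_store(msg);
      return;
    }
    // Queued path: hand the message off; worker threads and the size limit are omitted.
    pending_.push(msg);
  }

  // Stand-in for the worker threads: process everything that was queued.
  void drain() {
    while (!pending_.empty()) {
      compress_and_store(pending_.front());
      pending_.pop();
    }
  }

  const std::vector<Message> & stored() const { return stored_; }
  std::size_t pending() const { return pending_.size(); }

private:
  void compress_and_store(const Message & msg) {
    Message compressed = msg;
    compressed.payload = "z:" + msg.payload;  // placeholder for real compression
    stored_.push_back(compressed);
  }

  std::size_t queue_size_;
  std::queue<Message> pending_;
  std::vector<Message> stored_;
};
```

The trade-off is the one stated in the use case: the direct path makes `write()` as slow as compression itself, which is acceptable offline but not necessarily while recording live data.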

DensoADAS requested a review from a team as a code owner on August 3, 2022 09:29
DensoADAS requested review from gbiggs and jhdcs and removed request for a team on August 3, 2022 09:29
Contributor

MichaelOrlov left a comment

@DensoADAS @emersonknapp Do you know why it was originally decided to drop messages instead of waiting until a spot frees up in the SequentialCompressionWriter queue? See https://github.com/ros2/rosbag2/pull/1058/files#diff-39403f3879768a372efbcdd5a08b7d399259229334e96593ebf0ca16c2a76a89R334

I would prefer to block in the
SequentialCompressionWriter::write method, waiting on a condition variable until the internal queue has a free spot.
In that case, message drops would happen at the DDS layer and depend on the QoS configuration.

With this approach, the message-loss issue in the converter is solved by design, and the converter can use multiple compression threads without losing messages.
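The blocking approach proposed above can be sketched with a bounded queue guarded by two condition variables. This is an illustration under assumptions, not the code merged in this PR: `BlockingBoundedQueue` and `run_pipeline` are hypothetical names, and `int` stands in for a serialized message. The producer waits in `push()` while the queue is full instead of dropping, so every message reaches one of the worker threads.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical bounded queue: push() blocks while full instead of dropping.
template<typename T>
class BlockingBoundedQueue {
public:
  explicit BlockingBoundedQueue(std::size_t capacity)
  : capacity_(capacity) {assert(capacity_ > 0);}

  // Producer side: wait for a free spot. Returns false only if closed first.
  bool push(T item) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] {return closed_ || queue_.size() < capacity_;});
    if (closed_) {
      return false;
    }
    queue_.push(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Consumer side: wait for an item; std::nullopt once closed and drained.
  std::optional<T> pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] {return closed_ || !queue_.empty();});
    if (queue_.empty()) {
      return std::nullopt;
    }
    T item = std::move(queue_.front());
    queue_.pop();
    not_full_.notify_one();  // wake a producer blocked on a full queue
    return item;
  }

  // Wake all waiters; consumers still drain whatever is already queued.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    not_full_.notify_all();
    not_empty_.notify_all();
  }

private:
  std::size_t capacity_;
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  bool closed_ = false;
};

// Usage sketch: one producer, several "compression" workers, small capacity.
// Returns how many items the workers processed.
int run_pipeline(int num_messages, int num_workers, std::size_t capacity) {
  BlockingBoundedQueue<int> queue(capacity);
  std::atomic<int> processed{0};
  std::vector<std::thread> workers;
  for (int w = 0; w < num_workers; ++w) {
    workers.emplace_back([&queue, &processed] {
        while (queue.pop()) {
          processed.fetch_add(1);  // stand-in for "compress and store"
        }
      });
  }
  for (int i = 0; i < num_messages; ++i) {
    queue.push(i);  // blocks while all `capacity` slots are taken
  }
  queue.close();  // workers exit once the queue is empty
  for (auto & t : workers) {
    t.join();
  }
  return processed.load();
}
```

Even with a capacity of 1 and several workers, `run_pipeline` processes every message; the price is that a slow compressor now slows the caller down, which is what pushes any loss back to the transport layer during live recording.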

@DensoADAS
Contributor Author

@DensoADAS @emersonknapp Do you know why it was decided to drop messages instead of waiting until it will be a free spot in SequentialCompressionWriter queue from the first hand?

Not really, but I assume there are two use cases:
a) compression while recording: the data source is live and cannot be delayed anyway, so data is dropped to prevent system overload
b) offline conversion (like our use case): you do not want to drop any messages, since speed is not an issue

Probably only use case a) was considered in the original design.

I will add the condition variable.

@MichaelOrlov
Contributor

@DensoADAS But regarding case

a) compress while recording (data source is live and cannot be delayed anyway -> drop data to prevent system overload)

There is no need to drop messages explicitly. They will be dropped at the DDS transport layer if the reader cannot read from the DDS queue in a timely manner.
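To illustrate where the loss moves once the writer blocks, here is a toy model of a `KEEP_LAST(depth)` history on the reader side. It is a simplified assumption, not DDS or rclcpp code (`KeepLastHistory` is a hypothetical name), and real middleware behavior also depends on other QoS settings such as reliability. When the reader stops taking samples, new deliveries evict the oldest ones.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical toy model of a KEEP_LAST(depth) history; not middleware code.
template<typename T>
class KeepLastHistory {
public:
  explicit KeepLastHistory(std::size_t depth)
  : depth_(depth) {assert(depth_ > 0);}

  // Transport side: a new sample always fits; the oldest one is evicted if full.
  void deliver(const T & sample) {
    if (samples_.size() == depth_) {
      samples_.pop_front();
      ++dropped_;
    }
    samples_.push_back(sample);
  }

  // Reader side (e.g. a recorder callback): take the oldest remaining sample.
  bool take(T & out) {
    if (samples_.empty()) {
      return false;
    }
    out = samples_.front();
    samples_.pop_front();
    return true;
  }

  std::size_t dropped() const {return dropped_;}
  std::size_t size() const {return samples_.size();}

private:
  std::size_t depth_;
  std::deque<T> samples_;
  std::size_t dropped_ = 0;
};
```

For example, delivering samples 0 through 9 into a depth-3 history without taking any evicts 7 of them and leaves samples 7, 8, and 9, so the recorder itself never has to decide what to drop.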

@DensoADAS
Contributor Author

Don't need to drop messages explicitly. They will be dropped on DDS transport layer if reader will not be able to read from DDS queue in timely manner.

The original implementation simply put the messages into the queue that feeds the compression threads.

Updated the PR.

Contributor

MichaelOrlov left a comment

@DensoADAS Thank you for rewriting this with a condition variable.
The implementation looks good to me.
Could you please add a couple of unit tests?
For example:

```cpp
TEST_F(SequentialCompressionWriterTest, writer_writes_with_compression_queue_size_zero)
{
  const std::string test_topic_name = "test_topic";
  const std::string test_topic_type = "test_msgs/BasicTypes";
  rosbag2_compression::CompressionOptions compression_options {
    DefaultTestCompressor,
    rosbag2_compression::CompressionMode::MESSAGE,
    0,
    kDefaultCompressionQueueThreads
  };

  initializeFakeFileStorage();
  initializeWriter(compression_options);

  writer_->open(tmp_dir_storage_options_);
  writer_->create_topic({test_topic_name, test_topic_type, "", ""});

  auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
  message->topic_name = test_topic_name;

  const size_t kNumMessagesToWrite = 5;
  for (size_t i = 0; i < kNumMessagesToWrite; i++) {
    writer_->write(message);
  }
  writer_.reset();  // reset will call writer destructor

  EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

TEST_F(SequentialCompressionWriterTest, writer_writes_with_compression_queue_size_none_zero)
{
  const std::string test_topic_name = "test_topic";
  const std::string test_topic_type = "test_msgs/BasicTypes";
  rosbag2_compression::CompressionOptions compression_options {
    DefaultTestCompressor,
    rosbag2_compression::CompressionMode::MESSAGE,
    kDefaultCompressionQueueSize,
    kDefaultCompressionQueueThreads
  };

  initializeFakeFileStorage();
  initializeWriter(compression_options);

  writer_->open(tmp_dir_storage_options_);
  writer_->create_topic({test_topic_name, test_topic_type, "", ""});

  auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
  message->topic_name = test_topic_name;

  const size_t kNumMessagesToWrite = 5;
  for (size_t i = 0; i < kNumMessagesToWrite; i++) {
    writer_->write(message);
  }
  writer_.reset();  // reset will call writer destructor

  EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}
```

You will also need to change FakeCompressor to copy at least the topic name, to avoid test failures:

```cpp
void FakeCompressor::compress_serialized_bag_message(
  const rosbag2_storage::SerializedBagMessage * input_message_ptr,
  rosbag2_storage::SerializedBagMessage * compressed_message_ptr) {
  compressed_message_ptr->topic_name = input_message_ptr->topic_name;
  compressed_message_ptr->time_stamp = input_message_ptr->time_stamp;
}
```

@DensoADAS
Contributor Author

@MichaelOrlov I added the unit tests.

Copy link
Contributor

MichaelOrlov left a comment

@DensoADAS Thanks for adding the unit tests. It looks good to me now.
However, our DCO check complains with the following errors:

Summary
Commit sha: [b8da018](https://github.com/ros2/rosbag2/pull/1058/commits/b8da01868dd72571ceec021bf893575b0451bb2d), Author: Joshua Hampp, Committer: Joshua Hampp; Expected "Joshua Hampp <j.hampp@denso-adas.de>", but got "Joshua Hampp <j.hampp@eu.denso.com>".
Commit sha: [de4f205](https://github.com/ros2/rosbag2/pull/1058/commits/de4f205df799dd7c44359e40c3ae3a8936607a35), Author: Joshua Hampp, Committer: Joshua Hampp; Expected "Joshua Hampp <j.hampp@denso-adas.de>", but got "Joshua Hampp <j.hampp@eu.denso.com>".

Could you please fix the DCO sign-off in your commits?

Joshua Hampp added 6 commits September 9, 2022 06:57
…le converting

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@eu.denso.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@eu.denso.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
DensoADAS force-pushed the feature/prevent_message_loss_in_sequential_compression_writer branch from de4f205 to 61caf5f on September 9, 2022 04:58
@MichaelOrlov
Contributor

Gist: https://gist.githubusercontent.com/MichaelOrlov/5d846b7c3773f73380d56ceeefc8d866/raw/b2b1e49362dbbaeaae730e0ef2b9a18eb644ffc9/ros2.repos
BUILD args: --packages-above-and-dependencies ros2bag rosbag2_compression rosbag2_tests
TEST args: --packages-above ros2bag rosbag2_compression rosbag2_tests
ROS Distro: rolling
Job: ci_launcher
ci_launcher ran: https://ci.ros2.org/job/ci_launcher/10776

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Windows Build Status

MichaelOrlov changed the title from "added option to prevent message loss while converting" to "Add option to prevent message loss while converting" on Sep 10, 2022
@MichaelOrlov
Contributor

The warning on the Windows build also exists on the baseline and is unrelated to the changes in this PR.

MichaelOrlov merged commit 8325425 into ros2:rolling on Sep 10, 2022
@ros-discourse

This pull request has been mentioned on ROS Discourse. There might be relevant details there:

https://discourse.ros.org/t/ros-2-tsc-meeting-minutes-2022-9-15/27394/1


4 participants