Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to set compression threads priority #1457

Merged
merged 10 commits into from Dec 16, 2023
1 change: 1 addition & 0 deletions rosbag2_compression/CMakeLists.txt
Expand Up @@ -107,6 +107,7 @@ if(BUILD_TESTING)
test/rosbag2_compression/test_sequential_compression_writer.cpp)
target_link_libraries(test_sequential_compression_writer
${PROJECT_NAME}
fake_plugin
rosbag2_cpp::rosbag2_cpp
rosbag2_storage::rosbag2_storage
)
Expand Down
Expand Up @@ -17,6 +17,7 @@

#include <cstdint>
#include <string>
#include <optional>

#include "visibility_control.hpp"

Expand Down Expand Up @@ -60,7 +61,12 @@ struct CompressionOptions
std::string compression_format;
CompressionMode compression_mode;
uint64_t compression_queue_size;
/// \brief // The number of compression threads
uint64_t compression_threads;
/// \brief If set, the compression thread(s) will try to set the given priority for itself
/// For Windows the valid values are: THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL and
/// THREAD_PRIORITY_NORMAL. For POSIX compatible OSes this is the "nice" value.
std::optional<int32_t> thread_priority;
};

} // namespace rosbag2_compression
Expand Down
Expand Up @@ -16,6 +16,7 @@

#include <algorithm>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <stdexcept>
Expand All @@ -33,6 +34,12 @@
#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp"

#include "logging.hpp"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/resource.h>
#endif

namespace rosbag2_compression
{
Expand Down Expand Up @@ -62,6 +69,45 @@ SequentialCompressionWriter::~SequentialCompressionWriter()

void SequentialCompressionWriter::compression_thread_fn()
{
if (compression_options_.thread_priority) {
#ifdef _WIN32
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
// This must match THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST...
int wanted_thread_priority = *compression_options_.thread_priority;
if (!SetThreadPriority(GetCurrentThread(), wanted_thread_priority)) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Could not set thread priority of compression thread to: " << wanted_thread_priority <<
". Error code: " << GetLastError());
} else {
auto detected_thread_priority = GetThreadPriority(GetCurrentThread());
if (detected_thread_priority == THREAD_PRIORITY_ERROR_RETURN) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Failed to get current thread priority. Error code: " << GetLastError());
} else if (wanted_thread_priority != detected_thread_priority) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Could not set thread priority of compression thread to: " <<
wanted_thread_priority << ". Detected thread priority: " << detected_thread_priority);
}
}
#else
int wanted_nice_value = *compression_options_.thread_priority;

errno = 0;
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
if (cur_nice_value == -1 && errno != 0) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Could not set nice value of compression thread to: " << wanted_nice_value <<
" : Could not determine cur nice value");
} else {
int new_nice_value = nice(wanted_nice_value - cur_nice_value);
if ((new_nice_value == -1 && errno != 0)) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Could not set nice value of compression thread to: " << wanted_nice_value <<
". Error : " << std::strerror(errno));
}
}
#endif
}

// Every thread needs to have its own compression context for thread safety.
auto compressor = compression_factory_->create_compressor(
compression_options_.compression_format);
Expand Down
@@ -0,0 +1,45 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_
#define ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_

#include <memory>
#include <string>

#include "rosbag2_compression/compression_factory.hpp"
#include "fake_compressor.hpp"

class ROSBAG2_COMPRESSION_EXPORT FakeCompressionFactory
: public rosbag2_compression::CompressionFactory
{
public:
FakeCompressionFactory() = delete;

~FakeCompressionFactory() override = default;

explicit FakeCompressionFactory(int & detected_thread_priority)
: detected_thread_priority_(detected_thread_priority) {}

std::shared_ptr<rosbag2_compression::BaseCompressorInterface>
create_compressor(const std::string & /*compression_format*/) override
{
return std::make_shared<FakeCompressor>(detected_thread_priority_);
}

private:
int & detected_thread_priority_;
};

#endif // ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_
Expand Up @@ -13,11 +13,27 @@
// limitations under the License.

#include <string>
#ifdef _WIN32
#include <windows.h>
#else
#include <sys/resource.h>
#endif

#include "pluginlib/class_list_macros.hpp"

#include "fake_compressor.hpp"

FakeCompressor::FakeCompressor(int & detected_thread_priority)
{
#ifndef _WIN32
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
if (cur_nice_value != -1 && errno == 0) {
detected_thread_priority = cur_nice_value;
}
#else
detected_thread_priority = GetThreadPriority(GetCurrentThread());
#endif
}

std::string FakeCompressor::compress_uri(const std::string & uri)
{
return uri + "." + get_compression_identifier();
Expand Down
Expand Up @@ -20,11 +20,14 @@
#include "rosbag2_compression/base_compressor_interface.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

class FakeCompressor : public rosbag2_compression::BaseCompressorInterface
class ROSBAG2_COMPRESSION_EXPORT FakeCompressor : public rosbag2_compression::
BaseCompressorInterface
{
public:
FakeCompressor() = default;

explicit FakeCompressor(int & detected_thread_priority);

std::string compress_uri(const std::string & uri) override;

void compress_serialized_bag_message(
Expand Down
Expand Up @@ -36,6 +36,14 @@
#include "mock_storage_factory.hpp"

#include "mock_compression_factory.hpp"
#include "fake_compression_factory.hpp"

#ifdef _WIN32
#include <windows.h>
#else
#include <sys/time.h>
#include <sys/resource.h>
#endif

using namespace testing; // NOLINT

Expand Down Expand Up @@ -143,13 +151,15 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
const std::optional<int32_t> kDefaultCompressionQueueThreadsPriority = std::nullopt;
};

TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
initializeWriter(compression_options);

EXPECT_THROW(
Expand All @@ -163,7 +173,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format)
{
rosbag2_compression::CompressionOptions compression_options{
"bad_format", rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
initializeWriter(compression_options);

EXPECT_THROW(
Expand All @@ -175,7 +186,8 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};

// Set minimum file size greater than max bagfile size option
const uint64_t min_split_file_size = 10;
Expand All @@ -196,7 +208,8 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
initializeWriter(compression_options);

auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty";
Expand All @@ -211,7 +224,8 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
{
rosbag2_compression::CompressionOptions compression_options{
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority};
auto compression_factory = std::make_unique<StrictMock<MockCompressionFactory>>();
EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1);

Expand All @@ -235,7 +249,8 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -277,7 +292,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -315,7 +331,8 @@ TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
Expand Down Expand Up @@ -363,7 +380,8 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
kCompressionQueueSize,
kDefaultCompressionQueueThreads
kDefaultCompressionQueueThreads,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
Expand All @@ -384,6 +402,60 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";
const uint64_t kCompressionQueueSize = GetParam();
#ifndef _WIN32
const int32_t wanted_thread_priority = 10;
errno = 0;
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
ASSERT_TRUE(!(cur_nice_value == -1 && errno != 0));
#else
const int32_t wanted_thread_priority = THREAD_PRIORITY_LOWEST;
auto current_thread_priority = GetThreadPriority(GetCurrentThread());
ASSERT_NE(current_thread_priority, THREAD_PRIORITY_ERROR_RETURN);
ASSERT_NE(current_thread_priority, wanted_thread_priority);
#endif

// queue size should be 0 or at least the number of remaining messages to prevent message loss
rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
kCompressionQueueSize,
kDefaultCompressionQueueThreads,
wanted_thread_priority
};

#ifndef _WIN32
// nice values are in the range from -20 to +19, so this value will never be read
int32_t detected_thread_priority = 100;
#else
int32_t detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN;
#endif

initializeFakeFileStorage();
initializeWriter(
compression_options,
std::make_unique<FakeCompressionFactory>(detected_thread_priority));

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", {}, ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

const size_t kNumMessagesToWrite = 5;
for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(detected_thread_priority, *compression_options.thread_priority);
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

INSTANTIATE_TEST_SUITE_P(
SequentialCompressionWriterTestQueueSizes,
SequentialCompressionWriterTest,
Expand Down
Expand Up @@ -178,7 +178,7 @@ void WriterBenchmark::create_writer()
if (!bag_config_.compression_format.empty()) {
rosbag2_compression::CompressionOptions compression_options{
bag_config_.compression_format, rosbag2_compression::CompressionMode::MESSAGE,
bag_config_.compression_queue_size, bag_config_.compression_threads};
bag_config_.compression_queue_size, bag_config_.compression_threads, std::nullopt};

writer_ = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
compression_options);
Expand Down
Expand Up @@ -42,6 +42,7 @@ struct RecordOptions
std::string compression_format = "";
uint64_t compression_queue_size = 1;
uint64_t compression_threads = 0;
int32_t compression_threads_priority = 0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
bool include_unpublished_topics = false;
Expand Down
Expand Up @@ -235,6 +235,10 @@ RecordOptions get_record_options_from_node_params(rclcpp::Node & node)
node, "record.compression_threads", 0, std::numeric_limits<int64_t>::max(),
record_options.compression_threads);

record_options.compression_threads_priority = param_utils::declare_integer_node_params<int32_t>(
node, "record.compression_threads_priority", std::numeric_limits<int32_t>::min(),
std::numeric_limits<int32_t>::max(), record_options.compression_threads_priority);

std::string qos_profile_overrides_path =
node.declare_parameter<std::string>("record.qos_profile_overrides_path", "");

Expand Down
Expand Up @@ -53,7 +53,8 @@ std::unique_ptr<rosbag2_cpp::Writer> ReaderWriterFactory::make_writer(
record_options.compression_format,
rosbag2_compression::compression_mode_from_string(record_options.compression_mode),
record_options.compression_queue_size,
record_options.compression_threads
record_options.compression_threads,
record_options.compression_threads_priority,
};
if (compression_options.compression_threads < 1) {
compression_options.compression_threads = std::thread::hardware_concurrency();
Expand Down