Use thread pool to run benchmark publishers in rosbag2_performance_benchmarking #1250

Merged: 11 commits, Feb 28, 2023
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.5)
project(rosbag2_performance_benchmarking)

if (NOT BUILD_ROSBAG2_BENCHMARKS)
if(NOT BUILD_ROSBAG2_BENCHMARKS)
return()
endif()

@@ -26,10 +26,11 @@ find_package(rosbag2_performance_benchmarking_msgs REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(yaml_cpp_vendor REQUIRED)

add_executable(writer_benchmark
src/config_utils.cpp
src/result_utils.cpp
src/writer_benchmark.cpp)
add_executable(writer_benchmark
src/config_utils.cpp
src/result_utils.cpp
src/writer_benchmark.cpp
src/msg_utils/helpers.cpp)

add_executable(benchmark_publishers
src/benchmark_publishers.cpp
@@ -39,6 +39,12 @@ storage uri, which is used to read the bag metadata file.

Note that while you can opt to enable compression for benchmarking, the generated data is random, so the results are likely not representative of real compression performance. To publish non-random data, you need to modify the ByteProducer, as sketched below.
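
As an illustration, a minimal sketch of such a modification, replacing the random fill with a deterministic byte pattern (the function name here is hypothetical; the actual fill lives in the producer helpers):

```cpp
// Hypothetical deterministic fill, sketched as a drop-in replacement for the
// random payload generation used by the ByteProducer helpers.
#include <cstddef>
#include <cstdint>
#include <vector>

void fill_deterministic(std::vector<uint8_t> & data, std::size_t message_size)
{
  data.clear();
  data.reserve(message_size);
  for (std::size_t i = 0; i < message_size; ++i) {
    data.push_back(static_cast<uint8_t>(i % 256));  // repeating 0..255 pattern
  }
}
```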

#### Number of publisher threads

The `benchmark_publishers` binary runs its publishers on a pool of threads. By default, the number of threads equals the number of publishers; it can be changed with the optional `number_of_threads` parameter.
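
For reference, the default can be thought of as the following rule (a sketch of the behavior described above, not a function exposed by the package):

```cpp
// Sketch only: how the effective thread count is resolved.
// A value of 0 (the default when the parameter is not set) means
// "use one thread per publisher".
#include <cstddef>

std::size_t resolve_thread_count(std::size_t number_of_threads, std::size_t publisher_count)
{
  return number_of_threads == 0 ? publisher_count : number_of_threads;
}
```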

## Building

To build the package in the rosbag2 build process, make sure to turn the `BUILD_ROSBAG2_BENCHMARKS` flag on (e.g. `colcon build --cmake-args -DBUILD_ROSBAG2_BENCHMARKS=1`).
@@ -3,6 +3,7 @@ rosbag2_performance_benchmarking_node:
publishers: # publisher_groups parameter needs to include all the subsequent groups
publisher_groups: [ "lidars_secondary_16MBps", "lidars_main_16MBps", "cameras_540MBps", "radars_400KBps", "gps_5KBps", "imu_40KBps", "ultrasonic_72KBps" ]
wait_for_subscriptions: True
# number_of_threads: 16
lidars_secondary_16MBps:
publishers_count: 4
topic_root: "lidar_secondary"
@@ -3,6 +3,7 @@ rosbag2_performance_benchmarking_node:
publishers: # publisher_groups parameter needs to include all the subsequent groups
publisher_groups: [ "10Mbs_many_frequent_small", "100Mbs_large" ]
wait_for_subscriptions: True
# number_of_threads: 16
10Mbs_many_frequent_small:
publishers_count: 500
topic_root: "benchmarking_small"
@@ -3,6 +3,7 @@ rosbag2_performance_benchmarking_node:
publishers: # publisher_groups parameter needs to include all the subsequent groups
publisher_groups: [ "10Mbs_many_frequent_small", "100Mbs_large" ]
wait_for_subscriptions: True
# number_of_threads: 16
10Mbs_many_frequent_small:
publishers_count: 100
topic_root: "benchmarking_small"
@@ -21,23 +21,11 @@
#include <functional>

#include "rclcpp/utilities.hpp"
#include "msg_utils/helpers.hpp"
#include "rosbag2_performance_benchmarking_msgs/msg/byte_array.hpp"

#include "rosbag2_performance_benchmarking/producer_config.hpp"

inline auto generate_random_message(const ProducerConfig & config)
{
// Reuses the same random message
auto message = std::make_shared<rosbag2_performance_benchmarking_msgs::msg::ByteArray>();

message->data.reserve(config.message_size);
for (auto i = 0u; i < config.message_size; ++i) {
message->data.emplace_back(std::rand() % 255);
}

return message;
}

class ByteProducer
{
public:
@@ -57,8 +45,10 @@ class ByteProducer
producer_callback_(producer_callback),
producer_finalize_(producer_finalize),
sleep_time_(configuration_.frequency == 0 ? 1 : 1000 / configuration_.frequency),
message_(generate_random_message(configuration_))
{}
message_(std::make_shared<rosbag2_performance_benchmarking_msgs::msg::ByteArray>())
{
msg_utils::helpers::generate_data(*message_, configuration_.message_size);
}

void run()
{
@@ -37,6 +37,9 @@ void load_qos_configuration(
/// Acquires the parameter determining whether to wait for subscriber
bool wait_for_subscriptions_from_node_parameters(rclcpp::Node & node);

/// Gets the number of threads used for the thread pool
size_t get_number_of_threads_from_node_parameters(rclcpp::Node & node);

/// Acquires publisher parameters from the node
std::vector<PublisherGroupConfig> publisher_groups_from_node_parameters(
rclcpp::Node & node);
@@ -0,0 +1,99 @@
// Copyright 2022 Apex.AI, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_PERFORMANCE_BENCHMARKING__THREAD_POOL_HPP_
#define ROSBAG2_PERFORMANCE_BENCHMARKING__THREAD_POOL_HPP_

#include <mutex>
#include <queue>
#include <vector>
#include <thread>
#include <functional>
#include <condition_variable>

class ThreadPool
{
public:
using job_type = std::function<void ()>;
~ThreadPool()
{
this->terminate();
}

void start(size_t size)
{
if (!threads_.empty()) {
throw std::runtime_error("thread pool already started");
}

for (size_t i = 0; i < size; ++i) {
threads_.emplace_back(
[this] {
thread_task();
});
}
}

void queue(job_type job)
{
if (job == nullptr) {
throw std::invalid_argument("job is nullptr");
}

std::lock_guard<std::mutex> l(jobs_queue_mutex_);
jobs_queue_.push(job);
jobs_queue_cv_.notify_one();
}

void terminate()
{
terminate_ = true;
jobs_queue_cv_.notify_all();
for (auto & t : threads_) {
if (t.joinable()) {t.join();}
}
threads_.clear();
}

private:
void thread_task()
{
while (true) {
job_type job;
{
std::unique_lock<std::mutex> lock(jobs_queue_mutex_);
jobs_queue_cv_.wait(
lock, [this] {
return !jobs_queue_.empty() || terminate_;
});

if (terminate_) {
break;
}

job = jobs_queue_.front();
jobs_queue_.pop();
}
job();
}
}

bool terminate_ = false;
std::mutex jobs_queue_mutex_;
std::queue<job_type> jobs_queue_;
std::condition_variable jobs_queue_cv_;
std::vector<std::thread> threads_;
};

#endif // ROSBAG2_PERFORMANCE_BENCHMARKING__THREAD_POOL_HPP_
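
For context, a minimal usage sketch of the `ThreadPool` defined above (illustrative only, not part of the changeset; the include path matches the one used later in the publisher node):

```cpp
// Illustrative standalone usage of the ThreadPool defined above.
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

#include "rosbag2_performance_benchmarking/thread_pool.hpp"

int main()
{
  ThreadPool pool;
  pool.start(4);  // spin up four worker threads

  std::atomic<int> counter{0};
  for (int i = 0; i < 16; ++i) {
    pool.queue([&counter] {counter.fetch_add(1);});
  }

  // terminate() wakes the workers and joins them; it does not wait for jobs
  // still sitting in the queue, so give the pool a moment to drain first.
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  pool.terminate();

  std::cout << "jobs executed: " << counter.load() << std::endl;
  return 0;
}
```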
@@ -20,13 +20,13 @@
#include "rosbag2_performance_benchmarking/byte_producer.hpp"
#include "rosbag2_performance_benchmarking/config_utils.hpp"
#include "rosbag2_performance_benchmarking/publisher_group_config.hpp"
#include "rosbag2_performance_benchmarking/thread_pool.hpp"

#include "msg_utils/message_producer_factory.hpp"

#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/qos.hpp"
#include "rosbag2_performance_benchmarking_msgs/msg/byte_array.hpp"


class BenchmarkPublishers : public rclcpp::Node
{
@@ -40,24 +40,22 @@ class BenchmarkPublishers : public rclcpp::Node

void run()
{
std::vector<std::thread> producer_threads;
thread_pool_.start(number_of_threads_);

for (auto & producer : producers_) {
producer_threads.emplace_back(
[this, producer]() {
this->producer_job(producer);
});
const auto finished_future = producer->promise_finished.get_future();
finished_future.wait();
}

for (auto & thread : producer_threads) {
thread.join();
}
thread_pool_.terminate();
}

private:
struct BenchmarkProducer
{
std::shared_ptr<msg_utils::ProducerBase> msg_producer;
std::chrono::milliseconds period;
std::promise<void> promise_finished;
std::chrono::milliseconds period{0};
size_t produced_messages = 0;
size_t max_messages = 0;

@@ -71,7 +69,7 @@
void wait_for_subscriptions()
{
if (config_utils::wait_for_subscriptions_from_node_parameters(*this)) {
for (auto producer : producers_) {
for (const auto & producer : producers_) {
producer->msg_producer->wait_for_matched();
}
}
@@ -87,19 +85,29 @@
}

const std::string node_name(get_fully_qualified_name());
const auto when_to_start = std::chrono::high_resolution_clock::now() + std::chrono::seconds(1);

size_t total_producers_number = 0U;
for (auto & config : configurations) {
for (unsigned int i = 0; i < config.count; ++i) {
const std::string topic = node_name + "/" + config.topic_root + "_" + std::to_string(i + 1);
auto producer = create_benchmark_producer(topic, config);
auto producer = create_benchmark_producer(topic, config, when_to_start);
producers_.push_back(producer);
total_producers_number++;
}
}

number_of_threads_ = config_utils::get_number_of_threads_from_node_parameters(*this);
// by default the number of threads is equal to the number of producers
if (number_of_threads_ == 0) {
number_of_threads_ = total_producers_number;
}
}

std::shared_ptr<BenchmarkProducer> create_benchmark_producer(
std::string topic,
const PublisherGroupConfig & config)
const PublisherGroupConfig & config,
std::chrono::time_point<std::chrono::high_resolution_clock> initial_time)
{
const auto & producer_config = config.producer_config;
auto producer = std::make_shared<BenchmarkProducer>();
@@ -109,20 +117,40 @@
producer->period = std::chrono::milliseconds(
producer_config.frequency ? 1000 / producer_config.frequency : 1);

if (producer->max_messages > 0) {
thread_pool_.queue(
[this, initial_time, producer] {
producer_job(initial_time, producer);
});
}

return producer;
}

void producer_job(std::shared_ptr<BenchmarkProducer> producer)
void producer_job(
std::chrono::time_point<std::chrono::high_resolution_clock> when,
std::shared_ptr<BenchmarkProducer> producer)
{
for (auto i = 0u; i < producer->max_messages; ++i) {
if (!rclcpp::ok()) {
break;
}
std::this_thread::sleep_for(producer->period);
producer->produce();
std::this_thread::sleep_until(when);
if (!rclcpp::ok()) {
producer->promise_finished.set_value();
return;
}
producer->produce();

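// Re-queue this producer on the pool for its next period so that a
// fixed-size pool can time-multiplex many publishers; the promise is
// fulfilled once all messages have been produced.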
if (producer->produced_messages < producer->max_messages) {
thread_pool_.queue(
[this, next_timestamp = when + producer->period,
producer] {
producer_job(next_timestamp, producer);
});
} else {
producer->promise_finished.set_value();
}
}

ThreadPool thread_pool_;
size_t number_of_threads_;
std::vector<std::shared_ptr<BenchmarkProducer>> producers_;
};

@@ -58,6 +58,15 @@ bool wait_for_subscriptions_from_node_parameters(rclcpp::Node & node)
return wait_for_subscriptions;
}

size_t get_number_of_threads_from_node_parameters(rclcpp::Node & node)
{
const std::string parameters_ns = "publishers";
node.declare_parameter<int>(parameters_ns + ".number_of_threads", 0);
size_t number_of_threads;
node.get_parameter(parameters_ns + ".number_of_threads", number_of_threads);
return number_of_threads;
}

std::vector<PublisherGroupConfig> publisher_groups_from_node_parameters(
rclcpp::Node & node)
{
@@ -42,6 +42,11 @@ WriterBenchmark::WriterBenchmark(const std::string & name)

bag_config_ = config_utils::bag_config_from_node_parameters(*this);

const auto number_of_threads = config_utils::get_number_of_threads_from_node_parameters(*this);
if (number_of_threads != 0) {
RCLCPP_WARN(get_logger(), "number_of_threads parameter is not used in writer_benchmark");
}

this->declare_parameter("results_file", bag_config_.storage_options.uri + "/results.csv");
this->get_parameter("results_file", results_file_);

@@ -188,7 +193,7 @@ void WriterBenchmark::create_writer()
rosbag2_storage::TopicMetadata topic;
topic.name = queue->topic_name();
// TODO(adamdbrw) - replace with something more general if needed
topic.type = "std_msgs::msgs::ByteMultiArray";
topic.type = "rosbag2_performance_benchmarking_msgs/msg/ByteArray";
topic.serialization_format = serialization_format;
writer_->create_topic(topic);
}