Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use thread pool to run benchmark publishers in rosbag2_performance_be…
…nchmarking (#1250) * Add thread pool for benchmark_publishers execution Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Default the number of threads to the number of publishers Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Skip ament_package() call when not building rosbag2_performance_benchmarking (#1242) Signed-off-by: Shane Loretz <sloretz@google.com> Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Update rosbag2_performance/rosbag2_performance_benchmarking/include/rosbag2_performance_benchmarking/config_utils.hpp Co-authored-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Update rosbag2_performance/rosbag2_performance_benchmarking/include/rosbag2_performance_benchmarking/thread_pool.hpp Co-authored-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Update rosbag2_performance/rosbag2_performance_benchmarking/src/benchmark_publishers.cpp Co-authored-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Update rosbag2_performance/rosbag2_performance_benchmarking/include/rosbag2_performance_benchmarking/thread_pool.hpp Co-authored-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Update rosbag2_performance/rosbag2_performance_benchmarking/include/rosbag2_performance_benchmarking/thread_pool.hpp Co-authored-by: Michael Orlov <morlovmr@gmail.com> Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Address reviewer comments Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Address reviewer comments Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> * Address reviewer comments Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> --------- Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai> Signed-off-by: Shane Loretz <sloretz@google.com> Co-authored-by: Shane Loretz <sloretz@google.com> Co-authored-by: Michael Orlov <morlovmr@gmail.com>
- Loading branch information
1 parent
260e20e
commit 6160bb3
Showing
11 changed files
with
186 additions
and
42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
99 changes: 99 additions & 0 deletions
99
...rosbag2_performance_benchmarking/include/rosbag2_performance_benchmarking/thread_pool.hpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
// Copyright 2022 Apex.AI, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#ifndef ROSBAG2_PERFORMANCE_BENCHMARKING__THREAD_POOL_HPP_ | ||
#define ROSBAG2_PERFORMANCE_BENCHMARKING__THREAD_POOL_HPP_ | ||
|
||
#include <mutex> | ||
#include <queue> | ||
#include <vector> | ||
#include <thread> | ||
#include <functional> | ||
#include <condition_variable> | ||
|
||
class ThreadPool | ||
{ | ||
public: | ||
using job_type = std::function<void ()>; | ||
~ThreadPool() | ||
{ | ||
this->terminate(); | ||
} | ||
|
||
void start(size_t size) | ||
{ | ||
if (!threads_.empty()) { | ||
throw std::runtime_error("thread pool already started"); | ||
} | ||
|
||
for (size_t i = 0; i < size; ++i) { | ||
threads_.emplace_back( | ||
[this] { | ||
thread_task(); | ||
}); | ||
} | ||
} | ||
|
||
void queue(job_type job) | ||
{ | ||
if (job == nullptr) { | ||
throw std::invalid_argument("job is nullptr"); | ||
} | ||
|
||
std::lock_guard<std::mutex> l(jobs_queue_mutex_); | ||
jobs_queue_.push(job); | ||
jobs_queue_cv_.notify_one(); | ||
} | ||
|
||
void terminate() | ||
{ | ||
terminate_ = true; | ||
jobs_queue_cv_.notify_all(); | ||
for (auto & t : threads_) { | ||
if (t.joinable()) {t.join();} | ||
} | ||
threads_.clear(); | ||
} | ||
|
||
private: | ||
void thread_task() | ||
{ | ||
while (true) { | ||
job_type job; | ||
{ | ||
std::unique_lock<std::mutex> lock(jobs_queue_mutex_); | ||
jobs_queue_cv_.wait( | ||
lock, [this] { | ||
return !jobs_queue_.empty() || terminate_; | ||
}); | ||
|
||
if (terminate_) { | ||
break; | ||
} | ||
|
||
job = jobs_queue_.front(); | ||
jobs_queue_.pop(); | ||
} | ||
job(); | ||
} | ||
} | ||
|
||
bool terminate_ = false; | ||
std::mutex jobs_queue_mutex_; | ||
std::queue<job_type> jobs_queue_; | ||
std::condition_variable jobs_queue_cv_; | ||
std::vector<std::thread> threads_; | ||
}; | ||
|
||
#endif // ROSBAG2_PERFORMANCE_BENCHMARKING__THREAD_POOL_HPP_ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters