diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index 0e7d4366e5..703066fa3f 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -15,10 +15,17 @@ #ifndef RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_ #define RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_ +#include +#include #include +#include +#include +#include #include "rosidl_runtime_cpp/traits.hpp" +#include "rclcpp/logger.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/visibility_control.hpp" @@ -50,13 +57,24 @@ class MessagePoolMemoryStrategy public: RCLCPP_SMART_PTR_DEFINITIONS(MessagePoolMemoryStrategy) - /// Default constructor MessagePoolMemoryStrategy() - : next_array_index_(0) { + pool_mutex_ = std::make_shared(); + + pool_ = std::shared_ptr>( + new std::array, + [](std::array * arr) { + for (size_t i = 0; i < Size; ++i) { + free((*arr)[i]); + } + delete arr; + }); + + free_list_ = std::make_shared>(); + for (size_t i = 0; i < Size; ++i) { - pool_[i].msg_ptr_ = std::make_shared(); - pool_[i].used = false; + (*pool_)[i] = static_cast(malloc(sizeof(MessageT))); + free_list_->push_back(i); } } @@ -68,43 +86,85 @@ class MessagePoolMemoryStrategy */ std::shared_ptr borrow_message() { - size_t current_index = next_array_index_; - next_array_index_ = (next_array_index_ + 1) % Size; - if (pool_[current_index].used) { - throw std::runtime_error("Tried to access message that was still in use! Abort."); + std::lock_guard lock(*pool_mutex_); + if (free_list_->size() == 0) { + throw std::runtime_error("No more free slots in the pool"); } - pool_[current_index].msg_ptr_->~MessageT(); - new (pool_[current_index].msg_ptr_.get())MessageT; - pool_[current_index].used = true; - return pool_[current_index].msg_ptr_; + size_t current_index = free_list_->pop_front(); + + return std::shared_ptr( + new((*pool_)[current_index]) MessageT(), + [pool = this->pool_, pool_mutex = this->pool_mutex_, + free_list = this->free_list_](MessageT * p) { + std::lock_guard lock(*pool_mutex); + for (size_t i = 0; i < Size; ++i) { + if ((*pool)[i] == p) { + p->~MessageT(); + free_list->push_back(i); + break; + } + } + }); } /// Return a message to the message pool. /** - * Manage metadata in the message pool ring buffer to release the message. + * This does nothing since the message isn't returned to the pool until the user has dropped + * all references. * \param[in] msg Shared pointer to the message to return. */ void return_message(std::shared_ptr & msg) { - for (size_t i = 0; i < Size; ++i) { - if (pool_[i].msg_ptr_ == msg) { - pool_[i].used = false; - return; - } - } - throw std::runtime_error("Unrecognized message ptr in return_message."); + (void)msg; } protected: - struct PoolMember + template + class CircularArray { - std::shared_ptr msg_ptr_; - bool used; +public: + void push_back(const size_t v) + { + if (size_ + 1 > N) { + throw std::runtime_error("Tried to push too many items into the array"); + } + array_[(front_ + size_) % N] = v; + ++size_; + } + + size_t pop_front() + { + if (size_ < 1) { + throw std::runtime_error("Tried to pop item from empty array"); + } + + size_t val = array_[front_]; + + front_ = (front_ + 1) % N; + --size_; + + return val; + } + + size_t size() const + { + return size_; + } + +private: + size_t front_ = 0; + size_t size_ = 0; + std::array array_; }; - std::array pool_; - size_t next_array_index_; + // It's very important that these are shared_ptrs, since users of this class might hold a + // reference to a pool item longer than the lifetime of the class. In that scenario, the + // shared_ptr ensures that the lifetime of these variables outlives this class, and hence ensures + // the custom destructor for each pool item can successfully run. + std::shared_ptr pool_mutex_; + std::shared_ptr> pool_; + std::shared_ptr> free_list_; }; } // namespace message_pool_memory_strategy diff --git a/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp index b68bc96ab6..69198adf59 100644 --- a/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp @@ -56,18 +56,28 @@ TEST_F(TestMessagePoolMemoryStrategy, borrow_too_many) { // Size is 1, borrowing second time should fail RCLCPP_EXPECT_THROW_EQ( message_memory_strategy_->borrow_message(), - std::runtime_error("Tried to access message that was still in use! Abort.")); + std::runtime_error("No more free slots in the pool")); EXPECT_NO_THROW(message_memory_strategy_->return_message(message)); } -TEST_F(TestMessagePoolMemoryStrategy, return_unrecognized) { - auto message = message_memory_strategy_->borrow_message(); - ASSERT_NE(nullptr, message); +TEST_F(TestMessagePoolMemoryStrategy, borrow_hold_reference) { + { + auto message = message_memory_strategy_->borrow_message(); + ASSERT_NE(nullptr, message); - auto unrecognized = std::make_shared(); - // Unrecognized does not belong to pool - RCLCPP_EXPECT_THROW_EQ( - message_memory_strategy_->return_message(unrecognized), - std::runtime_error("Unrecognized message ptr in return_message.")); - EXPECT_NO_THROW(message_memory_strategy_->return_message(message)); + // Return it. + EXPECT_NO_THROW(message_memory_strategy_->return_message(message)); + + // But we are still holding the reference, so we expect that there is still no room in the pool. + RCLCPP_EXPECT_THROW_EQ( + message_memory_strategy_->borrow_message(), + std::runtime_error("No more free slots in the pool")); + } + + // Now that we've dropped the reference (left the scope), we expect to be able to borrow again. + + auto message2 = message_memory_strategy_->borrow_message(); + ASSERT_NE(nullptr, message2); + + EXPECT_NO_THROW(message_memory_strategy_->return_message(message2)); }