From 985a981c661df0514cfc1bab64a13f4ebe81baf4 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 23 Apr 2020 09:33:48 -0300 Subject: [PATCH] Revert "Use serialized message (#1081)" This reverts commit 46cfe84b14d7718eb9487f73ee13b31ca896ea98. --- .../rclcpp/message_memory_strategy.hpp | 32 +++++++++++++++---- rclcpp/include/rclcpp/publisher.hpp | 6 ---- rclcpp/include/rclcpp/serialized_message.hpp | 14 -------- rclcpp/include/rclcpp/subscription.hpp | 4 +-- rclcpp/include/rclcpp/subscription_base.hpp | 7 ++-- rclcpp/src/rclcpp/executor.cpp | 3 +- rclcpp/src/rclcpp/serialized_message.cpp | 16 ---------- rclcpp/src/rclcpp/subscription_base.cpp | 4 +-- rclcpp/test/test_serialized_message.cpp | 25 --------------- .../test_serialized_message_allocator.cpp | 18 +++++------ rclcpp/test/test_subscription.cpp | 6 ++-- 11 files changed, 46 insertions(+), 89 deletions(-) diff --git a/rclcpp/include/rclcpp/message_memory_strategy.hpp b/rclcpp/include/rclcpp/message_memory_strategy.hpp index 6e4c0be065..80654d79e7 100644 --- a/rclcpp/include/rclcpp/message_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/message_memory_strategy.hpp @@ -46,10 +46,10 @@ class MessageMemoryStrategy using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageDeleter = allocator::Deleter; - using SerializedMessageAllocTraits = allocator::AllocRebind; + using SerializedMessageAllocTraits = allocator::AllocRebind; using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type; using SerializedMessageDeleter = - allocator::Deleter; + allocator::Deleter; using BufferAllocTraits = allocator::AllocRebind; using BufferAlloc = typename BufferAllocTraits::allocator_type; @@ -86,12 +86,31 @@ class MessageMemoryStrategy return std::allocate_shared(*message_allocator_.get()); } - virtual std::shared_ptr borrow_serialized_message(size_t capacity) + virtual std::shared_ptr borrow_serialized_message(size_t capacity) { - return std::make_shared(capacity); + auto msg = new rcl_serialized_message_t; + *msg = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + auto serialized_msg = std::shared_ptr( + msg, + [](rmw_serialized_message_t * msg) { + auto fini_ret = rmw_serialized_message_fini(msg); + delete msg; + if (fini_ret != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "failed to destroy serialized message: %s", rcl_get_error_string().str); + } + }); + + return serialized_msg; } - virtual std::shared_ptr borrow_serialized_message() + virtual std::shared_ptr borrow_serialized_message() { return borrow_serialized_message(default_buffer_capacity_); } @@ -108,8 +127,7 @@ class MessageMemoryStrategy msg.reset(); } - virtual void return_serialized_message( - std::shared_ptr & serialized_msg) + virtual void return_serialized_message(std::shared_ptr & serialized_msg) { serialized_msg.reset(); } diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index abe259b31d..aa614e3a0c 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -217,12 +217,6 @@ class Publisher : public PublisherBase return this->do_serialized_publish(&serialized_msg); } - void - publish(const SerializedMessage & serialized_msg) - { - return this->do_serialized_publish(&serialized_msg.get_rcl_serialized_message()); - } - /// Publish an instance of a LoanedMessage. /** * When publishing a loaned message, the memory for this ROS message will be deallocated diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 4b90bf3561..13c2d1910b 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -98,20 +98,6 @@ class RCLCPP_PUBLIC_TYPE SerializedMessage */ size_t capacity() const; - /// Allocate memory in the data buffer - /** - * The data buffer of the underlying rcl_serialized_message_t will be resized. - * This might change the data layout and invalidates all pointers to the data. - */ - void reserve(size_t capacity); - - /// Release the underlying rcl_serialized_message_t - /** - * The memory (i.e. the data buffer) of the serialized message will no longer - * be managed by this instance and the memory won't be deallocated on destruction. - */ - rcl_serialized_message_t release_rcl_serialized_message(); - private: rcl_serialized_message_t serialized_message_; }; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 91343aaee0..a1e544c773 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -250,7 +250,7 @@ class Subscription : public SubscriptionBase return message_memory_strategy_->borrow_message(); } - std::shared_ptr + std::shared_ptr create_serialized_message() override { return message_memory_strategy_->borrow_serialized_message(); @@ -299,7 +299,7 @@ class Subscription : public SubscriptionBase } void - return_serialized_message(std::shared_ptr & message) override + return_serialized_message(std::shared_ptr & message) override { message_memory_strategy_->return_serialized_message(message); } diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 8156622299..7ee8c624d1 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -33,7 +33,6 @@ #include "rclcpp/message_info.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/qos_event.hpp" -#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -152,7 +151,7 @@ class SubscriptionBase : public std::enable_shared_from_this */ RCLCPP_PUBLIC bool - take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out); + take_serialized(rcl_serialized_message_t & message_out, rclcpp::MessageInfo & message_info_out); /// Borrow a new message. /** \return Shared pointer to the fresh message. */ @@ -165,7 +164,7 @@ class SubscriptionBase : public std::enable_shared_from_this /** \return Shared pointer to a rcl_message_serialized_t. */ RCLCPP_PUBLIC virtual - std::shared_ptr + std::shared_ptr create_serialized_message() = 0; /// Check if we need to handle the message, and execute the callback if we do. @@ -195,7 +194,7 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC virtual void - return_serialized_message(std::shared_ptr & message) = 0; + return_serialized_message(std::shared_ptr & message) = 0; RCLCPP_PUBLIC const rosidl_message_type_support_t & diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index c25b47db49..556c42ddde 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -350,7 +350,8 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription) if (subscription->is_serialized()) { // This is the case where a copy of the serialized message is taken from // the middleware via inter-process communication. - std::shared_ptr serialized_msg = subscription->create_serialized_message(); + std::shared_ptr serialized_msg = + subscription->create_serialized_message(); take_and_do_error_handling( "taking a serialized message from topic", subscription->get_topic_name(), diff --git a/rclcpp/src/rclcpp/serialized_message.cpp b/rclcpp/src/rclcpp/serialized_message.cpp index 64e54f58f9..a667977cea 100644 --- a/rclcpp/src/rclcpp/serialized_message.cpp +++ b/rclcpp/src/rclcpp/serialized_message.cpp @@ -144,20 +144,4 @@ size_t SerializedMessage::capacity() const { return serialized_message_.buffer_capacity; } - -void SerializedMessage::reserve(size_t capacity) -{ - auto ret = rmw_serialized_message_resize(&serialized_message_, capacity); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret); - } -} - -rcl_serialized_message_t SerializedMessage::release_rcl_serialized_message() -{ - auto ret = serialized_message_; - serialized_message_ = rmw_get_zero_initialized_serialized_message(); - - return ret; -} } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index de1acfadb2..2af068e32e 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -159,12 +159,12 @@ SubscriptionBase::take_type_erased(void * message_out, rclcpp::MessageInfo & mes bool SubscriptionBase::take_serialized( - rclcpp::SerializedMessage & message_out, + rcl_serialized_message_t & message_out, rclcpp::MessageInfo & message_info_out) { rcl_ret_t ret = rcl_take_serialized_message( this->get_subscription_handle().get(), - &message_out.get_rcl_serialized_message(), + &message_out, &message_info_out.get_rmw_message_info(), nullptr); if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) { diff --git a/rclcpp/test/test_serialized_message.cpp b/rclcpp/test/test_serialized_message.cpp index 9047e22b22..0c8a037490 100644 --- a/rclcpp/test/test_serialized_message.cpp +++ b/rclcpp/test/test_serialized_message.cpp @@ -114,31 +114,6 @@ TEST(TestSerializedMessage, various_constructors_from_rcl) { EXPECT_TRUE(nullptr != rcl_handle.buffer); } -TEST(TestSerializedMessage, release) { - const std::string content = "Hello World"; - const auto content_size = content.size() + 1; // accounting for null terminator - - rcl_serialized_message_t released_handle = rmw_get_zero_initialized_serialized_message(); - { - rclcpp::SerializedMessage serialized_msg(13); - // manually copy some content - auto & rcl_serialized_msg = serialized_msg.get_rcl_serialized_message(); - std::memcpy(rcl_serialized_msg.buffer, content.c_str(), content.size()); - rcl_serialized_msg.buffer[content.size()] = '\0'; - rcl_serialized_msg.buffer_length = content_size; - EXPECT_EQ(13u, serialized_msg.capacity()); - - released_handle = serialized_msg.release_rcl_serialized_message(); - // scope exit of serialized_msg - } - - EXPECT_TRUE(nullptr != released_handle.buffer); - EXPECT_EQ(13u, released_handle.buffer_capacity); - EXPECT_EQ(content_size, released_handle.buffer_length); - // cleanup memory manually - EXPECT_EQ(RCL_RET_OK, rmw_serialized_message_fini(&released_handle)); -} - TEST(TestSerializedMessage, serialization) { using MessageT = test_msgs::msg::BasicTypes; diff --git a/rclcpp/test/test_serialized_message_allocator.cpp b/rclcpp/test/test_serialized_message_allocator.cpp index 968ad0ac2d..bc177722e4 100644 --- a/rclcpp/test/test_serialized_message_allocator.cpp +++ b/rclcpp/test/test_serialized_message_allocator.cpp @@ -29,25 +29,25 @@ TEST(TestSerializedMessageAllocator, default_allocator) { rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); auto msg0 = mem_strategy->borrow_serialized_message(); - ASSERT_EQ(msg0->capacity(), 0u); + ASSERT_EQ(msg0->buffer_capacity, 0u); mem_strategy->return_serialized_message(msg0); auto msg100 = mem_strategy->borrow_serialized_message(100); - ASSERT_EQ(msg100->capacity(), 100u); + ASSERT_EQ(msg100->buffer_capacity, 100u); mem_strategy->return_serialized_message(msg100); auto msg200 = mem_strategy->borrow_serialized_message(); - auto ret = rmw_serialized_message_resize(&msg200->get_rcl_serialized_message(), 200); + auto ret = rmw_serialized_message_resize(msg200.get(), 200); ASSERT_EQ(RCL_RET_OK, ret); - EXPECT_EQ(0u, msg200->size()); - EXPECT_EQ(200u, msg200->capacity()); + EXPECT_EQ(0u, msg200->buffer_length); + EXPECT_EQ(200u, msg200->buffer_capacity); mem_strategy->return_serialized_message(msg200); auto msg1000 = mem_strategy->borrow_serialized_message(1000); - ASSERT_EQ(msg1000->capacity(), 1000u); - ret = rmw_serialized_message_resize(&msg1000->get_rcl_serialized_message(), 2000); + ASSERT_EQ(msg1000->buffer_capacity, 1000u); + ret = rmw_serialized_message_resize(msg1000.get(), 2000); ASSERT_EQ(RCL_RET_OK, ret); - EXPECT_EQ(2000u, msg1000->capacity()); + EXPECT_EQ(2000u, msg1000->buffer_capacity); mem_strategy->return_serialized_message(msg1000); } @@ -61,7 +61,7 @@ TEST(TestSerializedMessageAllocator, borrow_from_subscription) { [](std::shared_ptr test_msg) {(void) test_msg;}); auto msg0 = sub->create_serialized_message(); - EXPECT_EQ(0u, msg0->capacity()); + EXPECT_EQ(0u, msg0->buffer_capacity); sub->return_serialized_message(msg0); rclcpp::shutdown(); diff --git a/rclcpp/test/test_subscription.cpp b/rclcpp/test/test_subscription.cpp index a265f05e17..95f9b96448 100644 --- a/rclcpp/test/test_subscription.cpp +++ b/rclcpp/test/test_subscription.cpp @@ -299,10 +299,10 @@ TEST_F(TestSubscription, take) { TEST_F(TestSubscription, take_serialized) { initialize(); using test_msgs::msg::Empty; - auto do_nothing = [](std::shared_ptr) {FAIL();}; + auto do_nothing = [](std::shared_ptr) {FAIL();}; { auto sub = node->create_subscription("~/test_take", 1, do_nothing); - std::shared_ptr msg = sub->create_serialized_message(); + std::shared_ptr msg = sub->create_serialized_message(); rclcpp::MessageInfo msg_info; EXPECT_FALSE(sub->take_serialized(*msg, msg_info)); } @@ -317,7 +317,7 @@ TEST_F(TestSubscription, take_serialized) { test_msgs::msg::Empty msg; pub->publish(msg); } - std::shared_ptr msg = sub->create_serialized_message(); + std::shared_ptr msg = sub->create_serialized_message(); rclcpp::MessageInfo msg_info; bool message_recieved = false; auto start = std::chrono::steady_clock::now();