Skip to content

Commit

Permalink
use serialized message in callback
Browse files Browse the repository at this point in the history
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
  • Loading branch information
Karsten1987 committed Apr 21, 2020
1 parent 7dc5903 commit 5aad511
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 36 deletions.
31 changes: 6 additions & 25 deletions rclcpp/include/rclcpp/message_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class MessageMemoryStrategy
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;

using SerializedMessageAllocTraits = allocator::AllocRebind<rcl_serialized_message_t, Alloc>;
using SerializedMessageAllocTraits = allocator::AllocRebind<rclcpp::SerializedMessage, Alloc>;
using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type;
using SerializedMessageDeleter =
allocator::Deleter<SerializedMessageAlloc, rcl_serialized_message_t>;
allocator::Deleter<SerializedMessageAlloc, rclcpp::SerializedMessage>;

using BufferAllocTraits = allocator::AllocRebind<char, Alloc>;
using BufferAlloc = typename BufferAllocTraits::allocator_type;
Expand Down Expand Up @@ -86,31 +86,12 @@ class MessageMemoryStrategy
return std::allocate_shared<MessageT, MessageAlloc>(*message_allocator_.get());
}

virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message(size_t capacity)
virtual std::shared_ptr<rclcpp::SerializedMessage> borrow_serialized_message(size_t capacity)
{
auto msg = new rcl_serialized_message_t;
*msg = rmw_get_zero_initialized_serialized_message();
auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

auto serialized_msg = std::shared_ptr<rcl_serialized_message_t>(
msg,
[](rmw_serialized_message_t * msg) {
auto fini_ret = rmw_serialized_message_fini(msg);
delete msg;
if (fini_ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to destroy serialized message: %s", rcl_get_error_string().str);
}
});

return serialized_msg;
return std::make_shared<rclcpp::SerializedMessage>(capacity);
}

virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message()
virtual std::shared_ptr<rclcpp::SerializedMessage> borrow_serialized_message()
{
return borrow_serialized_message(default_buffer_capacity_);
}
Expand All @@ -127,7 +108,7 @@ class MessageMemoryStrategy
msg.reset();
}

virtual void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & serialized_msg)
virtual void return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & serialized_msg)
{
serialized_msg.reset();
}
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class Subscription : public SubscriptionBase
return message_memory_strategy_->borrow_message();
}

std::shared_ptr<rcl_serialized_message_t>
std::shared_ptr<rclcpp::SerializedMessage>
create_serialized_message() override
{
return message_memory_strategy_->borrow_serialized_message();
Expand Down Expand Up @@ -284,7 +284,7 @@ class Subscription : public SubscriptionBase
}

void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) override
return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) override
{
message_memory_strategy_->return_serialized_message(message);
}
Expand Down
7 changes: 4 additions & 3 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "rclcpp/message_info.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -151,7 +152,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
*/
RCLCPP_PUBLIC
bool
take_serialized(rcl_serialized_message_t & message_out, rclcpp::MessageInfo & message_info_out);
take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out);

/// Borrow a new message.
/** \return Shared pointer to the fresh message. */
Expand All @@ -164,7 +165,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
/** \return Shared pointer to a rcl_message_serialized_t. */
RCLCPP_PUBLIC
virtual
std::shared_ptr<rcl_serialized_message_t>
std::shared_ptr<rclcpp::SerializedMessage>
create_serialized_message() = 0;

/// Check if we need to handle the message, and execute the callback if we do.
Expand Down Expand Up @@ -194,7 +195,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
RCLCPP_PUBLIC
virtual
void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) = 0;
return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) = 0;

RCLCPP_PUBLIC
const rosidl_message_type_support_t &
Expand Down
3 changes: 1 addition & 2 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
if (subscription->is_serialized()) {
// This is the case where a copy of the serialized message is taken from
// the middleware via inter-process communication.
std::shared_ptr<rcl_serialized_message_t> serialized_msg =
subscription->create_serialized_message();
auto serialized_msg = subscription->create_serialized_message();
take_and_do_error_handling(
"taking a serialized message from topic",
subscription->get_topic_name(),
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ SubscriptionBase::take_type_erased(void * message_out, rclcpp::MessageInfo & mes

bool
SubscriptionBase::take_serialized(
rcl_serialized_message_t & message_out,
rclcpp::SerializedMessage & message_out,
rclcpp::MessageInfo & message_info_out)
{
rcl_ret_t ret = rcl_take_serialized_message(
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/test/test_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,10 @@ TEST_F(TestSubscription, take) {
TEST_F(TestSubscription, take_serialized) {
initialize();
using test_msgs::msg::Empty;
auto do_nothing = [](std::shared_ptr<const rcl_serialized_message_t>) {FAIL();};
auto do_nothing = [](std::shared_ptr<const rclcpp::SerializedMessage>) {FAIL();};
{
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing);
std::shared_ptr<rcl_serialized_message_t> msg = sub->create_serialized_message();
std::shared_ptr<rclcpp::SerializedMessage> msg = sub->create_serialized_message();
rclcpp::MessageInfo msg_info;
EXPECT_FALSE(sub->take_serialized(*msg, msg_info));
}
Expand All @@ -317,7 +317,7 @@ TEST_F(TestSubscription, take_serialized) {
test_msgs::msg::Empty msg;
pub->publish(msg);
}
std::shared_ptr<rcl_serialized_message_t> msg = sub->create_serialized_message();
std::shared_ptr<rclcpp::SerializedMessage> msg = sub->create_serialized_message();
rclcpp::MessageInfo msg_info;
bool message_recieved = false;
auto start = std::chrono::steady_clock::now();
Expand Down

0 comments on commit 5aad511

Please sign in to comment.