Skip to content

Commit

Permalink
Avoid Resource deadlock avoided if use intra_process_comms (#1530)
Browse files Browse the repository at this point in the history
Co-authored-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
  • Loading branch information
Chen Lihui and ivanpauno committed Feb 3, 2021
1 parent 9bf0f83 commit 7c984f1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 32 deletions.
54 changes: 30 additions & 24 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class IntraProcessManager
{
SubscriptionInfo() = default;

rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription;
rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr subscription;
rmw_qos_profile_t qos;
const char * topic_name;
bool use_take_shared_method;
Expand Down Expand Up @@ -361,13 +361,16 @@ class IntraProcessManager
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.subscription;
auto subscription_base = subscription_it->second.subscription.lock();
if (subscription_base) {
auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
>(subscription_base);

auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
>(subscription_base);

subscription->provide_intra_process_message(message);
subscription->provide_intra_process_message(message);
} else {
subscriptions_.erase(id);
}
}
}

Expand All @@ -389,24 +392,27 @@ class IntraProcessManager
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.subscription;

auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
>(subscription_base);

if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
subscription->provide_intra_process_message(std::move(message));
auto subscription_base = subscription_it->second.subscription.lock();
if (subscription_base) {
auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
>(subscription_base);

if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
subscription->provide_intra_process_message(std::move(message));
} else {
// Copy the message since we have additional subscriptions to serve
MessageUniquePtr copy_message;
Deleter deleter = message.get_deleter();
auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
MessageAllocTraits::construct(*allocator.get(), ptr, *message);
copy_message = MessageUniquePtr(ptr, deleter);

subscription->provide_intra_process_message(std::move(copy_message));
}
} else {
// Copy the message since we have additional subscriptions to serve
MessageUniquePtr copy_message;
Deleter deleter = message.get_deleter();
auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
MessageAllocTraits::construct(*allocator.get(), ptr, *message);
copy_message = MessageUniquePtr(ptr, deleter);

subscription->provide_intra_process_message(std::move(copy_message));
subscriptions_.erase(subscription_it);
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,7 @@ class Subscription : public SubscriptionBase

// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto context = node_base->get_context();
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
CallbackMessageT,
AllocatorT,
typename MessageUniquePtr::deleter_type>;
auto subscription_intra_process = std::make_shared<SubscriptionIntraProcessT>(
subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
callback,
options.get_allocator(),
context,
Expand All @@ -185,12 +181,12 @@ class Subscription : public SubscriptionBase
TRACEPOINT(
rclcpp_subscription_init,
static_cast<const void *>(get_subscription_handle().get()),
static_cast<const void *>(subscription_intra_process.get()));
static_cast<const void *>(subscription_intra_process_.get()));

// Add it to the intra process manager.
using rclcpp::experimental::IntraProcessManager;
auto ipm = context->get_sub_context<IntraProcessManager>();
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_);
this->setup_intra_process(intra_process_subscription_id, ipm);
}

Expand Down Expand Up @@ -347,6 +343,11 @@ class Subscription : public SubscriptionBase
message_memory_strategy_;
/// Component which computes and publishes topic statistics for this subscriber
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
CallbackMessageT,
AllocatorT,
typename MessageUniquePtr::deleter_type>;
std::shared_ptr<SubscriptionIntraProcessT> subscription_intra_process_;
};

} // namespace rclcpp
Expand Down
8 changes: 7 additions & 1 deletion rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,13 @@ IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subsc
if (subscription_it == subscriptions_.end()) {
return nullptr;
} else {
return subscription_it->second.subscription;
auto subscription = subscription_it->second.subscription.lock();
if (subscription) {
return subscription;
} else {
subscriptions_.erase(subscription_it);
return nullptr;
}
}
}

Expand Down

0 comments on commit 7c984f1

Please sign in to comment.