Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup intra-process-manager #1695

Merged
merged 1 commit into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/buffers/ring_buffer_implementation.hpp"
#include "rclcpp/intra_process_buffer_type.hpp"
#include "rclcpp/qos.hpp"

namespace rclcpp
{
Expand All @@ -37,13 +38,13 @@ template<
typename rclcpp::experimental::buffers::IntraProcessBuffer<MessageT, Alloc, Deleter>::UniquePtr
create_intra_process_buffer(
IntraProcessBufferType buffer_type,
rmw_qos_profile_t qos,
const rclcpp::QoS & qos,
std::shared_ptr<Alloc> allocator)
{
using MessageSharedPtr = std::shared_ptr<const MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;

size_t buffer_size = qos.depth;
size_t buffer_size = qos.depth();

using rclcpp::experimental::buffers::IntraProcessBuffer;
typename IntraProcessBuffer<MessageT, Alloc, Deleter>::UniquePtr buffer;
Expand Down
31 changes: 7 additions & 24 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,36 +305,17 @@ class IntraProcessManager
get_subscription_intra_process(uint64_t intra_process_subscription_id);

private:
struct SubscriptionInfo
{
SubscriptionInfo() = default;

rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr subscription;
rmw_qos_profile_t qos;
const char * topic_name;
bool use_take_shared_method;
};

struct PublisherInfo
{
PublisherInfo() = default;

rclcpp::PublisherBase::WeakPtr publisher;
rmw_qos_profile_t qos;
const char * topic_name;
};

struct SplittedSubscriptions
{
std::vector<uint64_t> take_shared_subscriptions;
std::vector<uint64_t> take_ownership_subscriptions;
};

using SubscriptionMap =
std::unordered_map<uint64_t, SubscriptionInfo>;
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;

using PublisherMap =
std::unordered_map<uint64_t, PublisherInfo>;
std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;

using PublisherToSubscriptionIdsMap =
std::unordered_map<uint64_t, SplittedSubscriptions>;
Expand All @@ -350,7 +331,9 @@ class IntraProcessManager

RCLCPP_PUBLIC
bool
can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const;
can_communicate(
rclcpp::PublisherBase::SharedPtr pub,
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const;

template<
typename MessageT,
Expand All @@ -366,7 +349,7 @@ class IntraProcessManager
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.subscription.lock();
auto subscription_base = subscription_it->second.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>
Expand Down Expand Up @@ -404,7 +387,7 @@ class IntraProcessManager
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.subscription.lock();
auto subscription_base = subscription_it->second.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
Expand Down Expand Up @@ -72,7 +73,7 @@ class SubscriptionIntraProcess
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>(
allocator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "rcl/error_handling.h"

#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"

Expand All @@ -39,7 +40,9 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)

RCLCPP_PUBLIC
SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile)
SubscriptionIntraProcessBase(
const std::string & topic_name,
const rclcpp::QoS & qos_profile)
: topic_name_(topic_name), qos_profile_(qos_profile)
{}

Expand Down Expand Up @@ -71,7 +74,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
get_topic_name() const;

RCLCPP_PUBLIC
rmw_qos_profile_t
QoS
get_actual_qos() const;

protected:
Expand All @@ -83,7 +86,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
trigger_guard_condition() = 0;

std::string topic_name_;
rmw_qos_profile_t qos_profile_;
QoS qos_profile_;
};

} // namespace experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
Expand Down Expand Up @@ -64,7 +65,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBase(topic_name, qos_profile)
{
Expand Down
8 changes: 4 additions & 4 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,15 @@ class Publisher : public PublisherBase
// Get the intra process manager instance for this context.
auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
// Register the publisher with the intra process manager.
if (qos.get_rmw_qos_profile().history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
if (qos.history() != rclcpp::HistoryPolicy::KeepLast) {
throw std::invalid_argument(
"intraprocess communication is not allowed with keep all history qos policy");
"intraprocess communication allowed only with keep last history qos policy");
}
if (qos.get_rmw_qos_profile().depth == 0) {
if (qos.depth() == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with a zero qos history depth value");
}
if (qos.get_rmw_qos_profile().durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) {
if (qos.durability() != rclcpp::DurabilityPolicy::Volatile) {
throw std::invalid_argument(
"intraprocess communication allowed only with volatile durability");
}
Expand Down
10 changes: 5 additions & 5 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,16 @@ class Subscription : public SubscriptionBase
using rclcpp::detail::resolve_intra_process_buffer_type;

// Check if the QoS is compatible with intra-process.
rmw_qos_profile_t qos_profile = get_actual_qos().get_rmw_qos_profile();
if (qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
auto qos_profile = get_actual_qos();
if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
throw std::invalid_argument(
"intraprocess communication is not allowed with keep all history qos policy");
"intraprocess communication allowed only with keep last history qos policy");
}
if (qos_profile.depth == 0) {
if (qos_profile.depth() == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with 0 depth qos policy");
}
if (qos_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) {
if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
throw std::invalid_argument(
"intraprocess communication allowed only with volatile durability");
}
Expand Down
60 changes: 28 additions & 32 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,45 +36,50 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);

auto id = IntraProcessManager::get_next_unique_id();
uint64_t pub_id = IntraProcessManager::get_next_unique_id();

publishers_[id].publisher = publisher;
publishers_[id].topic_name = publisher->get_topic_name();
publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile();
publishers_[pub_id] = publisher;

// Initialize the subscriptions storage for this publisher.
pub_to_subs_[id] = SplittedSubscriptions();
pub_to_subs_[pub_id] = SplittedSubscriptions();

// create an entry for the publisher id and populate with already existing subscriptions
for (auto & pair : subscriptions_) {
if (can_communicate(publishers_[id], pair.second)) {
insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method);
auto subscription = pair.second.lock();
if (!subscription) {
continue;
}
if (can_communicate(publisher, subscription)) {
uint64_t sub_id = pair.first;
insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
}
}

return id;
return pub_id;
}

uint64_t
IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);

auto id = IntraProcessManager::get_next_unique_id();
uint64_t sub_id = IntraProcessManager::get_next_unique_id();

subscriptions_[id].subscription = subscription;
subscriptions_[id].topic_name = subscription->get_topic_name();
subscriptions_[id].qos = subscription->get_actual_qos();
subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method();
subscriptions_[sub_id] = subscription;

// adds the subscription id to all the matchable publishers
for (auto & pair : publishers_) {
if (can_communicate(pair.second, subscriptions_[id])) {
insert_sub_id_for_pub(id, pair.first, subscriptions_[id].use_take_shared_method);
auto publisher = pair.second.lock();
if (!publisher) {
continue;
}
if (can_communicate(publisher, subscription)) {
uint64_t pub_id = pair.first;
insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
}
}

return id;
return sub_id;
}

void
Expand Down Expand Up @@ -116,7 +121,7 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
std::shared_lock<std::shared_timed_mutex> lock(mutex_);

for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.publisher.lock();
auto publisher = publisher_pair.second.lock();
if (!publisher) {
continue;
}
Expand Down Expand Up @@ -157,7 +162,7 @@ IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subsc
if (subscription_it == subscriptions_.end()) {
return nullptr;
} else {
auto subscription = subscription_it->second.subscription.lock();
auto subscription = subscription_it->second.lock();
if (subscription) {
return subscription;
} else {
Expand Down Expand Up @@ -204,25 +209,16 @@ IntraProcessManager::insert_sub_id_for_pub(

bool
IntraProcessManager::can_communicate(
PublisherInfo pub_info,
SubscriptionInfo sub_info) const
rclcpp::PublisherBase::SharedPtr pub,
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const
{
// publisher and subscription must be on the same topic
if (strcmp(pub_info.topic_name, sub_info.topic_name) != 0) {
return false;
}

// TODO(alsora): the following checks for qos compatibility should be provided by the RMW
// a reliable subscription can't be connected with a best effort publisher
if (
sub_info.qos.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE &&
pub_info.qos.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT)
{
if (strcmp(pub->get_topic_name(), sub->get_topic_name()) != 0) {
return false;
}

// a publisher and a subscription with different durability can't communicate
if (sub_info.qos.durability != pub_info.qos.durability) {
auto check_result = rclcpp::qos_check_compatible(pub->get_actual_qos(), sub->get_actual_qos());
if (check_result.compatibility == rclcpp::QoSCompatibility::Error) {
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/subscription_intra_process_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ SubscriptionIntraProcessBase::get_topic_name() const
return topic_name_.c_str();
}

rmw_qos_profile_t
rclcpp::QoS
SubscriptionIntraProcessBase::get_actual_qos() const
{
return qos_profile_;
Expand Down
Loading