Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use rclcpp::guard_condition #1612

Merged
merged 3 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
src/rclcpp/detail/resolve_parameter_overrides.cpp
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__DETAIL__ADD_GUARD_CONDITION_TO_RCL_WAIT_SET_HPP_
#define RCLCPP__DETAIL__ADD_GUARD_CONDITION_TO_RCL_WAIT_SET_HPP_

#include "rclcpp/guard_condition.hpp"

namespace rclcpp
{
namespace detail
{

/// Adds the guard condition to a waitset
/**
* \param[in] wait_set reference to a wait set where to add the guard condition
* \param[in] guard_condition reference to the guard_condition to be added
*/
RCLCPP_PUBLIC
void
add_guard_condition_to_rcl_wait_set(
rcl_wait_set_t & wait_set,
const rclcpp::GuardCondition & guard_condition);

} // namespace detail
} // namespace rclcpp

#endif // RCLCPP__DETAIL__ADD_GUARD_CONDITION_TO_RCL_WAIT_SET_HPP_
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ class Executor
std::atomic_bool spinning;

/// Guard condition for signaling the rmw layer to wake up for special events.
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rclcpp::GuardCondition interrupt_guard_condition_;

std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

Expand All @@ -549,7 +549,7 @@ class Executor
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rcl_guard_condition_t *,
const rclcpp::GuardCondition *,
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class StaticExecutorEntitiesCollector final
* \throws std::runtime_error if it couldn't add guard condition to wait set
*/
RCLCPP_PUBLIC
bool
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;

RCLCPP_PUBLIC
Expand Down Expand Up @@ -328,7 +328,7 @@ class StaticExecutorEntitiesCollector final
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rcl_guard_condition_t *,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "rcl/error_handling.h"

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
Expand All @@ -41,9 +42,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

RCLCPP_PUBLIC
SubscriptionIntraProcessBase(
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
const rclcpp::QoS & qos_profile)
: topic_name_(topic_name), qos_profile_(qos_profile)
: gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
{}

virtual ~SubscriptionIntraProcessBase() = default;
Expand All @@ -53,7 +55,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
get_number_of_ready_guard_conditions() {return 1;}

RCLCPP_PUBLIC
bool
void
add_to_wait_set(rcl_wait_set_t * wait_set);

virtual bool
Expand All @@ -79,7 +81,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
rclcpp::GuardCondition gc_;

private:
virtual void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,13 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBase(topic_name, qos_profile)
: SubscriptionIntraProcessBase(context, topic_name, qos_profile)
{
// Create the intra-process buffer.
buffer_ = rclcpp::experimental::create_intra_process_buffer<MessageT, Alloc, Deleter>(
buffer_type,
qos_profile,
allocator);

// Create the guard condition.
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options();

gc_ = rcl_get_zero_initialized_guard_condition();
rcl_ret_t ret = rcl_guard_condition_init(
&gc_, context->get_rcl_context().get(), guard_condition_options);

if (RCL_RET_OK != ret) {
throw std::runtime_error(
"SubscriptionIntraProcessBuffer init error initializing guard condition");
}
}

virtual ~SubscriptionIntraProcessBuffer()
{
if (rcl_guard_condition_fini(&gc_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to destroy guard condition: %s",
rcutils_get_error_string().str);
}
}

bool
Expand Down Expand Up @@ -130,8 +107,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase
void
trigger_guard_condition()
{
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
(void)ret;
gc_.trigger();
}

BufferUniquePtr buffer_;
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/graph_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -187,7 +188,7 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
mutable std::mutex node_graph_interfaces_mutex_;
std::vector<rclcpp::node_interfaces::NodeGraphInterface *> node_graph_interfaces_;

rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rclcpp::GuardCondition interrupt_guard_condition_;
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
};

Expand Down
9 changes: 8 additions & 1 deletion rclcpp/include/rclcpp/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ class GuardCondition
RCLCPP_PUBLIC
explicit GuardCondition(
rclcpp::Context::SharedPtr context =
rclcpp::contexts::get_global_default_context());
rclcpp::contexts::get_global_default_context(),
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options());

RCLCPP_PUBLIC
virtual
Expand All @@ -58,6 +60,11 @@ class GuardCondition
rclcpp::Context::SharedPtr
get_context() const;

/// Return the underlying rcl guard condition structure.
RCLCPP_PUBLIC
rcl_guard_condition_t &
get_rcl_guard_condition();

/// Return the underlying rcl guard condition structure.
RCLCPP_PUBLIC
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
const rcl_guard_condition_t &
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual void clear_handles() = 0;
virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0;

virtual void add_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void
add_guard_condition(const rclcpp::GuardCondition & guard_condition) = 0;

virtual void remove_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void
remove_guard_condition(const rclcpp::GuardCondition * guard_condition) = 0;

virtual void
get_next_subscription(
Expand Down
8 changes: 2 additions & 6 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,9 @@ class NodeBase : public NodeBaseInterface
get_associated_with_executor_atomic() override;

RCLCPP_PUBLIC
rcl_guard_condition_t *
rclcpp::GuardCondition &
get_notify_guard_condition() override;
wjwwood marked this conversation as resolved.
Show resolved Hide resolved

RCLCPP_PUBLIC
std::unique_lock<std::recursive_mutex>
acquire_notify_guard_condition_lock() const override;

RCLCPP_PUBLIC
bool
get_use_intra_process_default() const override;
Expand Down Expand Up @@ -149,7 +145,7 @@ class NodeBase : public NodeBaseInterface

/// Guard condition for notifying the Executor of changes to this node.
mutable std::recursive_mutex notify_guard_condition_mutex_;
rcl_guard_condition_t notify_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rclcpp::GuardCondition notify_guard_condition_;
bool notify_guard_condition_is_valid_;
};

Expand Down
14 changes: 4 additions & 10 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "rclcpp/callback_group.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -142,24 +143,17 @@ class NodeBaseInterface
std::atomic_bool &
get_associated_with_executor_atomic() = 0;

/// Return guard condition that should be notified when the internal node state changes.
/// Return a guard condition that should be notified when the internal node state changes.
/**
* For example, this should be notified when a publisher is added or removed.
*
* \return the rcl_guard_condition_t if it is valid, else nullptr
* \return the GuardCondition if it is valid, else thow runtime error
*/
RCLCPP_PUBLIC
virtual
rcl_guard_condition_t *
rclcpp::GuardCondition &
get_notify_guard_condition() = 0;

/// Acquire and return a scoped lock that protects the notify guard condition.
/** This should be used when triggering the notify guard condition. */
RCLCPP_PUBLIC
virtual
std::unique_lock<std::recursive_mutex>
acquire_notify_guard_condition_lock() const = 0;

/// Return the default preference for using intra process communication.
RCLCPP_PUBLIC
virtual
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/qos_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class QOSEventHandlerBase : public Waitable

/// Add the Waitable to a wait set.
RCLCPP_PUBLIC
bool
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;

/// Check if the Waitable is ready.
Expand Down
26 changes: 8 additions & 18 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rcl/allocator.h"

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -61,17 +62,17 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
allocator_ = std::make_shared<VoidAlloc>();
}

void add_guard_condition(const rcl_guard_condition_t * guard_condition) override
void add_guard_condition(const rclcpp::GuardCondition & guard_condition) override
{
for (const auto & existing_guard_condition : guard_conditions_) {
if (existing_guard_condition == guard_condition) {
if (existing_guard_condition == &guard_condition) {
return;
}
}
guard_conditions_.push_back(guard_condition);
guard_conditions_.push_back(&guard_condition);
}

void remove_guard_condition(const rcl_guard_condition_t * guard_condition) override
void remove_guard_condition(const rclcpp::GuardCondition * guard_condition) override
{
for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
if (*it == guard_condition) {
Expand Down Expand Up @@ -240,22 +241,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

for (auto guard_condition : guard_conditions_) {
if (rcl_wait_set_add_guard_condition(wait_set, guard_condition, NULL) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't add guard_condition to wait set: %s",
rcl_get_error_string().str);
return false;
}
detail::add_guard_condition_to_rcl_wait_set(*wait_set, *guard_condition);
}

for (auto waitable : waitable_handles_) {
if (!waitable->add_to_wait_set(wait_set)) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't add waitable to wait set: %s", rcl_get_error_string().str);
return false;
}
waitable->add_to_wait_set(wait_set);
}
return true;
}
Expand Down Expand Up @@ -509,7 +499,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
using VectorRebind =
std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;

VectorRebind<const rcl_guard_condition_t *> guard_conditions_;
VectorRebind<const rclcpp::GuardCondition *> guard_conditions_;

VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,7 @@ class StoragePolicyCommon
continue;
}
rclcpp::Waitable & waitable = *waitable_ptr_pair.second;
bool successful = waitable.add_to_wait_set(&rcl_wait_set_);
if (!successful) {
throw std::runtime_error("waitable unexpectedly failed to be added to wait set");
}
waitable.add_to_wait_set(&rcl_wait_set_);
}
}

Expand Down
Loading