Skip to content

Commit

Permalink
map parameters to list of callbacks
Browse files Browse the repository at this point in the history
functions to remove parameter callbacks

add functions to remove event callbacks, remove subscriptions, allow subscribing event callback to many namespaces, additional test coverage

Signed-off-by: bpwilcox <bpwilcox@eng.ucsd.edu>
  • Loading branch information
bpwilcox committed Sep 26, 2019
1 parent fa465b4 commit a91f464
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 33 deletions.
100 changes: 93 additions & 7 deletions rclcpp/include/rclcpp/parameter_events_subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@
namespace rclcpp
{

struct ParameterEventsCallbackHandle
{
RCLCPP_SMART_PTR_DEFINITIONS(ParameterEventsCallbackHandle)

using ParameterEventsCallbackType = std::function<void(const rclcpp::Parameter &)>;

std::string parameter_name;
std::string node_name;
ParameterEventsCallbackType callback;
};

class ParameterEventsSubscriber
{
public:
Expand Down Expand Up @@ -56,13 +67,25 @@ class ParameterEventsSubscriber
* provided callback will be applied to all of them.
*
* \param[in] callback Function callback to be evaluated on event.
* \param[in] node_namespace Name of namespace for which a subscription will be created.
* \param[in] node_namespaces Vector of namespaces for which a subscription will be created.
*/
RCLCPP_PUBLIC
void
set_event_callback(
std::function<void(const rcl_interfaces::msg::ParameterEvent::SharedPtr &)> callback,
const std::string & node_namespace = "");
const std::vector<std::string> & node_namespaces = {""});

/// Remove parameter event callback.
/**
* Calling this function will set the event callback to nullptr. This function will also remove
* event subscriptions on the namespaces for which there are no other callbacks (from parameter
* callbacks) active on that namespace.
*/
RCLCPP_PUBLIC
void
remove_event_callback();

using ParameterEventsCallbackType = ParameterEventsCallbackHandle::ParameterEventsCallbackType;

/// Add a custom callback for a specified parameter.
/**
Expand All @@ -73,12 +96,50 @@ class ParameterEventsSubscriber
* \param[in] node_name Name of node which hosts the parameter.
*/
RCLCPP_PUBLIC
ParameterEventsCallbackHandle::SharedPtr
add_parameter_callback(
const std::string & parameter_name,
ParameterEventsCallbackType callback,
const std::string & node_name = "");

/// Remove a custom callback for a specified parameter given its callback handle.
/**
* The parameter name and node name are inspected from the callback handle. The callback handle
* is erased from the list of callback handles on the {parameter_name, node_name} in the map.
* An error is thrown if the handle does not exist and/or was already removed.
*
* \param[in] handle Pointer to callback handle to be removed.
*/
RCLCPP_PUBLIC
void
register_parameter_callback(
remove_parameter_callback(
const ParameterEventsCallbackHandle * const handle);

/// Remove a custom callback for a specified parameter given its name and respective node.
/**
* If a node_name is not provided, defaults to the current node.
* The {parameter_name, node_name} key on the parameter_callbacks_ map will be erased, removing
* all callback associated with that parameter.
*
* \param[in] parameter_name Name of parameter.
* \param[in] node_name Name of node which hosts the parameter.
*/
RCLCPP_PUBLIC
void
remove_parameter_callback(
const std::string & parameter_name,
std::function<void(const rclcpp::Parameter &)> callback,
const std::string & node_name = "");

/// Get a rclcpp::Parameter from parameter event, return true if parameter name & node in event.
/**
* If a node_name is not provided, defaults to the current node.
*
* \param[in] event Event msg to be inspected.
* \param[out] parameter Reference to rclcpp::Parameter to be assigned.
* \param[in] parameter_name Name of parameter.
* \param[in] node_name Name of node which hosts the parameter.
* \returns true if requested parameter name & node is in event, false otherwise
*/
RCLCPP_PUBLIC
static bool
get_parameter_from_event(
Expand All @@ -87,18 +148,41 @@ class ParameterEventsSubscriber
const std::string parameter_name,
const std::string node_name = "");

/// Get a rclcpp::Parameter from parameter event
/**
* If a node_name is not provided, defaults to the current node.
*
* The user is responsible to check if the returned parameter has been properly assigned.
* By default, if the requested parameter is not found in the event, the returned parameter
* has parameter value of type rclcpp::PARAMETER_NOT_SET.
*
* \param[in] event Event msg to be inspected.
* \param[in] parameter_name Name of parameter.
* \param[in] node_name Name of node which hosts the parameter.
* \returns The resultant rclcpp::Parameter from the event
*/
RCLCPP_PUBLIC
static rclcpp::Parameter
get_parameter_from_event(
const rcl_interfaces::msg::ParameterEvent::SharedPtr event,
const std::string parameter_name,
const std::string node_name = "");

using CallbacksContainerType = std::list<ParameterEventsCallbackHandle::WeakPtr>;

protected:
/// Add a subscription (if unique) to a namespace parameter events topic.
void
add_namespace_event_subscriber(const std::string & node_namespace);

/// Remove a subscription to a namespace parameter events topic.
void
remove_namespace_event_subscriber(const std::string & node_namespace);

/// Return true if any callbacks still exist on a namespace event topic, otherwise return false
bool
should_unsubscribe_to_namespace(const std::string & node_namespace);

/// Callback for parameter events subscriptions.
void
event_callback(const rcl_interfaces::msg::ParameterEvent::SharedPtr event);
Expand Down Expand Up @@ -140,18 +224,20 @@ class ParameterEventsSubscriber
// Map container for registered parameters.
std::unordered_map<
std::pair<std::string, std::string>,
std::function<void(const rclcpp::Parameter &)>,
CallbacksContainerType,
StringPairHash
> parameter_callbacks_;

// Vector of unique namespaces added.
std::vector<std::string> node_namespaces_;
std::vector<std::string> subscribed_namespaces_;
// Vector of event callback namespaces
std::vector<std::string> event_namespaces_;

// Vector of event subscriptions for each namespace.
std::vector<rclcpp::Subscription
<rcl_interfaces::msg::ParameterEvent>::SharedPtr> event_subscriptions_;

std::function<void(const rcl_interfaces::msg::ParameterEvent::SharedPtr &)> user_callback_;
std::function<void(const rcl_interfaces::msg::ParameterEvent::SharedPtr &)> event_callback_;

std::recursive_mutex mutex_;
};
Expand Down
162 changes: 146 additions & 16 deletions rclcpp/src/rclcpp/parameter_events_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "rclcpp/parameter_events_subscriber.hpp"

Expand All @@ -37,10 +39,10 @@ void
ParameterEventsSubscriber::add_namespace_event_subscriber(const std::string & node_namespace)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (std::find(node_namespaces_.begin(), node_namespaces_.end(),
node_namespace) == node_namespaces_.end())
if (std::find(subscribed_namespaces_.begin(), subscribed_namespaces_.end(),
node_namespace) == subscribed_namespaces_.end())
{
node_namespaces_.push_back(node_namespace);
subscribed_namespaces_.push_back(node_namespace);
auto topic = join_path(node_namespace, "parameter_events");
RCLCPP_DEBUG(node_logging_->get_logger(), "Subscribing to topic: %s", topic.c_str());

Expand All @@ -52,32 +54,153 @@ ParameterEventsSubscriber::add_namespace_event_subscriber(const std::string & no
}
}

void
ParameterEventsSubscriber::remove_namespace_event_subscriber(const std::string & node_namespace)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto check_sub_cb = [this, &node_namespace](
rclcpp::Subscription<rcl_interfaces::msg::ParameterEvent>::SharedPtr sub) {
return sub->get_topic_name() == join_path(node_namespace, "parameter_events");
};

auto it = std::find_if(
event_subscriptions_.begin(),
event_subscriptions_.end(),
check_sub_cb);

if (it != event_subscriptions_.end()) {
event_subscriptions_.erase(it);
subscribed_namespaces_.erase(
std::remove(subscribed_namespaces_.begin(), subscribed_namespaces_.end(), node_namespace));
}
}

void
ParameterEventsSubscriber::set_event_callback(
std::function<void(const rcl_interfaces::msg::ParameterEvent::SharedPtr &)> callback,
const std::string & node_namespace)
const std::vector<std::string> & node_namespaces)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
remove_event_callback();
std::string full_namespace;
if (node_namespace == "") {
full_namespace = node_base_->get_namespace();
} else {
full_namespace = resolve_path(full_namespace);
for (auto & ns : node_namespaces) {
if (ns == "") {
full_namespace = node_base_->get_namespace();
} else {
full_namespace = resolve_path(ns);
}
add_namespace_event_subscriber(full_namespace);
event_namespaces_.push_back(full_namespace);
}
add_namespace_event_subscriber(full_namespace);
user_callback_ = callback;

event_callback_ = callback;
}

void
ParameterEventsSubscriber::register_parameter_callback(
ParameterEventsSubscriber::remove_event_callback()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
// Clear current vector of event namespaces and remove subscriptions
auto temp_namespaces = event_namespaces_;
event_namespaces_.clear();
for (auto temp_ns : temp_namespaces) {
if (should_unsubscribe_to_namespace(temp_ns)) {
remove_namespace_event_subscriber(temp_ns);
}
}

event_callback_ = nullptr;
}

ParameterEventsCallbackHandle::SharedPtr
ParameterEventsSubscriber::add_parameter_callback(
const std::string & parameter_name,
std::function<void(const rclcpp::Parameter &)> callback,
ParameterEventsCallbackType callback,
const std::string & node_name)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto full_node_name = resolve_path(node_name);
add_namespace_event_subscriber(split_path(full_node_name).first);
parameter_callbacks_[{parameter_name, full_node_name}] = callback;

auto handle = std::make_shared<ParameterEventsCallbackHandle>();
handle->callback = callback;
handle->parameter_name = parameter_name;
handle->node_name = full_node_name;
// the last callback registered is executed first.
parameter_callbacks_[{parameter_name, full_node_name}].emplace_front(handle);

return handle;
}

struct HandleCompare
: public std::unary_function<ParameterEventsCallbackHandle::WeakPtr, bool>
{
explicit HandleCompare(const ParameterEventsCallbackHandle * const base)
: base_(base) {}
bool operator()(const ParameterEventsCallbackHandle::WeakPtr & handle)
{
auto shared_handle = handle.lock();
if (base_ == shared_handle.get()) {
return true;
}
return false;
}
const ParameterEventsCallbackHandle * const base_;
};

void
ParameterEventsSubscriber::remove_parameter_callback(
const ParameterEventsCallbackHandle * const handle)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto & container = parameter_callbacks_[{handle->parameter_name, handle->node_name}];
auto it = std::find_if(
container.begin(),
container.end(),
HandleCompare(handle));
if (it != container.end()) {
container.erase(it);
if (container.empty()) {
parameter_callbacks_.erase({handle->parameter_name, handle->node_name});
if (should_unsubscribe_to_namespace(split_path(handle->node_name).first)) {
remove_namespace_event_subscriber(split_path(handle->node_name).first);
}
}
} else {
throw std::runtime_error("Callback doesn't exist");
}
}

bool
ParameterEventsSubscriber::should_unsubscribe_to_namespace(const std::string & node_namespace)
{
auto it1 = std::find(event_namespaces_.begin(), event_namespaces_.end(), node_namespace);
if (it1 != event_namespaces_.end()) {
return false;
}

auto it2 = parameter_callbacks_.begin();
while (it2 != parameter_callbacks_.end()) {
if (split_path(it2->first.second).first == node_namespace) {
return false;
}
++it2;
}
return true;
}

void
ParameterEventsSubscriber::remove_parameter_callback(
const std::string & parameter_name,
const std::string & node_name)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto full_node_name = resolve_path(node_name);
parameter_callbacks_.erase({parameter_name, full_node_name});
if (should_unsubscribe_to_namespace(split_path(full_node_name).first)) {
RCLCPP_INFO(node_logging_->get_logger(), "Removing namespace event subscription");
remove_namespace_event_subscriber(split_path(full_node_name).first);
}
}

bool
Expand Down Expand Up @@ -124,12 +247,19 @@ ParameterEventsSubscriber::event_callback(
for (auto it = parameter_callbacks_.begin(); it != parameter_callbacks_.end(); ++it) {
rclcpp::Parameter p;
if (get_parameter_from_event(event, p, it->first.first, it->first.second)) {
it->second(p);
for (auto cb = it->second.begin(); cb != it->second.end(); ++cb) {
auto shared_handle = cb->lock();
if (nullptr != shared_handle) {
shared_handle->callback(p);
} else {
cb = it->second.erase(cb);
}
}
}
}

if (user_callback_) {
user_callback_(event);
if (event_callback_) {
event_callback_(event);
}
}

Expand Down
Loading

0 comments on commit a91f464

Please sign in to comment.