Skip to content

Commit

Permalink
Merge pull request #50 from mauropasse/mauro/group-callback-data
Browse files Browse the repository at this point in the history
Rework executor callback data
  • Loading branch information
iRobot ROS committed Mar 11, 2021
2 parents 00666c1 + df37d31 commit 0d50b6c
Show file tree
Hide file tree
Showing 23 changed files with 283 additions and 129 deletions.
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ class ClientBase

RCLCPP_PUBLIC
void
set_events_executor_callback(
rclcpp::executors::EventsExecutor * executor,
rmw_listener_callback_t executor_callback) const;
set_listener_callback(
rmw_listener_callback_t callback,
const void * user_data) const;

protected:
RCLCPP_DISABLE_COPY(ClientBase)
Expand Down
19 changes: 10 additions & 9 deletions rclcpp/include/rclcpp/executors/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@

#include "rclcpp/executor.hpp"
#include "rclcpp/executors/events_executor_entities_collector.hpp"
#include "rclcpp/executors/events_executor_event_types.hpp"
#include "rclcpp/executors/events_executor_notify_waitable.hpp"
#include "rclcpp/executors/timers_manager.hpp"
#include "rclcpp/experimental/buffers/events_queue.hpp"
#include "rclcpp/experimental/buffers/simple_events_queue.hpp"
#include "rclcpp/node.hpp"

#include "rmw/listener_event_types.h"
#include "rmw/listener_callback_type.h"

namespace rclcpp
{
Expand Down Expand Up @@ -214,20 +215,20 @@ class EventsExecutor : public rclcpp::Executor
// This function is called by the DDS entities when an event happened,
// like a subscription receiving a message.
static void
push_event(void * executor_ptr, rmw_listener_event_t event)
push_event(const void * event_data)
{
// Check if the executor pointer is not valid
if (!executor_ptr) {
throw std::runtime_error("The executor pointer is not valid.");
if (!event_data) {
throw std::runtime_error("Executor event data not valid.");
}

auto this_executor = static_cast<executors::EventsExecutor *>(executor_ptr);
auto data = static_cast<const executors::EventsExecutorCallbackData *>(event_data);

executors::EventsExecutor * this_executor = data->executor;

// Event queue mutex scope
{
std::unique_lock<std::mutex> lock(this_executor->push_mutex_);

this_executor->events_queue_->push(event);
this_executor->events_queue_->push(data->event);
}
// Notify that the event queue has some events in it.
this_executor->events_queue_cv_.notify_one();
Expand All @@ -236,7 +237,7 @@ class EventsExecutor : public rclcpp::Executor
// Execute a single event
RCLCPP_PUBLIC
void
execute_event(const rmw_listener_event_t & event);
execute_event(const ExecutorEvent & event);

// Queue where entities can push events
rclcpp::experimental::buffers::EventsQueue::SharedPtr events_queue_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vector>

#include "rclcpp/executors/event_waitable.hpp"
#include "rclcpp/executors/events_executor_event_types.hpp"
#include "rclcpp/executors/timers_manager.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"

Expand Down Expand Up @@ -213,6 +214,12 @@ class EventsExecutorEntitiesCollector final
void
unset_guard_condition_callback(const rcl_guard_condition_t * guard_condition);

void
remove_callback_data(void * entity_id, ExecutorEventType type);

const EventsExecutorCallbackData *
get_callback_data(void * entity_id, ExecutorEventType type);

/// Return true if the node belongs to the collector
/**
* \param[in] group_ptr a node base interface shared pointer
Expand Down Expand Up @@ -262,6 +269,17 @@ class EventsExecutorEntitiesCollector final
EventsExecutor * associated_executor_ = nullptr;
/// Instance of the timers manager used by the associated executor
TimersManager::SharedPtr timers_manager_;

/// Callback data objects mapped to the number of listeners sharing the same object.
/// When no more listeners use the object, it can be removed from the map.
/// For example, the entities collector holds every node's guard condition, which
/// share the same EventsExecutorCallbackData object ptr to use as their callback arg:
/// cb_data_object = {executor_ptr, entities_collector_ptr, WAITABLE_EVENT};
/// Node1->gc(&cb_data_object)
/// Node2->gc(&cb_data_object)
/// So the maps has: (cb_data_object, 2)
/// When both nodes are removed (counter = 0), the cb_data_object can be destroyed.
std::unordered_map<EventsExecutorCallbackData, size_t, KeyHasher> callback_data_map_;
};

} // namespace executors
Expand Down
79 changes: 79 additions & 0 deletions rclcpp/include/rclcpp/executors/events_executor_event_types.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_
#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_

namespace rclcpp
{
namespace executors
{

// forward declaration of EventsExecutor to avoid circular dependency
class EventsExecutor;

enum ExecutorEventType
{
SUBSCRIPTION_EVENT,
SERVICE_EVENT,
CLIENT_EVENT,
WAITABLE_EVENT
};

struct ExecutorEvent
{
const void * entity_id;
ExecutorEventType type;
};

// The EventsExecutorCallbackData struct is what the listeners
// will use as argument when calling their callbacks from the
// RMW implementation. The listeners get a (void *) of this struct,
// and the executor is in charge to cast it back and use the data.
struct EventsExecutorCallbackData
{
EventsExecutorCallbackData(
EventsExecutor * _executor,
ExecutorEvent _event)
{
executor = _executor;
event = _event;
}

// Equal operator
bool operator==(const EventsExecutorCallbackData & other) const
{
return (event.entity_id == other.event.entity_id);
}

// Struct members
EventsExecutor * executor;
ExecutorEvent event;
};

// To be able to use std::unordered_map with an EventsExecutorCallbackData
// as key, we need a hasher. We use the entity ID as hash, since it is
// unique for each EventsExecutorCallbackData object.
struct KeyHasher
{
size_t operator()(const EventsExecutorCallbackData & key) const
{
return std::hash<const void *>()(key.event.entity_id);
}
};

} // namespace executors
} // namespace rclcpp

#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,15 @@ class EventsExecutorNotifyWaitable final : public EventWaitable

RCLCPP_PUBLIC
void
set_events_executor_callback(
rclcpp::executors::EventsExecutor * executor,
rmw_listener_callback_t executor_callback) const override
set_listener_callback(
rmw_listener_callback_t callback,
const void * user_data) const override
{
for (auto gc : notify_guard_conditions_) {
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
gc,
executor_callback,
executor,
this,
callback,
user_data,
false);

if (RCL_RET_OK != ret) {
Expand Down
12 changes: 6 additions & 6 deletions rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rmw/listener_event_types.h"
#include "rclcpp/executors/events_executor_event_types.hpp"

namespace rclcpp
{
Expand All @@ -31,7 +31,7 @@ namespace buffers

/**
* @brief This abstract class can be used to implement different types of queues
* where `rmw_listener_event_t` can be stored.
* where `ExecutorEvent` can be stored.
* The derived classes should choose which underlying container to use and
* the strategy for pushing and popping events.
* For example a queue implementation may be bounded or unbounded and have
Expand All @@ -42,7 +42,7 @@ namespace buffers
class EventsQueue
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(EventsQueue)
RCLCPP_SMART_PTR_ALIASES_ONLY(EventsQueue)

/**
* @brief Destruct the object.
Expand All @@ -57,7 +57,7 @@ class EventsQueue
RCLCPP_PUBLIC
virtual
void
push(const rmw_listener_event_t & event) = 0;
push(const rclcpp::executors::ExecutorEvent & event) = 0;

/**
* @brief removes front element from the queue.
Expand All @@ -73,7 +73,7 @@ class EventsQueue
*/
RCLCPP_PUBLIC
virtual
rmw_listener_event_t
rclcpp::executors::ExecutorEvent
front() const = 0;

/**
Expand Down Expand Up @@ -108,7 +108,7 @@ class EventsQueue
*/
RCLCPP_PUBLIC
virtual
std::queue<rmw_listener_event_t>
std::queue<rclcpp::executors::ExecutorEvent>
pop_all_events() = 0;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SimpleEventsQueue : public EventsQueue
RCLCPP_PUBLIC
virtual
void
push(const rmw_listener_event_t & event)
push(const rclcpp::executors::ExecutorEvent & event)
{
event_queue_.push(event);
}
Expand All @@ -68,7 +68,7 @@ class SimpleEventsQueue : public EventsQueue
*/
RCLCPP_PUBLIC
virtual
rmw_listener_event_t
rclcpp::executors::ExecutorEvent
front() const
{
return event_queue_.front();
Expand Down Expand Up @@ -107,7 +107,7 @@ class SimpleEventsQueue : public EventsQueue
init()
{
// Make sure the queue is empty when we start
std::queue<rmw_listener_event_t> local_queue;
std::queue<rclcpp::executors::ExecutorEvent> local_queue;
std::swap(event_queue_, local_queue);
}

Expand All @@ -118,16 +118,16 @@ class SimpleEventsQueue : public EventsQueue
*/
RCLCPP_PUBLIC
virtual
std::queue<rmw_listener_event_t>
std::queue<rclcpp::executors::ExecutorEvent>
pop_all_events()
{
std::queue<rmw_listener_event_t> local_queue;
std::queue<rclcpp::executors::ExecutorEvent> local_queue;
std::swap(event_queue_, local_queue);
return local_queue;
}

private:
std::queue<rmw_listener_event_t> event_queue_;
std::queue<rclcpp::executors::ExecutorEvent> event_queue_;
};

} // namespace buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

RCLCPP_PUBLIC
void
set_events_executor_callback(
rclcpp::executors::EventsExecutor * executor,
rmw_listener_callback_t executor_callback) const override;
set_listener_callback(
rmw_listener_callback_t callback,
const void * user_data) const override;

protected:
std::recursive_mutex reentrant_mutex_;
Expand Down
7 changes: 3 additions & 4 deletions rclcpp/include/rclcpp/qos_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,11 @@ class QOSEventHandlerBase : public Waitable
bool
is_ready(rcl_wait_set_t * wait_set) override;

/// Set EventsExecutor's callback
RCLCPP_PUBLIC
void
set_events_executor_callback(
rclcpp::executors::EventsExecutor * executor,
rmw_listener_callback_t executor_callback) const override;
set_listener_callback(
rmw_listener_callback_t callback,
const void * user_data) const override;

protected:
rcl_event_t event_handle_;
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ class ServiceBase

RCLCPP_PUBLIC
void
set_events_executor_callback(
rclcpp::executors::EventsExecutor * executor,
rmw_listener_callback_t executor_callback) const;
set_listener_callback(
rmw_listener_callback_t callback,
const void * user_data) const;

protected:
RCLCPP_DISABLE_COPY(ServiceBase)
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>

RCLCPP_PUBLIC
void
set_events_executor_callback(
rclcpp::executors::EventsExecutor * executor,
rmw_listener_callback_t executor_callback) const;
set_listener_callback(
rmw_listener_callback_t callback,
const void * user_data) const;

protected:
template<typename EventCallbackT>
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ class Waitable
RCLCPP_PUBLIC
virtual
void
set_events_executor_callback(
rclcpp::executors::EventsExecutor * executor,
rmw_listener_callback_t executor_callback) const;
set_listener_callback(
rmw_listener_callback_t callback,
const void * user_data) const;

private:
std::atomic<bool> in_use_by_wait_set_{false};
Expand Down
13 changes: 6 additions & 7 deletions rclcpp/src/rclcpp/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,16 @@ ClientBase::exchange_in_use_by_wait_set_state(bool in_use_state)
}

void
ClientBase::set_events_executor_callback(
rclcpp::executors::EventsExecutor * executor,
rmw_listener_callback_t executor_callback) const
ClientBase::set_listener_callback(
rmw_listener_callback_t callback,
const void * user_data) const
{
rcl_ret_t ret = rcl_client_set_listener_callback(
client_handle_.get(),
executor_callback,
executor,
this);
callback,
user_data);

if (RCL_RET_OK != ret) {
throw std::runtime_error("Couldn't set the EventsExecutor's callback to client");
throw std::runtime_error("Couldn't set listener callback to client");
}
}
Loading

0 comments on commit 0d50b6c

Please sign in to comment.