Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RMW listener APIs #1579

Merged
merged 39 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
28aadf5
Add RMW listener APIs
Mar 11, 2021
9d9d753
split long log into two lines
Mar 16, 2021
e4ae5f5
Remove use_previous_event
Mar 24, 2021
c1d4a84
Do not set gc listener callback
Apr 1, 2021
28dbd07
significant refactor to make callbacks type safe
wjwwood Apr 2, 2021
780d053
Add on_ready callback to waitables, add clear_* functions, add unit-t…
alsora Apr 19, 2021
e909eed
add mutex to set and clear listener APIs
Apr 23, 2021
f4bb83a
allow to set ipc sub callback from regular subscription
Apr 23, 2021
c653699
fix minor failures
alsora Apr 23, 2021
a2b0186
fix typos and errors in comments
alsora Apr 23, 2021
bd2c5e4
fix comments
alsora Apr 23, 2021
8574237
expose qos listener APIs from pub and sub; add unit-tests
alsora Apr 23, 2021
2bd63c0
add qos event unit-test for invalid callbacks
alsora Apr 23, 2021
c1ccb94
Use QoS depth to limit callbacks count
Apr 28, 2021
e778449
fix ipc-subscription
alsora Jun 2, 2021
1bdc01d
Rename CallbackMessageT -> ROSMessageType
Jul 14, 2021
2d51d08
Set callbacks to Actions
Jul 14, 2021
8da3449
changes from upstream
wjwwood Jan 26, 2022
114c4d9
Merge remote-tracking branch 'origin/master' into irobot/add-rmw-list…
wjwwood Jan 26, 2022
1804a60
Unset callback on entities destruction
Feb 2, 2022
69d2ae9
Merge pull request #81 from mauropasse/mauro/add-rmw-listener-apis
alsora Feb 8, 2022
8185a23
Merge remote-tracking branch 'upstream/master' into irobot/add-rmw-li…
Feb 9, 2022
6d4748e
fix rebase errors
alsora Feb 9, 2022
82a1b46
fix unit-tests
alsora Feb 9, 2022
5af4cb2
Add GuardCondition on trigger callback
Feb 9, 2022
9d0ac6c
Add tests for new GuardCondition APIs
Feb 9, 2022
8b4b7f2
Merge pull request #83 from mauropasse/mauro/add-rclcpp-guard-cond-ca…
alsora Feb 9, 2022
acae886
Fix windows CI
Feb 10, 2022
49b0669
Merge pull request #84 from mauropasse/mauro/add-rmw-listener-apis-fix
alsora Feb 10, 2022
540e6bb
Action unset callbacks only if were set
Feb 10, 2022
4a1dc68
Merge pull request #85 from mauropasse/mauro/add-rmw-listener-apis-fix
alsora Feb 14, 2022
e5937db
add missing include rcl event callback include directive
alsora Feb 14, 2022
4ccf875
Merge remote-tracking branch 'upstream/master' into irobot/add-rmw-li…
Feb 16, 2022
df55696
typos
wjwwood Feb 23, 2022
0b0659e
remove listener reference
wjwwood Feb 23, 2022
89b8cb3
remove references to listener and move a method closer to impl
wjwwood Feb 23, 2022
d930902
cpplint
wjwwood Feb 23, 2022
f51a1be
Fix qos history check in subscription_intra_process.hpp
Feb 23, 2022
ec54f15
Merge remote-tracking branch 'upstream/master' into irobot/add-rmw-li…
Feb 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <future>
#include <unordered_map>
#include <memory>
#include <mutex>
#include <optional> // NOLINT, cpplint doesn't think this is a cpp std header
#include <sstream>
#include <string>
Expand All @@ -29,20 +30,22 @@

#include "rcl/client.h"
#include "rcl/error_handling.h"
#include "rcl/event_callback.h"
#include "rcl/wait.h"

#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rcutils/logging_macros.h"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"

namespace rclcpp
Expand Down Expand Up @@ -215,6 +218,90 @@ class ClientBase
bool
exchange_in_use_by_wait_set_state(bool in_use_state);

/// Set a callback to be called when each new response is received.
/**
* The callback receives a size_t which is the number of responses received
* since the last time this callback was called.
* Normally this is 1, but can be > 1 if responses were received before any
* callback was set.
*
* Since this callback is called from the middleware, you should aim to make
* it fast and not blocking.
* If you need to do a lot of work or wait for some other event, you should
* spin it off to another thread, otherwise you risk blocking the middleware.
*
* Calling it again will clear any previously set callback.
*
* An exception will be thrown if the callback is not callable.
*
* This function is thread-safe.
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
*
* If you want more information available in the callback, like the client
* or other information, you may use a lambda with captures or std::bind.
*
* \sa rmw_client_set_on_new_response_callback
* \sa rcl_client_set_on_new_response_callback
*
* \param[in] callback functor to be called when a new response is received
*/
void
set_on_new_response_callback(std::function<void(size_t)> callback)
{
if (!callback) {
throw std::invalid_argument(
"The callback passed to set_on_new_response_callback "
"is not callable.");
}

auto new_callback =
[callback, this](size_t number_of_responses) {
try {
callback(number_of_responses);
} catch (const std::exception & exception) {
RCLCPP_ERROR_STREAM(
node_logger_,
"rclcpp::ClientBase@" << this <<
" caught " << rmw::impl::cpp::demangle(exception) <<
" exception in user-provided callback for the 'on new response' callback: " <<
exception.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
node_logger_,
"rclcpp::ClientBase@" << this <<
" caught unhandled exception in user-provided callback " <<
"for the 'on new response' callback");
}
};

std::lock_guard<std::recursive_mutex> lock(callback_mutex_);

// Set it temporarily to the new callback, while we replace the old one.
// This two-step setting, prevents a gap where the old std::function has
// been replaced but the middleware hasn't been told about the new one yet.
set_on_new_response_callback(
rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
static_cast<const void *>(&new_callback));

// Store the std::function to keep it in scope, also overwrites the existing one.
on_new_response_callback_ = new_callback;

// Set it again, now using the permanent storage.
set_on_new_response_callback(
rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
static_cast<const void *>(&on_new_response_callback_));
}

/// Unset the callback registered for new responses, if any.
void
clear_on_new_response_callback()
{
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
if (on_new_response_callback_) {
set_on_new_response_callback(nullptr, nullptr);
on_new_response_callback_ = nullptr;
}
}

protected:
RCLCPP_DISABLE_COPY(ClientBase)

Expand All @@ -230,13 +317,21 @@ class ClientBase
const rcl_node_t *
get_rcl_node_handle() const;

RCLCPP_PUBLIC
void
set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);
wjwwood marked this conversation as resolved.
Show resolved Hide resolved

rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
std::shared_ptr<rcl_node_t> node_handle_;
std::shared_ptr<rclcpp::Context> context_;
rclcpp::Logger node_logger_;

std::shared_ptr<rcl_client_t> client_handle_;

std::atomic<bool> in_use_by_wait_set_{false};

std::recursive_mutex callback_mutex_;
std::function<void(size_t)> on_new_response_callback_{nullptr};
};

template<typename ServiceT>
Expand Down
67 changes: 67 additions & 0 deletions rclcpp/include/rclcpp/detail/cpp_callback_trampoline.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__DETAIL__CPP_CALLBACK_TRAMPOLINE_HPP_
#define RCLCPP__DETAIL__CPP_CALLBACK_TRAMPOLINE_HPP_

#include <functional>

namespace rclcpp
{

namespace detail
{

/// Trampoline pattern for wrapping std::function into C-style callbacks.
/**
* A common pattern in C is for a function to take a function pointer and a
* void pointer for "user data" which is passed to the function pointer when it
* is called from within C.
*
* It works by using the user data pointer to store a pointer to a
* std::function instance.
* So when called from C, this function will cast the user data to the right
* std::function type and call it.
*
* This should allow you to use free functions, lambdas with and without
* captures, and various kinds of std::bind instances.
*
* The interior of this function is likely to be executed within a C runtime,
* so no exceptions should be thrown at this point, and doing so results in
* undefined behavior.
*
* \tparam UserDataT Deduced type based on what is passed for user data,
* usually this type is either `void *` or `const void *`.
* \tparam Args the arguments being passed to the callback
* \tparam ReturnT the return type of this function and the callback, default void
* \param user_data the function pointer, possibly type erased
* \returns whatever the callback returns, if anything
*/
template<
typename UserDataT,
typename ... Args,
typename ReturnT = void
>
ReturnT
cpp_callback_trampoline(UserDataT user_data, Args ... args) noexcept
{
auto & actual_callback = *reinterpret_cast<const std::function<ReturnT(Args...)> *>(user_data);
return actual_callback(args ...);
}

} // namespace detail

} // namespace rclcpp

#endif // RCLCPP__DETAIL__CPP_CALLBACK_TRAMPOLINE_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_

#include <algorithm>
#include <memory>
#include <mutex>
#include <string>

#include "rcl/wait.h"
#include "rmw/impl/cpp/demangle.hpp"

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/waitable.hpp"

Expand All @@ -35,6 +38,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)

enum class EntityType
{
Subscription,
};

RCLCPP_PUBLIC
SubscriptionIntraProcessBase(
rclcpp::Context::SharedPtr context,
Expand All @@ -43,7 +51,8 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
: gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
{}

virtual ~SubscriptionIntraProcessBase() = default;
RCLCPP_PUBLIC
virtual ~SubscriptionIntraProcessBase();

RCLCPP_PUBLIC
size_t
Expand All @@ -53,7 +62,17 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;

virtual bool
bool
is_ready(rcl_wait_set_t * wait_set) override = 0;

std::shared_ptr<void>
take_data() override = 0;

void
execute(std::shared_ptr<void> & data) override = 0;

virtual
bool
use_take_shared_method() const = 0;

RCLCPP_PUBLIC
Expand All @@ -64,13 +83,107 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
QoS
get_actual_qos() const;

/// Set a callback to be called when each new message arrives.
/**
* The callback receives a size_t which is the number of messages received
* since the last time this callback was called.
* Normally this is 1, but can be > 1 if messages were received before any
* callback was set.
*
* The callback also receives an int identifier argument.
* This is needed because a Waitable may be composed of several distinct entities,
* such as subscriptions, services, etc.
* The application should provide a generic callback function that will be then
* forwarded by the waitable to all of its entities.
* Before forwarding, a different value for the identifier argument will be
* bound to the function.
* This implies that the provided callback can use the identifier to behave
* differently depending on which entity triggered the waitable to become ready.
*
* Calling it again will clear any previously set callback.
*
* An exception will be thrown if the callback is not callable.
*
* This function is thread-safe.
*
* If you want more information available in the callback, like the subscription
* or other information, you may use a lambda with captures or std::bind.
*
* \param[in] callback functor to be called when a new message is received.
*/
void
set_on_ready_callback(std::function<void(size_t, int)> callback) override
{
if (!callback) {
throw std::invalid_argument(
"The callback passed to set_on_ready_callback "
"is not callable.");
}

// Note: we bind the int identifier argument to this waitable's entity types
auto new_callback =
[callback, this](size_t number_of_events) {
try {
callback(number_of_events, static_cast<int>(EntityType::Subscription));
} catch (const std::exception & exception) {
RCLCPP_ERROR_STREAM(
// TODO(wjwwood): get this class access to the node logger it is associated with
rclcpp::get_logger("rclcpp"),
"rclcpp::SubscriptionIntraProcessBase@" << this <<
" caught " << rmw::impl::cpp::demangle(exception) <<
" exception in user-provided callback for the 'on ready' callback: " <<
exception.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"rclcpp::SubscriptionIntraProcessBase@" << this <<
" caught unhandled exception in user-provided callback " <<
"for the 'on ready' callback");
}
};

std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
on_new_message_callback_ = new_callback;

if (unread_count_ > 0) {
if (qos_profile_.history() == HistoryPolicy::KeepAll) {
on_new_message_callback_(unread_count_);
} else {
// Use qos profile depth as upper bound for unread_count_
on_new_message_callback_(std::min(unread_count_, qos_profile_.depth()));
}
unread_count_ = 0;
}
}

/// Unset the callback registered for new messages, if any.
void
clear_on_ready_callback() override
{
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
on_new_message_callback_ = nullptr;
}

protected:
std::recursive_mutex reentrant_mutex_;
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
std::recursive_mutex callback_mutex_;
std::function<void(size_t)> on_new_message_callback_ {nullptr};
size_t unread_count_{0};
rclcpp::GuardCondition gc_;

virtual void
trigger_guard_condition() = 0;

void
invoke_on_new_message()
{
std::lock_guard<std::recursive_mutex> lock(this->callback_mutex_);
if (this->on_new_message_callback_) {
this->on_new_message_callback_(1);
} else {
this->unread_count_++;
}
}

private:
std::string topic_name_;
QoS qos_profile_;
Expand Down
Loading