From 91fcb2b63e0934714941e1381e5b1824961f7adf Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Tue, 27 Jul 2021 23:05:36 -0700 Subject: [PATCH] wait for message (#1705) * wait for message Signed-off-by: Karsten Knese * move to own header file Signed-off-by: Karsten Knese * linters Signed-off-by: Karsten Knese * add gc for shutdown interrupt Signed-off-by: Karsten Knese * mention behavior when shutdown is called Signed-off-by: Karsten Knese * check gc Signed-off-by: Karsten Knese --- rclcpp/include/rclcpp/wait_for_message.hpp | 100 +++++++++++++++++++ rclcpp/test/rclcpp/CMakeLists.txt | 7 ++ rclcpp/test/rclcpp/test_wait_for_message.cpp | 74 ++++++++++++++ 3 files changed, 181 insertions(+) create mode 100644 rclcpp/include/rclcpp/wait_for_message.hpp create mode 100644 rclcpp/test/rclcpp/test_wait_for_message.cpp diff --git a/rclcpp/include/rclcpp/wait_for_message.hpp b/rclcpp/include/rclcpp/wait_for_message.hpp new file mode 100644 index 0000000000..f5f3adc9a4 --- /dev/null +++ b/rclcpp/include/rclcpp/wait_for_message.hpp @@ -0,0 +1,100 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__WAIT_FOR_MESSAGE_HPP_ +#define RCLCPP__WAIT_FOR_MESSAGE_HPP_ + +#include +#include + +#include "rclcpp/node.hpp" +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/wait_set.hpp" + +namespace rclcpp +{ +/// Wait for the next incoming message. +/** + * Given an already initialized subscription, + * wait for the next incoming message to arrive before the specified timeout. + * + * \param[out] out is the message to be filled when a new message is arriving. + * \param[in] subscription shared pointer to a previously initialized subscription. + * \param[in] context shared pointer to a context to watch for SIGINT requests. + * \param[in] time_to_wait parameter specifying the timeout before returning. + * \return true if a message was successfully received, false if message could not + * be obtained or shutdown was triggered asynchronously on the context. + */ +template +bool wait_for_message( + MsgT & out, + std::shared_ptr> subscription, + std::shared_ptr context, + std::chrono::duration time_to_wait = std::chrono::duration(-1)) +{ + auto gc = std::make_shared(context); + auto shutdown_callback_handle = context->add_on_shutdown_callback( + [weak_gc = std::weak_ptr{gc}]() { + auto strong_gc = weak_gc.lock(); + if (strong_gc) { + strong_gc->trigger(); + } + }); + + rclcpp::WaitSet wait_set; + wait_set.add_subscription(subscription); + wait_set.add_guard_condition(gc); + auto ret = wait_set.wait(time_to_wait); + if (ret.kind() != rclcpp::WaitResultKind::Ready) { + return false; + } + + if (wait_set.get_rcl_wait_set().guard_conditions[0]) { + return false; + } + + rclcpp::MessageInfo info; + if (!subscription->take(out, info)) { + return false; + } + + return true; +} + +/// Wait for the next incoming message. +/** + * Wait for the next incoming message to arrive on a specified topic before the specified timeout. + * + * \param[out] out is the message to be filled when a new message is arriving. + * \param[in] node the node pointer to initialize the subscription on. + * \param[in] topic the topic to wait for messages. + * \param[in] time_to_wait parameter specifying the timeout before returning. + * \return true if a message was successfully received, false if message could not + * be obtained or shutdown was triggered asynchronously on the context. + */ +template +bool wait_for_message( + MsgT & out, + rclcpp::Node::SharedPtr node, + const std::string & topic, + std::chrono::duration time_to_wait = std::chrono::duration(-1)) +{ + auto sub = node->create_subscription(topic, 1, [](const std::shared_ptr) {}); + return wait_for_message( + out, sub, node->get_node_options().context(), time_to_wait); +} + +} // namespace rclcpp + +#endif // RCLCPP__WAIT_FOR_MESSAGE_HPP_ diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index f5206c1d05..f1a5096203 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -565,6 +565,13 @@ if(TARGET test_utilities) target_link_libraries(test_utilities ${PROJECT_NAME} mimick) endif() +ament_add_gtest(test_wait_for_message test_wait_for_message.cpp) +if(TARGET test_wait_for_message) + ament_target_dependencies(test_wait_for_message + "test_msgs") + target_link_libraries(test_wait_for_message ${PROJECT_NAME}) +endif() + ament_add_gtest(test_interface_traits test_interface_traits.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_interface_traits) diff --git a/rclcpp/test/rclcpp/test_wait_for_message.cpp b/rclcpp/test/rclcpp/test_wait_for_message.cpp new file mode 100644 index 0000000000..f0071698ca --- /dev/null +++ b/rclcpp/test/rclcpp/test_wait_for_message.cpp @@ -0,0 +1,74 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/node.hpp" +#include "rclcpp/wait_for_message.hpp" + +#include "test_msgs/msg/strings.hpp" +#include "test_msgs/message_fixtures.hpp" + +using namespace std::chrono_literals; + +TEST(TestUtilities, wait_for_message) { + rclcpp::init(0, nullptr); + + auto node = std::make_shared("wait_for_message_node"); + + using MsgT = test_msgs::msg::Strings; + auto pub = node->create_publisher("wait_for_message_topic", 10); + + MsgT out; + auto received = false; + auto wait = std::async( + [&]() { + auto ret = rclcpp::wait_for_message(out, node, "wait_for_message_topic", 5s); + EXPECT_TRUE(ret); + received = true; + }); + + for (auto i = 0u; i < 10 && received == false; ++i) { + pub->publish(*get_messages_strings()[0]); + std::this_thread::sleep_for(1s); + } + ASSERT_TRUE(received); + EXPECT_EQ(out, *get_messages_strings()[0]); + + rclcpp::shutdown(); +} + +TEST(TestUtilities, wait_for_message_indefinitely) { + rclcpp::init(0, nullptr); + + auto node = std::make_shared("wait_for_message_node2"); + + using MsgT = test_msgs::msg::Strings; + MsgT out; + auto received = false; + auto wait = std::async( + [&]() { + auto ret = rclcpp::wait_for_message(out, node, "wait_for_message_topic" /*, -1 */); + EXPECT_TRUE(ret); + received = true; + }); + + rclcpp::shutdown(); + + ASSERT_FALSE(received); +}