Skip to content

Commit

Permalink
Allow spin_until_future_complete to accept std::future
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Mittal <sarthakmittal2608@gmail.com>
  • Loading branch information
naiveHobo committed May 11, 2020
1 parent e24f402 commit 790b1c4
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 0 deletions.
49 changes: 49 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,55 @@ class Executor
return FutureReturnCode::INTERRUPTED;
}

template<typename ResponseT, typename TimeRepT = int64_t, typename TimeT = std::milli>
FutureReturnCode
spin_until_future_complete(
const std::future<ResponseT> & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

while (rclcpp::ok(this->context_)) {
// Do one item of work.
spin_once(timeout_left);
// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}

/// Cancel any running spin* function, causing it to return.
/* This function can be called asynchonously from any thread. */
RCLCPP_PUBLIC
Expand Down
54 changes: 54 additions & 0 deletions rclcpp/include/rclcpp/executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ spin_node_until_future_complete(
return retcode;
}

template<typename ResponseT, typename TimeRepT = int64_t, typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const std::future<ResponseT> & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_future_complete(future, timeout);
executor.remove_node(node_ptr);
return retcode;
}

template<typename NodeT = rclcpp::Node, typename ResponseT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
rclcpp::FutureReturnCode
Expand All @@ -98,6 +114,22 @@ spin_node_until_future_complete(
timeout);
}

template<typename NodeT = rclcpp::Node, typename ResponseT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const std::future<ResponseT> & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::executors::spin_node_until_future_complete(
executor,
node_ptr->get_node_base_interface(),
future,
timeout);
}

} // namespace executors

template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
Expand All @@ -111,6 +143,17 @@ spin_until_future_complete(
return executors::spin_node_until_future_complete<FutureT>(executor, node_ptr, future, timeout);
}

template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_until_future_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const std::future<FutureT> & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
rclcpp::executors::SingleThreadedExecutor executor;
return executors::spin_node_until_future_complete<FutureT>(executor, node_ptr, future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
rclcpp::FutureReturnCode
Expand All @@ -122,6 +165,17 @@ spin_until_future_complete(
return rclcpp::spin_until_future_complete(node_ptr->get_node_base_interface(), future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_until_future_complete(
std::shared_ptr<NodeT> node_ptr,
const std::future<FutureT> & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::spin_until_future_complete(node_ptr->get_node_base_interface(), future, timeout);
}

} // namespace rclcpp

#endif // RCLCPP__EXECUTORS_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,54 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
return rclcpp::FutureReturnCode::INTERRUPTED;
}

template<typename ResponseT, typename TimeRepT = int64_t, typename TimeT = std::milli>
rclcpp::FutureReturnCode
spin_until_future_complete(
std::future<ResponseT> & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return rclcpp::FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

entities_collector_ = std::make_shared<StaticExecutorEntitiesCollector>();
entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_);

while (rclcpp::ok(this->context_)) {
// Do one set of work.
entities_collector_->refresh_wait_set(timeout_left);
execute_ready_executables();
// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return rclcpp::FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return rclcpp::FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return rclcpp::FutureReturnCode::INTERRUPTED;
}

protected:
/// Check which executables in ExecutableList struct are ready from wait_set and execute them.
/**
Expand Down

0 comments on commit 790b1c4

Please sign in to comment.