Skip to content

Commit

Permalink
Remove thread from Timer, remove guard conditions for timers
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie Kay committed Aug 8, 2015
1 parent a4154a2 commit 44c296f
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 109 deletions.
140 changes: 63 additions & 77 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ class Executor
// Collect the subscriptions and timers to be waited on
bool has_invalid_weak_nodes = false;
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr> subs;
std::vector<rclcpp::timer::TimerBase::SharedPtr> timers;
std::vector<rclcpp::service::ServiceBase::SharedPtr> services;
std::vector<rclcpp::client::ClientBase::SharedPtr> clients;
for (auto & weak_node : weak_nodes_) {
Expand All @@ -271,12 +270,6 @@ class Executor
subs.push_back(subscription);
}
}
for (auto & weak_timer : group->timer_ptrs_) {
auto timer = weak_timer.lock();
if (timer) {
timers.push_back(timer);
}
}
for (auto & service : group->service_ptrs_) {
services.push_back(service);
}
Expand Down Expand Up @@ -349,10 +342,9 @@ class Executor
client_handle_index += 1;
}

// Use the number of guard conditions to allocate memory in the handles
// Add 2 to the number for the ctrl-c guard cond and the executor's
size_t start_of_timer_guard_conds = 2;
size_t number_of_guard_conds = timers.size() + start_of_timer_guard_conds;
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
size_t number_of_guard_conds = 2;
rmw_guard_conditions_t guard_condition_handles;
guard_condition_handles.guard_condition_count = number_of_guard_conds;
guard_condition_handles.guard_conditions =
Expand All @@ -368,21 +360,25 @@ class Executor
// Put the executor's guard condition in
guard_condition_handles.guard_conditions[1] = \
interrupt_guard_condition_->data;
// Then fill the SubscriberHandles with ready subscriptions
size_t guard_cond_handle_index = start_of_timer_guard_conds;
for (auto & timer : timers) {
guard_condition_handles.guard_conditions[guard_cond_handle_index] = \
timer->guard_condition_->data;
guard_cond_handle_index += 1;
}

rmw_time_t * wait_timeout = NULL;
rmw_time_t rmw_timeout;

if (timeout >= std::chrono::duration<int64_t, T>::zero()) {
auto next_timer_duration = get_earliest_timer();
// If the next timer timeout must preempt the requested timeout
// or if the requested timeout blocks forever, and there exists a valid timer,
// replace the requested timeout with the next timeout.
bool has_valid_timer = next_timer_duration >= std::chrono::nanoseconds::zero();
if ((next_timer_duration < timeout ||
timeout < std::chrono::duration<int64_t, T>::zero()) && has_valid_timer)
{
rmw_timeout.sec =
std::chrono::duration_cast<std::chrono::seconds>(next_timer_duration).count();
rmw_timeout.nsec = next_timer_duration.count() % (1000 * 1000 * 1000);
wait_timeout = &rmw_timeout;
} else if (timeout >= std::chrono::duration<int64_t, T>::zero()) {
// Convert timeout representation to rmw_time
auto timeout_sec = std::chrono::duration_cast<std::chrono::seconds>(timeout);
rmw_timeout.sec = timeout_sec.count();
rmw_timeout.sec = std::chrono::duration_cast<std::chrono::seconds>(timeout).count();
rmw_timeout.nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count() %
(1000 * 1000 * 1000);
wait_timeout = &rmw_timeout;
Expand Down Expand Up @@ -419,13 +415,6 @@ class Executor
subscriber_handles_.push_back(handle);
}
}
// Then the timers, start with start_of_timer_guard_conds
for (size_t i = start_of_timer_guard_conds; i < number_of_guard_conds; ++i) {
void * handle = guard_condition_handles.guard_conditions[i];
if (handle) {
guard_condition_handles_.push_back(handle);
}
}
// Then the services
for (size_t i = 0; i < number_of_services; ++i) {
void * handle = service_handles.services[i];
Expand Down Expand Up @@ -478,30 +467,6 @@ class Executor
return rclcpp::subscription::SubscriptionBase::SharedPtr();
}

rclcpp::timer::TimerBase::SharedPtr
get_timer_by_handle(void * guard_condition_handle)
{
for (auto weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto weak_group : node->callback_groups_) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto weak_timer : group->timer_ptrs_) {
auto timer = weak_timer.lock();
if (timer && timer->guard_condition_->data == guard_condition_handle) {
return timer;
}
}
}
}
return rclcpp::timer::TimerBase::SharedPtr();
}

rclcpp::service::ServiceBase::SharedPtr
get_service_by_handle(void * service_handle)
{
Expand Down Expand Up @@ -548,7 +513,6 @@ class Executor
return rclcpp::client::ClientBase::SharedPtr();
}


rclcpp::node::Node::SharedPtr
get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr & group)
{
Expand Down Expand Up @@ -598,34 +562,58 @@ class Executor
void
get_next_timer(AnyExecutable::SharedPtr & any_exec)
{
auto it = guard_condition_handles_.begin();
while (it != guard_condition_handles_.end()) {
auto timer = get_timer_by_handle(*it);
if (timer) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_timer(timer);
if (!group) {
// Group was not found, meaning the timer is not valid...
// Remove it from the ready list and continue looking
guard_condition_handles_.erase(it++);
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->callback_groups_) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from_.load()) {
continue;
}
if (!group->can_be_taken_from_.load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
for (auto & timer_ref : group->timer_ptrs_) {
auto timer = timer_ref.lock();
if (timer && timer->check_and_trigger()) {
any_exec->timer = timer;
any_exec->callback_group = group;
node = get_node_by_group(group);
return;
}
}
}
}
}

std::chrono::nanoseconds
get_earliest_timer()
{
std::chrono::nanoseconds latest = std::chrono::nanoseconds::max();
bool timers_empty = true;
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->callback_groups_) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from_.load()) {
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->timer = timer;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
guard_condition_handles_.erase(it++);
return;
for (auto & timer_ref : group->timer_ptrs_) {
timers_empty = false;
// Check the expected trigger time
auto timer = timer_ref.lock();
if (timer && timer->time_until_trigger() < latest) {
latest = timer->time_until_trigger();
}
}
}
// Else, the timer is no longer valid, remove it and continue
guard_condition_handles_.erase(it++);
}
if (timers_empty) {
return std::chrono::nanoseconds(-1);
}
return latest;
}

rclcpp::callback_group::CallbackGroup::SharedPtr
Expand Down Expand Up @@ -867,8 +855,6 @@ class Executor
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
typedef std::list<void *> SubscriberHandles;
SubscriberHandles subscriber_handles_;
typedef std::list<void *> GuardConditionHandles;
GuardConditionHandles guard_condition_handles_;
typedef std::list<void *> ServiceHandles;
ServiceHandles service_handles_;
typedef std::list<void *> ClientHandles;
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/rate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ class GenericRate : public RateBase
last_interval_ = Clock::now();
}

std::chrono::nanoseconds period() const
{
return period_;
}

private:
RCLCPP_DISABLE_COPY(GenericRate);

Expand Down
72 changes: 40 additions & 32 deletions rclcpp/include/rclcpp/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,12 @@ class TimerBase
TimerBase(std::chrono::nanoseconds period, CallbackType callback)
: period_(period),
callback_(callback),
guard_condition_(rmw_create_guard_condition()),
canceled_(false)
{
if (!guard_condition_) {
// TODO(wjwwood): use custom exception
throw std::runtime_error(
std::string("failed to create guard condition: ") +
rmw_get_error_string_safe()
);
}
}

virtual ~TimerBase()
{
if (guard_condition_) {
if (rmw_destroy_guard_condition(guard_condition_) != RMW_RET_OK) {
std::stringstream ss;
ss << "Error in TimerBase destructor, rmw_destroy_guard_condition failed: " <<
rmw_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
}
}

void
Expand All @@ -83,12 +67,17 @@ class TimerBase
this->canceled_ = true;
}

virtual std::chrono::nanoseconds
time_until_trigger() = 0;

virtual bool is_steady() = 0;

// Interface for externally triggering the timer event
virtual bool check_and_trigger() = 0;

protected:
std::chrono::nanoseconds period_;
CallbackType callback_;
rmw_guard_condition_t * guard_condition_;

bool canceled_;

Expand All @@ -106,29 +95,48 @@ class GenericTimer : public TimerBase
GenericTimer(std::chrono::nanoseconds period, CallbackType callback)
: TimerBase(period, callback), loop_rate_(period)
{
thread_ = std::thread(&GenericTimer<Clock>::run, this);
/* Subtracting the loop rate period ensures that the callback gets triggered
on the first call to check_and_trigger. */
last_triggered_time_ = Clock::now() - period;
}

virtual ~GenericTimer()
{
cancel();
thread_.join();
}

void
run()
// return: true to trigger callback on the next "execute_timer" call in executor
bool
check_and_trigger()
{
if (canceled_) {
return false;
}
if (Clock::now() < last_triggered_time_) {
return false;
}
if (std::chrono::duration_cast<std::chrono::nanoseconds>(Clock::now() - last_triggered_time_) >=
loop_rate_.period())
{
last_triggered_time_ = Clock::now();
return true;
}
return false;
}

std::chrono::nanoseconds
time_until_trigger()
{
while (rclcpp::utilities::ok() && !this->canceled_) {
loop_rate_.sleep();
if (!rclcpp::utilities::ok()) {
return;
}
rmw_ret_t status = rmw_trigger_guard_condition(guard_condition_);
if (status != RMW_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe());
}
std::chrono::nanoseconds time_until_trigger;
// Calculate the time between the next trigger and the current time
if (last_triggered_time_ + loop_rate_.period() < Clock::now()) {
// time is overdue, need to trigger immediately
time_until_trigger = std::chrono::nanoseconds::zero();
} else {
time_until_trigger = std::chrono::duration_cast<std::chrono::nanoseconds>(
last_triggered_time_ - Clock::now()) + loop_rate_.period();
}
return time_until_trigger;
}

virtual bool
Expand All @@ -140,8 +148,8 @@ class GenericTimer : public TimerBase
private:
RCLCPP_DISABLE_COPY(GenericTimer);

std::thread thread_;
rclcpp::rate::GenericRate<Clock> loop_rate_;
std::chrono::time_point<Clock> last_triggered_time_;

};

Expand Down

0 comments on commit 44c296f

Please sign in to comment.