Skip to content

Commit

Permalink
Merge pull request #49 from irobot-ros/asoragna/cleanup
Browse files Browse the repository at this point in the history
Asoragna/cleanup
  • Loading branch information
iRobot ROS committed Feb 23, 2021
2 parents 3a50ee8 + b72d124 commit 00666c1
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 179 deletions.
19 changes: 9 additions & 10 deletions rclcpp/include/rclcpp/executors/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ namespace executors

/// Events executor implementation
/**
* This executor is a variation of the standard one that does not uses a waitset.
* The executor uses an events queue and a timers manager to execute entities from its
* This executor uses an events queue and a timers manager to execute entities from its
* associated nodes and callback groups.
* This provides improved performance as it allows to skip all the waitset maintenance operations.
* The RMW listener APIs are used to collect new events.
*
* This executor tries to reduce as much as possible the amount of maintenance operations.
* This allows to use customized `EventsQueue` classes to achieve different goals such
* as very low CPU usage, bounded memory requirement, determinism, etc.
*
* The executor uses a weak ownership model and it locks entities only while executing
* their related events.
*
* To run this executor:
* rclcpp::executors::EventsExecutor executor;
Expand Down Expand Up @@ -204,8 +210,6 @@ class EventsExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(EventsExecutor)

using EventQueue = std::queue<rmw_listener_event_t>;

// Executor callback: Push new events into the queue and trigger cv.
// This function is called by the DDS entities when an event happened,
// like a subscription receiving a message.
Expand All @@ -229,11 +233,6 @@ class EventsExecutor : public rclcpp::Executor
this_executor->events_queue_cv_.notify_one();
}

/// Extract and execute events from the queue until it is empty
RCLCPP_PUBLIC
void
consume_all_events(EventQueue & queue);

// Execute a single event
RCLCPP_PUBLIC
void
Expand Down
175 changes: 102 additions & 73 deletions rclcpp/include/rclcpp/executors/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,22 @@ namespace executors
*
* Timers management
* This class provides APIs to add and remove timers.
* It keeps a list of weak pointers from added timers, and owns them only when
* have expired and need to be executed.
* It keeps a list of weak pointers from added timers, and locks them only when
* they need to be executed or modified.
* Timers are kept ordered in a binary-heap priority queue.
* Calls to add/remove APIs will temporarily block the execution of the timers and
* will require to reorder the internal priority queue of timers.
* will require to reorder the internal priority queue.
* Because of this, they have a not-negligible impact on the performance.
*
* Timers execution
* The most efficient implementation consists in letting a TimersManager object
* to spawn a thread where timers are monitored and periodically executed.
* Besides this, other APIs allow to either execute a single timer or all the
* currently ready ones.
* This class assumes that the execute_callback API of the stored timer is never
* called by other entities, but can only be called from here.
* If this assumption is not respected, the heap property will be invalidated,
* so timers may be executed out of order.
* This class assumes that the `execute_callback()` API of the stored timers is never
* called by other entities, but it can only be called from here.
* If this assumption is not respected, the heap property may be invalidated,
* so timers may be executed out of order, without this object noticing it.
*
*/
class TimersManager
Expand All @@ -65,70 +65,80 @@ class TimersManager

/**
* @brief Construct a new TimersManager object
*
* @param context custom context to be used.
* Shared ownership of the context is held until destruction.
*/
explicit TimersManager(std::shared_ptr<rclcpp::Context> context);

/**
* @brief Destruct the object making sure to stop thread and release memory.
* @brief Destruct the TimersManager object making sure to stop thread and release memory.
*/
~TimersManager();

/**
* @brief Adds a new TimerBase::WeakPtr to the storage.
* This object will store a weak pointer of the added timer
* in a heap data structure.
* @param timer the timer to be added
* @brief Adds a new timer to the storage, maintaining weak ownership of it.
* Function is thread safe and it can be called regardless of the state of the timers thread.
*
* @param timer the timer to add.
*/
void add_timer(rclcpp::TimerBase::SharedPtr timer);

/**
* @brief Starts a thread that takes care of executing timers added to this object.
* @brief Remove a single timer from the object storage.
* Will do nothing if the timer was not being stored here.
* Function is thread safe and it can be called regardless of the state of the timers thread.
*
* @param timer the timer to remove.
*/
void remove_timer(rclcpp::TimerBase::SharedPtr timer);

/**
* @brief Remove all the timers stored in the object.
* Function is thread safe and it can be called regardless of the state of the timers thread.
*/
void clear();

/**
* @brief Starts a thread that takes care of executing the timers stored in this object.
* Function will throw an error if the timers thread was already running.
*/
void start();

/**
* @brief Stops the timers thread.
* Will do nothing if the timer thread was not running.
*/
void stop();

/**
* @brief Executes all the timers currently ready when the function is invoked
* while keeping the heap correctly sorted.
* @return std::chrono::nanoseconds for next timer to expire,
* the returned value could be negative if the timer is already expired
* or MAX_TIME if the heap is empty.
* @brief Executes all the timers currently ready when the function was invoked.
* This function will lock all the stored timers throughout its duration.
* Function is thread safe, but it will throw an error if the timers thread is running.
*/
std::chrono::nanoseconds execute_ready_timers();
void execute_ready_timers();

/**
* @brief Executes head timer if ready at time point.
* @param tp the timepoint to check for
* @return true if head timer was ready at tp
* Function is thread safe, but it will throw an error if the timers thread is running.
*
* @param tp the time point to check for, where `max()` denotes that no check will be performed.
* @return true if head timer was ready at time point.
*/
bool execute_head_timer(
std::chrono::time_point<std::chrono::steady_clock> tp =
std::chrono::time_point<std::chrono::steady_clock>::max());

/**
* @brief Get the amount of time before the next timer expires.
* Function is thread safe, but it will throw an error if the timers thread is running.
*
* @return std::chrono::nanoseconds to wait,
* the returned value could be negative if the timer is already expired
* or MAX_TIME if the heap is empty.
* or MAX_TIME if there are no timers stored in the object.
*/
std::chrono::nanoseconds get_head_timeout();

/**
* @brief Remove all the timers stored in the object.
*/
void clear();

/**
* @brief Remove a single timer stored in the object, passed as a shared_ptr.
* @param timer the timer to remove.
*/
void remove_timer(rclcpp::TimerBase::SharedPtr timer);

// This is what the TimersManager uses to denote a duration forever.
// We don't use std::chrono::nanoseconds::max because it will overflow.
// See https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for
Expand All @@ -144,20 +154,22 @@ class TimersManager
class TimersHeap;

/**
* @brief This class allows to store weak pointers to timers in a heap data structure.
* @brief This class allows to store weak pointers to timers in a heap-like data structure.
* The root of the heap is the timer that expires first.
* Since this class uses weak ownership, it is not guaranteed that it represents a valid heap
* at any point in time as timers could go out of scope, thus invalidating it.
* The "validate_and_lock" API allows to get ownership of the timers and also makes sure that
* the heap property is respected.
* The root of the heap is the timer that expires first.
* This class is not thread safe and requires external mutexes to protect its usage.
*/
class WeakTimersHeap
{
public:
/**
* @brief Try to add a new timer to the heap.
* After the addition, the heap property is preserved.
* @param timer new timer to add
* @return true if timer has been added, false if it was already there
* @brief Add a new timer to the heap. After the addition, the heap property is enforced.
*
* @param timer new timer to add.
* @return true if timer has been added, false if it was already there.
*/
bool add_timer(TimerPtr timer)
{
Expand All @@ -173,10 +185,10 @@ class TimersManager
}

/**
* @brief Try to remove a timer from the heap.
* After the removal, the heap property is preserved.
* @param timer timer to remove
* @return true if timer has been removed, false if it was not there
* @brief Remove a timer from the heap. After the removal, the heap property is enforced.
*
* @param timer timer to remove.
* @return true if timer has been removed, false if it was not there.
*/
bool remove_timer(TimerPtr timer)
{
Expand All @@ -191,9 +203,27 @@ class TimersManager
return removed;
}

/**
* @brief Returns a const reference to the front element
*/
const WeakTimerPtr & front() const
{
return weak_heap_.front();
}

/**
* @brief Returns whether the heap is empty or not
*/
bool empty() const
{
return weak_heap_.empty();
}

/**
* @brief This function restores the current object as a valid heap
* and it also returns a locked version of it
* and it returns a locked version of it.
* It is the only public API to access and manipulate the stored timers.
*
* @return TimersHeap owned timers corresponding to the current object
*/
TimersHeap validate_and_lock()
Expand All @@ -205,8 +235,9 @@ class TimersManager

while (it != weak_heap_.end()) {
if (auto timer_shared_ptr = it->lock()) {
// This timer is valid, add it to the vector
locked_heap.push_back(std::move(timer_shared_ptr));
// This timer is valid, add it to the locked heap
// Note: we access private `owned_heap_` member field.
locked_heap.owned_heap_.push_back(std::move(timer_shared_ptr));
it++;
} else {
// This timer went out of scope, remove it
Expand All @@ -229,11 +260,15 @@ class TimersManager
/**
* @brief This function allows to recreate the heap of weak pointers
* from an heap of owned pointers.
* It is required to be called after a locked TimersHeap generated from this object
* has been modified in any way (e.g. timers triggered, added, removed).
*
* @param heap timers heap to store as weak pointers
*/
void store(const TimersHeap & heap)
{
weak_heap_.clear();
// Note: we access private `owned_heap_` member field.
for (auto t : heap.owned_heap_) {
weak_heap_.push_back(t);
}
Expand All @@ -254,7 +289,8 @@ class TimersManager
/**
* @brief This class is the equivalent of WeakTimersHeap but with ownership of the timers.
* It can be generated by locking the weak version.
* It provides operations to manipulate the heap
* It provides operations to manipulate the heap.
* This class is not thread safe and requires external mutexes to protect its usage.
*/
class TimersHeap
{
Expand Down Expand Up @@ -324,13 +360,15 @@ class TimersManager
}

/**
* @brief Restore a valid heap after the root value has been replaced.
* @brief Restore a valid heap after the root value has been replaced (e.g. timer triggered).
*/
void heapify_root()
{
// The following code is a more efficient version of doing
// - pop_heap; pop_back;
// - push_back; push_heap;
// pop_heap();
// pop_back();
// push_back();
// push_heap();
// as it removes the need for the last push_heap

// Push the modified element (i.e. the current root) at the bottom of the heap
Expand All @@ -350,26 +388,21 @@ class TimersManager
}

/**
* @brief Convenience function that allows to push an element at the bottom of the heap.
* It will not perform any check on whether the heap remains valid or not.
* Those checks are responsibility of the calling code.
*
* @param timer timer to push at the back of the data structure representing the heap
* @brief Friend declaration to allow the `validate_and_lock()` function to access the
* underlying heap container
*/
void push_back(TimerPtr timer)
{
owned_heap_.push_back(timer);
}
friend TimersHeap WeakTimersHeap::validate_and_lock();

/**
* @brief Friend declaration to allow the store function to access the underlying
* heap container
* @brief Friend declaration to allow the `store()` function to access the
* underlying heap container
*/
friend void WeakTimersHeap::store(const TimersHeap & heap);

private:
/**
* @brief Comparison function between timers.
* Returns true if `a` expires after `b`.
*/
static bool timer_greater(TimerPtr a, TimerPtr b)
{
Expand All @@ -392,27 +425,23 @@ class TimersManager
* @return std::chrono::nanoseconds to wait,
* the returned value could be negative if the timer is already expired
* or MAX_TIME if the heap is empty.
* This function is not thread safe, acquire the timers_mutex_ before calling it.
*/
std::chrono::nanoseconds get_head_timeout_unsafe(const TimersHeap & heap)
{
if (heap.empty()) {
return MAX_TIME;
}
return (heap.front())->time_until_trigger();
}
std::chrono::nanoseconds get_head_timeout_unsafe();

/**
* @brief Executes all the timers currently ready when the function is invoked
* while keeping the heap correctly sorted.
* This function is not thread safe, acquire a mutex before calling it.
* This function is not thread safe, acquire the timers_mutex_ before calling it.
*/
void execute_ready_timers_unsafe(TimersHeap & heap);
void execute_ready_timers_unsafe();

/**
* @brief Helper function that checks whether a timer was already ready
* at a specific timepoint
* at a specific time point.
* @param timer a pointer to the timer to check for
* @param tp the timepoint to check for
* @param tp the time point to check for
* @return true if timer was ready at tp
*/
bool timer_was_ready_at_tp(
Expand All @@ -424,7 +453,7 @@ class TimersManager
return time_ready < tp;
}

// Thread used to run the timers monitoring and execution task
// Thread used to run the timers execution task
std::thread timers_thread_;
// Protects access to timers
std::mutex timers_mutex_;
Expand Down
Loading

0 comments on commit 00666c1

Please sign in to comment.