Skip to content

Commit

Permalink
Make ring buffer thread-safe (#1213)
Browse files Browse the repository at this point in the history
* Add recursive mutexs to ring buffer to avoid race conditions

Signed-off-by: Audrow <audrow.nash@gmail.com>

* Replace recursive_mutex with shared_timed_mutex

Signed-off-by: Audrow <audrow.nash@gmail.com>

* Replace shared_timed_mutex with regular mutex

Signed-off-by: Audrow <audrow.nash@gmail.com>

* Document the ring buffer

Signed-off-by: Audrow <audrow.nash@gmail.com>

* Remove trailing whitespace

Signed-off-by: Audrow <audrow.nash@gmail.com>

* Fix typo in has_data() document string

Signed-off-by: Audrow <audrow.nash@gmail.com>
  • Loading branch information
audrow committed Jul 14, 2020
1 parent a640c3e commit 7517276
Showing 1 changed file with 84 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ namespace experimental
namespace buffers
{

/// Store elements in a fixed-size, FIFO buffer
/**
* All public member functions are thread-safe.
*/
template<typename BufferT>
class RingBufferImplementation : public BufferImplementationBase<BufferT>
{
Expand All @@ -55,55 +59,125 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>

virtual ~RingBufferImplementation() {}

/// Add a new element to store in the ring buffer
/**
* This member function is thread-safe.
*
* \param request the element to be stored in the ring buffer
*/
void enqueue(BufferT request)
{
std::lock_guard<std::mutex> lock(mutex_);

write_index_ = next(write_index_);
write_index_ = next_(write_index_);
ring_buffer_[write_index_] = std::move(request);

if (is_full()) {
read_index_ = next(read_index_);
if (is_full_()) {
read_index_ = next_(read_index_);
} else {
size_++;
}
}

/// Remove the oldest element from ring buffer
/**
* This member function is thread-safe.
*
* \return the element that is being removed from the ring buffer
*/
BufferT dequeue()
{
std::lock_guard<std::mutex> lock(mutex_);

if (!has_data()) {
if (!has_data_()) {
RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "Calling dequeue on empty intra-process buffer");
throw std::runtime_error("Calling dequeue on empty intra-process buffer");
}

auto request = std::move(ring_buffer_[read_index_]);
read_index_ = next(read_index_);
read_index_ = next_(read_index_);

size_--;

return request;
}

/// Get the next index value for the ring buffer
/**
* This member function is thread-safe.
*
* \param val the current index value
* \return the next index value
*/
inline size_t next(size_t val)
{
return (val + 1) % capacity_;
std::lock_guard<std::mutex> lock(mutex_);
return next_(val);
}

/// Get if the ring buffer has at least one element stored
/**
* This member function is thread-safe.
*
* \return `true` if there is data and `false` otherwise
*/
inline bool has_data() const
{
return size_ != 0;
std::lock_guard<std::mutex> lock(mutex_);
return has_data_();
}

inline bool is_full()
/// Get if the size of the buffer is equal to its capacity
/**
* This member function is thread-safe.
*
* \return `true` if the size of the buffer is equal is capacity
* and `false` otherwise
*/
inline bool is_full() const
{
return size_ == capacity_;
std::lock_guard<std::mutex> lock(mutex_);
return is_full_();
}

void clear() {}

private:
/// Get the next index value for the ring buffer
/**
* This member function is not thread-safe.
*
* \param val the current index value
* \return the next index value
*/
inline size_t next_(size_t val)
{
return (val + 1) % capacity_;
}

/// Get if the ring buffer has at least one element stored
/**
* This member function is not thread-safe.
*
* \return `true` if there is data and `false` otherwise
*/
inline bool has_data_() const
{
return size_ != 0;
}

/// Get if the size of the buffer is equal to its capacity
/**
* This member function is not thread-safe.
*
* \return `true` if the size of the buffer is equal is capacity
* and `false` otherwise
*/
inline bool is_full_() const
{
return size_ == capacity_;
}

size_t capacity_;

std::vector<BufferT> ring_buffer_;
Expand All @@ -112,7 +186,7 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
size_t read_index_;
size_t size_;

std::mutex mutex_;
mutable std::mutex mutex_;
};

} // namespace buffers
Expand Down

0 comments on commit 7517276

Please sign in to comment.