Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ring buffer thread-safe #1213

Merged
merged 6 commits into from
Jul 14, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ namespace experimental
namespace buffers
{

/// Store elements in a fixed-size, FIFO buffer
/**
* All public member functions are thread-safe.
*/
template<typename BufferT>
class RingBufferImplementation : public BufferImplementationBase<BufferT>
{
Expand All @@ -55,55 +59,125 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>

virtual ~RingBufferImplementation() {}

/// Add a new element to store in the ring buffer
/**
* This member function is thread-safe.
*
* \param request the element to be stored in the ring buffer
*/
void enqueue(BufferT request)
{
std::lock_guard<std::mutex> lock(mutex_);

write_index_ = next(write_index_);
write_index_ = next_(write_index_);
ring_buffer_[write_index_] = std::move(request);

if (is_full()) {
read_index_ = next(read_index_);
if (is_full_()) {
read_index_ = next_(read_index_);
} else {
size_++;
}
}

/// Remove the oldest element from ring buffer
/**
* This member function is thread-safe.
*
* \return the element that is being removed from the ring buffer
*/
BufferT dequeue()
{
std::lock_guard<std::mutex> lock(mutex_);

if (!has_data()) {
if (!has_data_()) {
RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "Calling dequeue on empty intra-process buffer");
throw std::runtime_error("Calling dequeue on empty intra-process buffer");
}

auto request = std::move(ring_buffer_[read_index_]);
read_index_ = next(read_index_);
read_index_ = next_(read_index_);

size_--;

return request;
}

/// Get the next index value for the ring buffer
/**
* This member function is thread-safe.
*
* \param val the current index value
* \return the next index value
*/
inline size_t next(size_t val)
{
return (val + 1) % capacity_;
std::lock_guard<std::mutex> lock(mutex_);
return next_(val);
}

/// Get if the ring buffer has at least one element stored
/**
* This member function is thread-safe.
*
* \return `true` if there is data and `false` otherwise
*/
inline bool has_data() const
{
return size_ != 0;
std::lock_guard<std::mutex> lock(mutex_);
return has_data_();
}

inline bool is_full()
/// Get if the size of the buffer is equal to its capacity
/**
* This member function is thread-safe.
*
* \return `true` if the size of the buffer is equal is capacity
* and `false` otherwise
*/
inline bool is_full() const
{
return size_ == capacity_;
std::lock_guard<std::mutex> lock(mutex_);
return is_full_();
}

void clear() {}

private:
/// Get the next index value for the ring buffer
/**
* This member function is not thread-safe.
*
* \param val the current index value
* \return the next index value
*/
inline size_t next_(size_t val)
{
return (val + 1) % capacity_;
}

/// Get if the ring buffer has at least one element stored
/**
* This member function is not thread-safe.
*
* \return `true` if there is data and `false` otherwise
*/
inline bool has_data_() const
{
return size_ != 0;
}

/// Get if the size of the buffer is equal to its capacity
/**
* This member function is not thread-safe.
*
* \return `true` if the size of the buffer is equal is capacity
* and `false` otherwise
*/
inline bool is_full_() const
{
return size_ == capacity_;
}

size_t capacity_;

std::vector<BufferT> ring_buffer_;
Expand All @@ -112,7 +186,7 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
size_t read_index_;
size_t size_;

std::mutex mutex_;
mutable std::mutex mutex_;
};

} // namespace buffers
Expand Down