Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 79 additions & 4 deletions include/coro/ring_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ namespace ring_buffer_result
enum class produce
{
produced,
notified,
stopped
};

enum class consume
{
notified,
stopped
};
} // namespace ring_buffer_result
Expand Down Expand Up @@ -81,6 +83,7 @@ class ring_buffer
// Produce operations can only proceed if running.
if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
{
m_result = ring_buffer_result::produce::stopped;
mutex.unlock();
return true; // Will be awoken with produce::stopped
}
Expand Down Expand Up @@ -111,13 +114,13 @@ class ring_buffer
*/
auto await_resume() -> ring_buffer_result::produce
{
return m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::running
? ring_buffer_result::produce::produced
: ring_buffer_result::produce::stopped;
return m_result;
}

/// If the operation needs to suspend, the coroutine to resume when the element can be produced.
std::coroutine_handle<> m_awaiting_coroutine;
/// The result that should be returned when this coroutine resumes.
ring_buffer_result::produce m_result{ring_buffer_result::produce::produced};
/// Linked list of produce operations that are awaiting to produce their element.
produce_operation* m_next{nullptr};

Expand All @@ -144,6 +147,7 @@ class ring_buffer
// Consume operations proceed until stopped.
if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
{
m_result = ring_buffer_result::consume::stopped;
mutex.unlock();
return true;
}
Expand Down Expand Up @@ -180,12 +184,14 @@ class ring_buffer
}
else // state is stopped
{
return unexpected<ring_buffer_result::consume>(ring_buffer_result::consume::stopped);
return unexpected<ring_buffer_result::consume>(m_result);
}
}

/// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
std::coroutine_handle<> m_awaiting_coroutine;
/// The unexpected result this should return on resume
ring_buffer_result::consume m_result{ring_buffer_result::consume::stopped};
/// Linked list of consume operations that are awaiting to consume an element.
consume_operation* m_next{nullptr};

Expand Down Expand Up @@ -224,6 +230,14 @@ class ring_buffer
co_return result;
}

/**
* @return The maximum number of elements the ring buffer can hold.
*/
constexpr auto max_size() const noexcept -> size_t
{
return num_elements;
}

/**
* @return The current number of elements contained in the ring buffer.
*/
Expand All @@ -237,6 +251,65 @@ class ring_buffer
*/
auto empty() const -> bool { return size() == 0; }

/**
* @return True if the ring buffer has no more space.
*/
auto full() const -> bool { return size() == max_size(); }

/**
* @brief Wakes up all currently awaiting producers. Their await_resume() function
* will return an expected produce result that producers have been notified.
*/
auto notify_producers() -> coro::task<void>
{
auto expected = m_running_state.load(std::memory_order::acquire);
if (expected == running_state_t::stopped)
{
co_return;
}

co_await m_mutex.lock();
auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
m_mutex.unlock();

while (produce_waiters != nullptr)
{
auto* next = produce_waiters->m_next;
produce_waiters->m_result = ring_buffer_result::produce::notified;
produce_waiters->m_awaiting_coroutine.resume();
produce_waiters = next;
}

co_return;
}

/**
* @brief Wakes up all currently awaiting consumers. Their await_resume() function
* will return an expected consume result that consumers have been notified.
*/
auto notify_consumers() -> coro::task<void>
{
auto expected = m_running_state.load(std::memory_order::acquire);
if (expected == running_state_t::stopped)
{
co_return;
}

co_await m_mutex.lock();
auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
m_mutex.unlock();

while (consume_waiters != nullptr)
{
auto* next = consume_waiters->m_next;
consume_waiters->m_result = ring_buffer_result::consume::notified;
consume_waiters->m_awaiting_coroutine.resume();
consume_waiters = next;
}

co_return;
}

/**
* @brief Wakes up all currently awaiting producers and consumers. Their await_resume() function
* will return an expected consume result that the ring buffer has stopped.
Expand Down Expand Up @@ -266,13 +339,15 @@ class ring_buffer
while (produce_waiters != nullptr)
{
auto* next = produce_waiters->m_next;
produce_waiters->m_result = ring_buffer_result::produce::stopped;
produce_waiters->m_awaiting_coroutine.resume();
produce_waiters = next;
}

while (consume_waiters != nullptr)
{
auto* next = consume_waiters->m_next;
consume_waiters->m_result = ring_buffer_result::consume::stopped;
consume_waiters->m_awaiting_coroutine.resume();
consume_waiters = next;
}
Expand Down
Loading