Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .githooks/readme-template.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ $ ./examples/coro_semaphore
```

### ring_buffer
The `coro::ring_buffer<element, num_elements>` is thread safe async multi-producer multi-consumer statically sized ring buffer. Producers will that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until space is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available.

The `coro::ring_buffer` also works with `coro::stop_signal` in that if the ring buffers `stop_signal_notify_waiters()` function is called then any producers or consumers that are suspended and waiting will be awoken by throwing a `coro::stop_signal`. This can be useful to write code that will always suspend if data cannot be produced or consumed for long running daemons but will need to break out of the suspend unpon shutdown.
The `coro::ring_buffer<element, num_elements>` is thread safe async multi-producer multi-consumer statically sized ring buffer. Producers that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until space is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available.

```C++
${EXAMPLE_CORO_RING_BUFFER_CPP}
Expand Down Expand Up @@ -324,6 +322,7 @@ This project uses git submodules, to properly checkout this project use:
This project depends on the following git sub-modules:
* [libc-ares](https://github.com/c-ares/c-ares) For async DNS resolver, this is a git submodule.
* [catch2](https://github.com/catchorg/Catch2) For testing, this is embedded in the `test/` directory.
* [expected](https://github.com/TartanLlama/expected) For results on operations that can fail, this is embedded in the `vendor/` directory.

#### Building
mkdir Release && cd Release
Expand Down Expand Up @@ -386,7 +385,7 @@ The tests will automatically be run by github actions on creating a pull request

File bug reports, feature requests and questions using [GitHub libcoro Issues](https://github.com/jbaldwin/libcoro/issues)

Copyright © 2020-2021 Josh Baldwin
Copyright © 2020-2022 Josh Baldwin

[badge.language]: https://img.shields.io/badge/language-C%2B%2B20-yellow.svg
[badge.license]: https://img.shields.io/badge/license-Apache--2.0-blue
Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set(CARES_SHARED OFF CACHE INTERNAL "")
set(CARES_INSTALL OFF CACHE INTERNAL "")

add_subdirectory(vendor/c-ares/c-ares)
add_subdirectory(vendor/tartanllama/expected)

set(LIBCORO_SOURCE_FILES
inc/coro/concepts/awaitable.hpp
Expand Down Expand Up @@ -56,7 +57,6 @@ set(LIBCORO_SOURCE_FILES
inc/coro/ring_buffer.hpp
inc/coro/semaphore.hpp src/semaphore.cpp
inc/coro/shared_mutex.hpp
inc/coro/stop_signal.hpp
inc/coro/sync_wait.hpp src/sync_wait.cpp
inc/coro/task_container.hpp
inc/coro/task.hpp
Expand All @@ -68,7 +68,7 @@ add_library(${PROJECT_NAME} STATIC ${LIBCORO_SOURCE_FILES})
set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX)
target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_20)
target_include_directories(${PROJECT_NAME} PUBLIC inc)
target_link_libraries(${PROJECT_NAME} PUBLIC pthread c-ares ssl crypto)
target_link_libraries(${PROJECT_NAME} PUBLIC pthread c-ares expected ssl crypto)

if(${CMAKE_CXX_COMPILER_ID} MATCHES "GNU")
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS "10.2.0")
Expand Down
60 changes: 29 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -464,14 +464,22 @@ int main()
coro::thread_pool tp{coro::thread_pool::options{.thread_count = 8}};
coro::semaphore semaphore{2};

auto make_rate_limited_task = [&](uint64_t task_num) -> coro::task<void> {
auto make_rate_limited_task = [&](uint64_t task_num) -> coro::task<void>
{
co_await tp.schedule();

// This will only allow 2 tasks through at any given point in time, all other tasks will
// await the resource to be available before proceeding.
co_await semaphore.acquire();
std::cout << task_num << ", ";
semaphore.release();
auto result = co_await semaphore.acquire();
if (result == coro::semaphore::acquire_result::acquired)
{
std::cout << task_num << ", ";
semaphore.release();
}
else
{
std::cout << task_num << " failed to acquire semaphore [" << coro::semaphore::to_string(result) << "],";
}
co_return;
};

Expand All @@ -493,9 +501,7 @@ $ ./examples/coro_semaphore
```

### ring_buffer
The `coro::ring_buffer<element, num_elements>` is thread safe async multi-producer multi-consumer statically sized ring buffer. Producers will that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until space is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available.

The `coro::ring_buffer` also works with `coro::stop_signal` in that if the ring buffers `stop_signal_notify_waiters()` function is called then any producers or consumers that are suspended and waiting will be awoken by throwing a `coro::stop_signal`. This can be useful to write code that will always suspend if data cannot be produced or consumed for long running daemons but will need to break out of the suspend unpon shutdown.
The `coro::ring_buffer<element, num_elements>` is thread safe async multi-producer multi-consumer statically sized ring buffer. Producers that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until space is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available.

```C++
#include <coro/coro.hpp>
Expand Down Expand Up @@ -532,40 +538,31 @@ int main()
auto scoped_lock = co_await m.lock();
std::cerr << "\nproducer is sending stop signal";
}
rb.stop_signal_notify_waiters();
rb.notify_waiters();
co_return;
};

auto make_consumer_task = [&](size_t id) -> coro::task<void>
{
co_await tp.schedule();

bool needs_await{false};

try
while (true)
{
while (true)
auto expected = co_await rb.consume();
auto scoped_lock = co_await m.lock(); // just for synchronizing std::cout/cerr
if (!expected)
{
auto value = co_await rb.consume();
{
auto scoped_lock = co_await m.lock();
std::cout << "(id=" << id << ", v=" << value << "), ";
}

// Mimic doing some work on the consumed value.
co_await tp.yield();
std::cerr << "\nconsumer " << id << " shutting down, stop signal received";
break; // while
}
else
{
auto item = std::move(*expected);
std::cout << "(id=" << id << ", v=" << item << "), ";
}
}
catch (const coro::stop_signal&)
{
// Cannot await in an exception handler.
needs_await = true;
}

if (needs_await)
{
auto scoped_lock = co_await m.lock();
std::cerr << "\nconsumer " << id << " shutting down, stop signal received";
// Mimic doing some work on the consumed value.
co_await tp.yield();
}

co_return;
Expand Down Expand Up @@ -1010,6 +1007,7 @@ This project uses git submodules, to properly checkout this project use:
This project depends on the following git sub-modules:
* [libc-ares](https://github.com/c-ares/c-ares) For async DNS resolver, this is a git submodule.
* [catch2](https://github.com/catchorg/Catch2) For testing, this is embedded in the `test/` directory.
* [expected](https://github.com/TartanLlama/expected) For results on operations that can fail, this is embedded in the `vendor/` directory.

#### Building
mkdir Release && cd Release
Expand Down Expand Up @@ -1072,7 +1070,7 @@ The tests will automatically be run by github actions on creating a pull request

File bug reports, feature requests and questions using [GitHub libcoro Issues](https://github.com/jbaldwin/libcoro/issues)

Copyright © 2020-2021 Josh Baldwin
Copyright © 2020-2022 Josh Baldwin

[badge.language]: https://img.shields.io/badge/language-C%2B%2B20-yellow.svg
[badge.license]: https://img.shields.io/badge/license-Apache--2.0-blue
Expand Down
37 changes: 14 additions & 23 deletions examples/coro_ring_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,40 +32,31 @@ int main()
auto scoped_lock = co_await m.lock();
std::cerr << "\nproducer is sending stop signal";
}
rb.stop_signal_notify_waiters();
rb.notify_waiters();
co_return;
};

auto make_consumer_task = [&](size_t id) -> coro::task<void>
{
co_await tp.schedule();

bool needs_await{false};

try
while (true)
{
while (true)
auto expected = co_await rb.consume();
auto scoped_lock = co_await m.lock(); // just for synchronizing std::cout/cerr
if (!expected)
{
auto value = co_await rb.consume();
{
auto scoped_lock = co_await m.lock();
std::cout << "(id=" << id << ", v=" << value << "), ";
}

// Mimic doing some work on the consumed value.
co_await tp.yield();
std::cerr << "\nconsumer " << id << " shutting down, stop signal received";
break; // while
}
else
{
auto item = std::move(*expected);
std::cout << "(id=" << id << ", v=" << item << "), ";
}
}
catch (const coro::stop_signal&)
{
// Cannot await in an exception handler.
needs_await = true;
}

if (needs_await)
{
auto scoped_lock = co_await m.lock();
std::cerr << "\nconsumer " << id << " shutting down, stop signal received";
// Mimic doing some work on the consumed value.
co_await tp.yield();
}

co_return;
Expand Down
16 changes: 12 additions & 4 deletions examples/coro_semaphore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ int main()
coro::thread_pool tp{coro::thread_pool::options{.thread_count = 8}};
coro::semaphore semaphore{2};

auto make_rate_limited_task = [&](uint64_t task_num) -> coro::task<void> {
auto make_rate_limited_task = [&](uint64_t task_num) -> coro::task<void>
{
co_await tp.schedule();

// This will only allow 2 tasks through at any given point in time, all other tasks will
// await the resource to be available before proceeding.
co_await semaphore.acquire();
std::cout << task_num << ", ";
semaphore.release();
auto result = co_await semaphore.acquire();
if (result == coro::semaphore::acquire_result::acquired)
{
std::cout << task_num << ", ";
semaphore.release();
}
else
{
std::cout << task_num << " failed to acquire semaphore [" << coro::semaphore::to_string(result) << "],";
}
co_return;
};

Expand Down
1 change: 0 additions & 1 deletion inc/coro/coro.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "coro/ring_buffer.hpp"
#include "coro/semaphore.hpp"
#include "coro/shared_mutex.hpp"
#include "coro/stop_signal.hpp"
#include "coro/sync_wait.hpp"
#include "coro/task.hpp"
#include "coro/task_container.hpp"
Expand Down
40 changes: 23 additions & 17 deletions inc/coro/ring_buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "coro/stop_signal.hpp"
#include <expected/expected.hpp>

#include <array>
#include <atomic>
Expand All @@ -20,6 +20,17 @@ template<typename element, size_t num_elements>
class ring_buffer
{
public:
enum class produce_result
{
produced,
ring_buffer_stopped
};

enum class consume_result
{
ring_buffer_stopped
};

/**
* @throws std::runtime_error If `num_elements` == 0.
*/
Expand All @@ -34,14 +45,14 @@ class ring_buffer
~ring_buffer()
{
// Wake up anyone still using the ring buffer.
stop_signal_notify_waiters();
notify_waiters();
}

ring_buffer(const ring_buffer<element, num_elements>&) = delete;
ring_buffer(ring_buffer<element, num_elements>&&) = delete;

auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>& = delete;
auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>& = delete;

struct produce_operation
{
Expand Down Expand Up @@ -70,14 +81,11 @@ class ring_buffer
}

/**
* @throws coro::stop_signal if the operation was stopped.
* @return produce_result
*/
auto await_resume() -> void
auto await_resume() -> produce_result
{
if (m_stopped)
{
throw stop_signal{};
}
return !m_stopped ? produce_result::produced : produce_result::ring_buffer_stopped;
}

private:
Expand Down Expand Up @@ -122,14 +130,13 @@ class ring_buffer
}

/**
* @throws coro::stop_signal if the operation was stopped.
* @return The consumed element.
* @return The consumed element or std::nullopt if the consume has failed.
*/
auto await_resume() -> element
auto await_resume() -> tl::expected<element, consume_result>
{
if (m_stopped)
{
throw stop_signal{};
return tl::make_unexpected(consume_result::ring_buffer_stopped);
}

return std::move(m_e);
Expand Down Expand Up @@ -179,11 +186,10 @@ class ring_buffer
auto empty() const -> bool { return size() == 0; }

/**
* Stops all currently awaiting producers and consumers. Their await_resume() function
* will throw a coro::stop_signal. Further produce()/consume() calls will always throw
* a coro::stop_signal after this is called for this ring buffer.
* Wakes up all currently awaiting producers and consumers. Their await_resume() function
* will return an expected consume result that the ring buffer has stopped.
*/
auto stop_signal_notify_waiters() -> void
auto notify_waiters() -> void
{
// Only wake up waiters once.
if (m_stopped.load(std::memory_order::acquire))
Expand Down
Loading