2 changes: 2 additions & 0 deletions .githooks/readme-template.md
@@ -143,6 +143,8 @@ Its very easy to see the LIFO 'atomic' queue in action in the beginning where 22
### shared_mutex
The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner: if the lock is currently held by shared users and an exclusive waiter attempts to lock, the exclusive waiter will suspend until all of the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved.

+The `coro::shared_mutex` requires an `executor_type` when constructed to be able to resume multiple shared waiters when an exclusive lock is released. This allows all of the pending shared waiters to be resumed concurrently.
+
```C++
${EXAMPLE_CORO_SHARED_MUTEX_CPP}
29 changes: 16 additions & 13 deletions README.md
@@ -363,37 +363,39 @@ Its very easy to see the LIFO 'atomic' queue in action in the beginning where 22
### shared_mutex
The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner: if the lock is currently held by shared users and an exclusive waiter attempts to lock, the exclusive waiter will suspend until all of the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved.

+The `coro::shared_mutex` requires an `executor_type` when constructed to be able to resume multiple shared waiters when an exclusive lock is released. This allows all of the pending shared waiters to be resumed concurrently.
+
```C++
#include <coro/coro.hpp>
#include <iostream>

int main()
{
-// Shared mutexes require a thread pool to be able to wake up multiple shared waiters when
+// Shared mutexes require an executor type to be able to wake up multiple shared waiters when
// there is an exclusive lock holder releasing the lock. This example uses a single thread
// to also show the interleaving of coroutines acquiring the shared lock in shared and
// exclusive mode as they resume and suspend in a linear manner. Ideally the thread pool
-// would have more than 1 thread to resume all shared waiters in parallel.
-coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
+// executor would have more than 1 thread to resume all shared waiters in parallel.
+auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 1});
coro::shared_mutex mutex{tp};

auto make_shared_task = [&](uint64_t i) -> coro::task<void> {
-co_await tp.schedule();
+co_await tp->schedule();
{
std::cerr << "shared task " << i << " lock_shared()\n";
auto scoped_lock = co_await mutex.lock_shared();
std::cerr << "shared task " << i << " lock_shared() acquired\n";
/// Immediately yield so the other shared tasks also acquire in shared state
/// while this task currently holds the mutex in shared state.
-co_await tp.yield();
+co_await tp->yield();
std::cerr << "shared task " << i << " unlock_shared()\n";
}
co_return;
};

auto make_exclusive_task = [&]() -> coro::task<void> {
-co_await tp.schedule();
+co_await tp->schedule();

std::cerr << "exclusive task lock()\n";
auto scoped_lock = co_await mutex.lock();
@@ -713,7 +715,7 @@ The example provided here shows an i/o scheduler that spins up a basic `coro::ne

int main()
{
-coro::io_scheduler scheduler{coro::io_scheduler::options{
+auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
// The scheduler will spawn a dedicated event processing thread. This is the default, but
// it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
@@ -733,7 +735,7 @@ int main()
.on_thread_start_functor =
[](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " starting\n"; },
.on_thread_stop_functor =
-[](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }}}};
+[](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }});

auto make_server_task = [&]() -> coro::task<void> {
// Start by creating a tcp server, we'll do this before putting it into the scheduler so
@@ -743,7 +745,7 @@ int main()
coro::net::tcp_server server{scheduler};

// Now schedule this task onto the scheduler.
-co_await scheduler.schedule();
+co_await scheduler->schedule();

// Wait for an incoming connection and accept it.
auto poll_status = co_await server.poll();
@@ -824,7 +826,7 @@ int main()

auto make_client_task = [&]() -> coro::task<void> {
// Immediately schedule onto the scheduler.
-co_await scheduler.schedule();
+co_await scheduler->schedule();

// Create the tcp_client with the default settings, see tcp_client for how to set the
// ip address, port, and optionally enabling SSL/TLS.
@@ -878,7 +880,8 @@ All tasks that are stored within a `coro::task_container` must have a `void` ret

int main()
{
-coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+auto scheduler = std::make_shared<coro::io_scheduler>(
+    coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

auto make_server_task = [&]() -> coro::task<void> {
// This is the task that will handle processing a client's requests.
Expand Down Expand Up @@ -911,7 +914,7 @@ int main()

// Spin up the tcp_server and schedule it onto the io_scheduler.
coro::net::tcp_server server{scheduler};
-co_await scheduler.schedule();
+co_await scheduler->schedule();

// All incoming connections will be stored into the task container until they are completed.
coro::task_container tc{scheduler};
@@ -929,7 +932,7 @@ int main()
};

auto make_client_task = [&](size_t request_count) -> coro::task<void> {
-co_await scheduler.schedule();
+co_await scheduler->schedule();
coro::net::tcp_client client{scheduler};

co_await client.connect();
8 changes: 4 additions & 4 deletions examples/coro_io_scheduler.cpp
@@ -3,7 +3,7 @@

int main()
{
-coro::io_scheduler scheduler{coro::io_scheduler::options{
+auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
// The scheduler will spawn a dedicated event processing thread. This is the default, but
// it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
@@ -23,7 +23,7 @@ int main()
.on_thread_start_functor =
[](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " starting\n"; },
.on_thread_stop_functor =
-[](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }}}};
+[](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }});

auto make_server_task = [&]() -> coro::task<void> {
// Start by creating a tcp server, we'll do this before putting it into the scheduler so
@@ -33,7 +33,7 @@ int main()
coro::net::tcp_server server{scheduler};

// Now schedule this task onto the scheduler.
-co_await scheduler.schedule();
+co_await scheduler->schedule();

// Wait for an incoming connection and accept it.
auto poll_status = co_await server.poll();
@@ -114,7 +114,7 @@ int main()

auto make_client_task = [&]() -> coro::task<void> {
// Immediately schedule onto the scheduler.
-co_await scheduler.schedule();
+co_await scheduler->schedule();

// Create the tcp_client with the default settings, see tcp_client for how to set the
// ip address, port, and optionally enabling SSL/TLS.
12 changes: 6 additions & 6 deletions examples/coro_shared_mutex.cpp
@@ -3,30 +3,30 @@

int main()
{
-// Shared mutexes require a thread pool to be able to wake up multiple shared waiters when
+// Shared mutexes require an executor type to be able to wake up multiple shared waiters when
// there is an exclusive lock holder releasing the lock. This example uses a single thread
// to also show the interleaving of coroutines acquiring the shared lock in shared and
// exclusive mode as they resume and suspend in a linear manner. Ideally the thread pool
-// would have more than 1 thread to resume all shared waiters in parallel.
-coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
+// executor would have more than 1 thread to resume all shared waiters in parallel.
+auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 1});
coro::shared_mutex mutex{tp};

auto make_shared_task = [&](uint64_t i) -> coro::task<void> {
-co_await tp.schedule();
+co_await tp->schedule();
{
std::cerr << "shared task " << i << " lock_shared()\n";
auto scoped_lock = co_await mutex.lock_shared();
std::cerr << "shared task " << i << " lock_shared() acquired\n";
/// Immediately yield so the other shared tasks also acquire in shared state
/// while this task currently holds the mutex in shared state.
-co_await tp.yield();
+co_await tp->yield();
std::cerr << "shared task " << i << " unlock_shared()\n";
}
co_return;
};

auto make_exclusive_task = [&]() -> coro::task<void> {
-co_await tp.schedule();
+co_await tp->schedule();

std::cerr << "exclusive task lock()\n";
auto scoped_lock = co_await mutex.lock();
7 changes: 4 additions & 3 deletions examples/coro_task_container.cpp
@@ -3,7 +3,8 @@

int main()
{
-coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+auto scheduler = std::make_shared<coro::io_scheduler>(
+    coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

auto make_server_task = [&]() -> coro::task<void> {
// This is the task that will handle processing a client's requests.
@@ -36,7 +37,7 @@ int main()

// Spin up the tcp_server and schedule it onto the io_scheduler.
coro::net::tcp_server server{scheduler};
-co_await scheduler.schedule();
+co_await scheduler->schedule();

// All incoming connections will be stored into the task container until they are completed.
coro::task_container tc{scheduler};
@@ -54,7 +55,7 @@ int main()
};

auto make_client_task = [&](size_t request_count) -> coro::task<void> {
-co_await scheduler.schedule();
+co_await scheduler->schedule();
coro::net::tcp_client client{scheduler};

co_await client.connect();
6 changes: 2 additions & 4 deletions inc/coro/event.hpp
@@ -7,8 +7,6 @@

namespace coro
{
-class thread_pool;
-
/**
* Event is a manually triggered thread safe signal that can be co_await()'ed by multiple awaiters.
* Each awaiter should co_await the event and upon the event being set each awaiter will have their
@@ -87,8 +85,8 @@ class event
auto set() noexcept -> void;

/**
-* Sets this event and resumes all awaiters onto the given thread pool. This will distribute
-* the waiters across the thread pools threads.
+* Sets this event and resumes all awaiters onto the given executor. This will distribute
+* the waiters across the executor's threads.
*/
template<concepts::executor executor_type>
auto set(executor_type& e) noexcept -> void
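The templated `set()` overload above resumes every awaiter on the supplied executor rather than inline on the setting thread. A minimal sketch of how it might be used, assuming the library's usual `coro::sync_wait`/`coro::when_all` helpers and the thread pool construction from the examples above (task names are illustrative):

```C++
#include <coro/coro.hpp>
#include <iostream>

int main()
{
    auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 2});
    coro::event e{};

    auto make_waiter_task = [&](uint64_t i) -> coro::task<void> {
        co_await tp->schedule();
        co_await e; // Suspend until the event is set.
        std::cout << "waiter " << i << " resumed\n";
        co_return;
    };

    auto make_setter_task = [&]() -> coro::task<void> {
        co_await tp->schedule();
        e.set(*tp); // Resume all awaiters onto the thread pool's threads.
        co_return;
    };

    coro::sync_wait(coro::when_all(make_waiter_task(1), make_waiter_task(2), make_setter_task()));
}
```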
3 changes: 2 additions & 1 deletion inc/coro/io_scheduler.hpp
@@ -163,7 +163,8 @@ class io_scheduler
*/
auto schedule(coro::task<void>&& task) -> void
{
-static_cast<coro::task_container<coro::io_scheduler>*>(m_owned_tasks)->start(std::move(task));
+auto* ptr = static_cast<coro::task_container<coro::io_scheduler>*>(m_owned_tasks);
+ptr->start(std::move(task));
}

/**
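The two-step cast above only unpacks the type-erased `m_owned_tasks` pointer; behavior is unchanged. For context, a hedged sketch of what this fire-and-forget overload enables — the scheduler's internal task container owns the scheduled task until it completes, so the caller does not have to keep the `coro::task` alive (the task body and names are illustrative, not from this PR):

```C++
#include <coro/coro.hpp>
#include <iostream>

int main()
{
    auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{});

    auto make_background_task = []() -> coro::task<void> {
        std::cout << "detached task running\n";
        co_return;
    };

    // Ownership of the task moves into the scheduler's internal task container,
    // which keeps it alive until it finishes.
    scheduler->schedule(make_background_task());
}
```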
4 changes: 2 additions & 2 deletions inc/coro/net/dns_resolver.hpp
@@ -60,7 +60,7 @@ class dns_result
class dns_resolver
{
public:
-explicit dns_resolver(io_scheduler& scheduler, std::chrono::milliseconds timeout);
+explicit dns_resolver(std::shared_ptr<io_scheduler> scheduler, std::chrono::milliseconds timeout);
dns_resolver(const dns_resolver&) = delete;
dns_resolver(dns_resolver&&) = delete;
auto operator=(const dns_resolver&) noexcept -> dns_resolver& = delete;
@@ -74,7 +74,7 @@ class dns_resolver

private:
/// The io scheduler to drive the events for dns lookups.
-io_scheduler& m_io_scheduler;
+std::shared_ptr<io_scheduler> m_io_scheduler;

/// The global timeout per dns lookup request.
std::chrono::milliseconds m_timeout{0};
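With this change the resolver shares ownership of its scheduler. A brief construction sketch; the `host_by_name` lookup call is assumed from the resolver's public API elsewhere in the library and is not part of this diff:

```C++
#include <coro/coro.hpp>
#include <chrono>

int main()
{
    auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{});
    coro::net::dns_resolver dns{scheduler, std::chrono::milliseconds{5000}};

    auto make_lookup_task = [&]() -> coro::task<void> {
        co_await scheduler->schedule();
        // The resolver drives the lookup's events on the shared scheduler.
        auto result = co_await dns.host_by_name(coro::net::hostname{"www.example.com"});
        co_return;
    };

    coro::sync_wait(make_lookup_task());
}
```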
17 changes: 8 additions & 9 deletions inc/coro/net/tcp_client.hpp
@@ -41,8 +41,8 @@ class tcp_client
* @param opts See tcp_client::options for more information.
*/
tcp_client(
-io_scheduler& scheduler,
-options opts = options{
+std::shared_ptr<io_scheduler> scheduler,
+options opts = options{
.address = {net::ip_address::from_string("127.0.0.1")}, .port = 8080, .ssl_ctx = nullptr});
tcp_client(const tcp_client&) = delete;
tcp_client(tcp_client&& other);
@@ -280,10 +280,10 @@ class tcp_client

/// The tcp_server creates already connected clients and provides a tcp socket pre-built.
friend tcp_server;
-tcp_client(io_scheduler& scheduler, net::socket socket, options opts);
+tcp_client(std::shared_ptr<io_scheduler> scheduler, net::socket socket, options opts);

/// The scheduler that will drive this tcp client.
-io_scheduler* m_io_scheduler{nullptr};
+std::shared_ptr<io_scheduler> m_io_scheduler{nullptr};
/// Options for what server to connect to.
options m_options{};
/// The tcp socket.
@@ -293,12 +293,11 @@ int main()
/// SSL/TLS specific information if m_options.ssl_ctx != nullptr.
ssl_info m_ssl_info{};

-private:
static auto ssl_shutdown_and_free(
-io_scheduler& io_scheduler,
-net::socket s,
-ssl_unique_ptr ssl_ptr,
-std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<void>;
+std::shared_ptr<io_scheduler> io_scheduler,
+net::socket s,
+ssl_unique_ptr ssl_ptr,
+std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<void>;
};

} // namespace coro::net
6 changes: 3 additions & 3 deletions inc/coro/net/tcp_server.hpp
@@ -34,8 +34,8 @@ class tcp_server
};

tcp_server(
-io_scheduler& scheduler,
-options opts = options{
+std::shared_ptr<io_scheduler> scheduler,
+options opts = options{
.address = net::ip_address::from_string("0.0.0.0"), .port = 8080, .backlog = 128, .ssl_ctx = nullptr});

tcp_server(const tcp_server&) = delete;
@@ -64,7 +64,7 @@ class tcp_server

private:
/// The io scheduler for awaiting new connections.
-io_scheduler* m_io_scheduler{nullptr};
+std::shared_ptr<io_scheduler> m_io_scheduler{nullptr};
/// The bind and listen options for this server.
options m_options;
/// The socket for accepting new tcp connections on.
8 changes: 4 additions & 4 deletions inc/coro/net/udp_peer.hpp
@@ -36,12 +36,12 @@ class udp_peer
* Creates a udp peer that can send packets but not receive them. This udp peer will not explicitly
* bind to a local ip+port.
*/
-explicit udp_peer(io_scheduler& scheduler, net::domain_t domain = net::domain_t::ipv4);
+explicit udp_peer(std::shared_ptr<io_scheduler> scheduler, net::domain_t domain = net::domain_t::ipv4);

/**
* Creates a udp peer that can send and receive packets. This peer will bind to the given ip_port.
*/
-explicit udp_peer(io_scheduler& scheduler, const info& bind_info);
+explicit udp_peer(std::shared_ptr<io_scheduler> scheduler, const info& bind_info);

udp_peer(const udp_peer&) = delete;
udp_peer(udp_peer&&) = default;
@@ -58,7 +58,7 @@ class udp_peer
auto poll(poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<coro::poll_status>
{
-co_return co_await m_io_scheduler.poll(m_socket, op, timeout);
+co_return co_await m_io_scheduler->poll(m_socket, op, timeout);
}

/**
@@ -137,7 +137,7 @@ class udp_peer

private:
/// The scheduler that will drive this udp client.
-io_scheduler& m_io_scheduler;
+std::shared_ptr<io_scheduler> m_io_scheduler;
/// The udp socket.
net::socket m_socket{-1};
/// Did the user request this udp socket is bound locally to receive packets?
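Both constructors now take the scheduler by `shared_ptr`. A short sketch of the two flavors; the `info` field names (`address`, `port`) are an assumption based on the other net types, since the struct body is not shown in this diff:

```C++
#include <coro/coro.hpp>

int main()
{
    auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{});

    // Send-only peer: not bound to a local ip+port, so it cannot receive.
    coro::net::udp_peer sender{scheduler};

    // Bound peer: binds to the given ip+port so it can also receive packets.
    // NOTE: the info field names here are assumed, not taken from this diff.
    coro::net::udp_peer receiver{
        scheduler,
        coro::net::udp_peer::info{.address = coro::net::ip_address::from_string("0.0.0.0"), .port = 8080}};
}
```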