Skip to content

Commit

Permalink
posix: Make epoll use sender/receiver, not then()
Browse files Browse the repository at this point in the history
  • Loading branch information
avdgrinten committed May 3, 2021
1 parent 701ca54 commit b3e0647
Showing 1 changed file with 52 additions and 17 deletions.
69 changes: 52 additions & 17 deletions posix/subsystem/src/epoll.cpp
Expand Up @@ -4,6 +4,7 @@

#include <async/recurring-event.hpp>
#include <boost/intrusive/list.hpp>
#include <frg/manual_box.hpp>
#include <helix/ipc.hpp>
#include "common.hpp"
#include "epoll.hpp"
Expand All @@ -26,6 +27,21 @@ struct OpenFile : File {
static constexpr State statePolling = 2;
static constexpr State statePending = 4;

struct Item;

struct Receiver {
void set_value_inline(frg::expected<Error, PollWaitResult> outcome) {
item->pollOutcome.emplace(std::move(outcome));
}

void set_value_noinline(frg::expected<Error, PollWaitResult> outcome) {
item->pollOutcome.emplace(std::move(outcome));
_awaitPoll(item);
}

Item *item;
};

struct Item : boost::intrusive::list_base_hook<> {
Item(smarter::shared_ptr<OpenFile> epoll, Process *process,
smarter::shared_ptr<File> file, int mask, uint64_t cookie)
Expand All @@ -42,18 +58,25 @@ struct OpenFile : File {
uint64_t cookie;

async::cancellation_event cancelPoll;
async::result<frg::expected<Error, PollWaitResult>> pollFuture;

frg::manual_box<
async::execution::operation_t<
async::result<frg::expected<Error, PollWaitResult>>,
Receiver
>
> pollOperation;

std::optional<frg::expected<Error, PollWaitResult>> pollOutcome;
};

static void _awaitPoll(Item *item) {
reRunImmediately:
// First, destruct the operation so that we can re-use it later.
item->pollOperation.destruct();

assert(item->state & statePolling);
auto self = item->epoll.get();

// Release the future to free up memory.
assert(item->pollFuture.ready());
auto result_or_error = std::move(item->pollFuture.value());
item->pollFuture = async::result<frg::expected<Error, PollWaitResult>>{};

// Discard non-active and closed items.
if(!(item->state & stateActive)) {
item->state &= ~statePolling;
Expand All @@ -63,8 +86,10 @@ struct OpenFile : File {
return;
}

if(!result_or_error) {
assert(result_or_error.error() == Error::fileClosed);
auto resultOrError = std::move(*item->pollOutcome);

if(!resultOrError) {
assert(resultOrError.error() == Error::fileClosed);
item->state &= ~statePolling;
if(!item->state)
delete item;
Expand All @@ -74,7 +99,7 @@ struct OpenFile : File {
// Note that items only become pending if there is an edge.
// This is the correct behavior for edge-triggered items.
// Level-triggered items stay pending until the event disappears.
auto result = result_or_error.value();
auto result = resultOrError.value();
if(std::get<1>(result) & (item->eventMask | EPOLLERR | EPOLLHUP)) {
if(logEpoll)
std::cout << "posix.epoll \e[1;34m" << item->epoll->structName() << "\e[0m"
Expand All @@ -101,11 +126,17 @@ struct OpenFile : File {
<< " Mask is " << item->eventMask << ", while edges are "
<< std::get<1>(result) << std::endl;
item->cancelPoll.reset();
item->pollFuture = item->file->pollWait(item->process, std::get<0>(result),
item->eventMask | EPOLLERR | EPOLLHUP, item->cancelPoll);
item->pollFuture.then([item] {
_awaitPoll(item);
item->pollOperation.construct_with([&] {
return async::execution::connect(
item->file->pollWait(item->process, std::get<0>(result),
item->eventMask | EPOLLERR | EPOLLHUP, item->cancelPoll),
Receiver{item}
);
});
// Poll should not return immediately; we use an ugly goto here in favor of wrapping
// the entire function in a loop.
if(async::execution::start_inline(*item->pollOperation))
goto reRunImmediately;
}
}

Expand Down Expand Up @@ -250,11 +281,15 @@ struct OpenFile : File {

// Once an item is not pending anymore, we continue watching it.
item->cancelPoll.reset();
item->pollFuture = item->file->pollWait(item->process, std::get<0>(result),
item->eventMask | EPOLLERR | EPOLLHUP, item->cancelPoll);
item->pollFuture.then([item] {
_awaitPoll(item);
item->pollOperation.construct_with([&] {
return async::execution::connect(
item->file->pollWait(item->process, std::get<0>(result),
item->eventMask | EPOLLERR | EPOLLHUP, item->cancelPoll),
Receiver{item}
);
});
if(async::execution::start_inline(*item->pollOperation))
_awaitPoll(item);
}
continue;
}
Expand Down

0 comments on commit b3e0647

Please sign in to comment.