Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revive for io_uring support on Linux #216

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ WinAPIEventDriver | — | yes | — | — | — | —
KqueueEventDriver | — | — | yes | yes¹ | — | yes
CFRunloopEventDriver | — | — | yes | — | — | yes
LibasyncEventDriver | —¹| —¹| —¹| —¹| — | —
UringEventDriver | —¹| no | no | no | unknown | no

¹ planned, but not currenly implemented

Expand All @@ -40,20 +41,20 @@ The following compilers are tested and supported:
Driver development status
-------------------------

Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | CFRunloop | Libasync
----------------------|--------|-------|---------|---------|-----------|----------
TCP Sockets | yes | yes | yes | yes | yes | —
UDP Sockets | yes | yes | yes | yes | yes | —
USDS | yes | yes | — | yes | yes | —
DNS | yes | yes | yes | yes | yes | —
Timers | yes | yes | yes | yes | yes | —
Events | yes | yes | yes | yes | yes | —
Unix Signals | yes² | yes | — | — | — | —
Files | yes | yes | yes | yes | yes | —
UI Integration | yes¹ | yes¹ | yes | yes¹ | yes¹ | —
File watcher | yes² | yes | yes | yes² | yes² | —
Pipes | yes | yes | — | yes | yes | —
Processes | yes | yes | — | yes | yes | —
Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | CFRunloop | Libasync | Uring
----------------------|--------|-------|---------|---------|-----------|----------|-------
TCP Sockets | yes | yes | yes | yes | yes | — | —
UDP Sockets | yes | yes | yes | yes | yes | — | —
USDS | yes | yes | — | yes | yes | — | —
DNS | yes | yes | yes | yes | yes | — | —
Timers | yes | yes | yes | yes | yes | — | —
Events | yes | yes | yes | yes | yes | — | —
Unix Signals | yes² | yes | — | — | — | — | —
Files | yes | yes | yes | yes | yes | — | yes
UI Integration | yes¹ | yes¹ | yes | yes¹ | yes¹ | — | yes?
File watcher | yes² | yes | yes | yes² | yes² | — | —
Pipes | yes | yes | — | yes | yes | — | —
Processes | yes | yes | — | yes | yes | — | —

¹ Manually, by adopting the X11 display connection socket

Expand Down
7 changes: 6 additions & 1 deletion dub.sdl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name "eventcore"
description "Pro-actor based abstraction layer over operating system asynchronous I/O facilities."
license "MIT"
copyright "Copyright © 2016-2018 Sönke Ludwig"

dependency "during" version="~>0.3.0"
targetType "library"

libs "ws2_32" "user32" platform="windows-dmd"
Expand Down Expand Up @@ -49,6 +49,11 @@ configuration "libasync" {
versions "EventcoreLibasyncDriver"
}

configuration "uring" {
platforms "linux"
versions "EventcoreEpollDriver" "EventcoreEpollUsesUring"
}

configuration "generic" {
// Defines eventDriver as the generic EventDriver interface. Setup must be done manually.
}
Expand Down
44 changes: 35 additions & 9 deletions source/eventcore/drivers/posix/driver.d
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import eventcore.drivers.posix.sockets;
import eventcore.drivers.posix.watchers;
import eventcore.drivers.posix.processes;
import eventcore.drivers.posix.pipes;
import eventcore.drivers.posix.io_uring.io_uring;
import eventcore.drivers.posix.io_uring.files;
import eventcore.drivers.timer;
import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue;
Expand All @@ -37,12 +39,17 @@ private long currStdTime()
return Clock.currStdTime;
}

version(EventcoreEpollUsesUring) {
version (EventcoreEpollDriver) {}
else { static assert(false, "uring only supported together with epoll for now"); }
}

final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
@safe: /*@nogc:*/ nothrow:


private {
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver, ProcessDriver);
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver, ProcessDriver, UringCore);
alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver);
version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
else alias SignalsDriver = DummyEventDriverSignals!Loop;
Expand All @@ -51,16 +58,20 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
version (Windows) alias DNSDriver = EventDriverDNS_GHBN!(EventsDriver, SignalsDriver);
else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!(EventsDriver, CoreDriver);
version (EventcoreEpollUsesUring) alias FileDriver = UringDriverFiles;
else alias FileDriver = ThreadedFileEventDriver!(EventsDriver, CoreDriver);
version (Posix) alias PipeDriver = PosixEventDriverPipes!Loop;
else alias PipeDriver = DummyEventDriverPipes!Loop;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
else version (EventcoreCFRunLoopDriver) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
version (Posix) alias ProcessDriver = PosixEventDriverProcesses!Loop;
else alias ProcessDriver = DummyEventDriverProcesses!Loop;
version (EventcoreEpollUsesUring) alias UringCore = UringEventLoop;
else alias UringCore = NoRing;

Loop m_loop;
UringCore m_uring;
CoreDriver m_core;
EventsDriver m_events;
SignalsDriver m_signals;
Expand All @@ -76,15 +87,17 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
this()
@nogc @trusted {
m_loop = mallocT!Loop;
m_uring = mallocT!UringCore;
m_sockets = mallocT!SocketsDriver(m_loop);
m_events = mallocT!EventsDriver(m_loop, m_sockets);
m_signals = mallocT!SignalsDriver(m_loop);
m_timers = mallocT!TimerDriver;
m_pipes = mallocT!PipeDriver(m_loop);
m_processes = mallocT!ProcessDriver(m_loop, this);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes, m_uring);
m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events, m_core);
version (EventcoreEpollUsesUring) m_files = mallocT!FileDriver(m_uring);
else m_files = mallocT!FileDriver(m_events, m_core);
m_watchers = mallocT!WatcherDriver(m_events);
}

Expand Down Expand Up @@ -171,7 +184,8 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
}


final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents, Processes : EventDriverProcesses) : EventDriverCore {
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents, Processes : EventDriverProcesses, Uring)
: EventDriverCore {
@safe nothrow:
import core.atomic : atomicLoad, atomicStore;
import core.sync.mutex : Mutex;
Expand All @@ -188,14 +202,16 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
Timers m_timers;
Events m_events;
Processes m_processes;
Uring m_uring;
bool m_exit = false;
EventID m_wakeupEvent;

shared Mutex m_threadCallbackMutex;
ConsumableQueue!ThreadCallbackEntry m_threadCallbacks;
}

this(Loop loop, Timers timers, Events events, Processes processes)
this(Loop loop, Timers timers, Events events, Processes processes,
Uring uring)
@nogc {
m_loop = loop;
m_timers = timers;
Expand All @@ -205,6 +221,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
m_threadCallbackMutex = mallocT!(shared(Mutex));
m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry);
m_threadCallbacks.reserve(1000);
m_uring = uring;
m_uring.registerEventID(m_wakeupEvent);
}

final void dispose()
Expand All @@ -218,7 +236,9 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
} catch (Exception e) assert(false, e.msg);
}

@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; }
@property size_t waiterCount() const {
return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount + m_uring.waiterCount;
}

@property Loop loop() { return m_loop; }

Expand All @@ -236,12 +256,17 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
if (!waiterCount) {
return ExitReason.outOfWaiters;
}

version (EventcoreEpollUsesUring) {
// this is required to make the kernel aware of
// submitted SEQ, otherwise the first call to
// process events could stall
m_uring.submit;
}
bool got_events;

if (timeout <= 0.seconds) {
got_events = m_loop.doProcessEvents(0.seconds);
m_timers.process(MonoTime.currTime);
version (EventcoreEpollUsesUring) got_events |= m_uring.doProcessEvents(0.seconds);
} else {
auto now = MonoTime.currTime;
do {
Expand All @@ -250,6 +275,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
auto prev_step = now;
now = MonoTime.currTime;
got_events |= m_timers.process(now);
version(EventcoreEpollUsesUring) got_events |= m_uring.doProcessEvents(0.seconds);
if (timeout != Duration.max)
timeout -= now - prev_step;
} while (timeout > 0.seconds && !m_exit && !got_events);
Expand Down
Loading
Loading