From a9ab528e348a3f07dfaffcdcb1031062b1bfe8bb Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 17 Jul 2018 15:17:06 -0400 Subject: [PATCH] std.event.Loop.onNextTick dispatches work to waiting threads --- std/atomic/queue.zig | 14 +++ std/event/loop.zig | 205 +++++++++++++++++++++---------------------- 2 files changed, 113 insertions(+), 106 deletions(-) diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig index 1fd07714e8fb..df31c88d2a1e 100644 --- a/std/atomic/queue.zig +++ b/std/atomic/queue.zig @@ -51,6 +51,20 @@ pub fn Queue(comptime T: type) type { return head; } + pub fn unget(self: *Self, node: *Node) void { + while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {} + defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1); + + const opt_head = self.head; + self.head = node; + if (opt_head) |head| { + head.next = node; + } else { + assert(self.tail == null); + self.tail = node; + } + } + pub fn isEmpty(self: *Self) bool { return @atomicLoad(?*Node, &self.head, builtin.AtomicOrder.SeqCst) != null; } diff --git a/std/event/loop.zig b/std/event/loop.zig index fc927592b996..e2bc9e7eb93e 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -12,7 +12,6 @@ pub const Loop = struct { next_tick_queue: std.atomic.Queue(promise), os_data: OsData, final_resume_node: ResumeNode, - dispatch_lock: u8, // TODO make this a bool pending_event_count: usize, extra_threads: []*std.os.Thread, @@ -74,11 +73,10 @@ pub const Loop = struct { /// max(thread_count - 1, 0) fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { self.* = Loop{ - .pending_event_count = 0, + .pending_event_count = 1, .allocator = allocator, .os_data = undefined, .next_tick_queue = std.atomic.Queue(promise).init(), - .dispatch_lock = 1, // start locked so threads go directly into epoll wait .extra_threads = undefined, .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), .eventfd_resume_nodes = undefined, @@ -306,7 +304,7 @@ pub const Loop = struct { pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void { _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); errdefer { - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + self.finishOneEvent(); } try self.modFd( fd, @@ -326,7 +324,7 @@ pub const Loop = struct { pub fn removeFd(self: *Loop, fd: i32) void { self.removeFdNoCounter(fd); - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + self.finishOneEvent(); } fn removeFdNoCounter(self: *Loop, fd: i32) void { @@ -345,14 +343,70 @@ pub const Loop = struct { } } + fn dispatch(self: *Loop) void { + while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { + const next_tick_node = self.next_tick_queue.get() orelse { + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + const eventfd_node = &resume_stack_node.data; + eventfd_node.base.handle = next_tick_node.data; + switch (builtin.os) { + builtin.Os.macosx => { + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); + const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + builtin.Os.linux => { + // the pending count is already accounted for + const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | + std.os.linux.EPOLLET; + self.modFd( + eventfd_node.eventfd, + eventfd_node.epoll_op, + epoll_events, + &eventfd_node.base, + ) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + builtin.Os.windows => { + // this value is never dereferenced but we need it to be non-null so that + // the consumer code can decide whether to read the completion key. + // it has to do this for normal I/O, so we match that behavior here. + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus( + self.os_data.io_port, + undefined, + eventfd_node.completion_key, + overlapped, + ) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + else => @compileError("unsupported OS"), + } + } + } + /// Bring your own linked list node. This means it can't fail. pub fn onNextTick(self: *Loop, node: *NextTickNode) void { _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); self.next_tick_queue.put(node); + self.dispatch(); } pub fn run(self: *Loop) void { - _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.finishOneEvent(); // the reference we start with + self.workerRun(); for (self.extra_threads) |extra_thread| { extra_thread.wait(); @@ -396,106 +450,45 @@ pub const Loop = struct { } } - fn workerRun(self: *Loop) void { - start_over: while (true) { - if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) { - while (self.next_tick_queue.get()) |next_tick_node| { - const handle = next_tick_node.data; - if (self.next_tick_queue.isEmpty()) { - // last node, just resume it - _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); - resume handle; - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - continue :start_over; - } - - // non-last node, stick it in the epoll/kqueue set so that - // other threads can get to it - if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { - const eventfd_node = &resume_stack_node.data; - eventfd_node.base.handle = handle; - switch (builtin.os) { - builtin.Os.macosx => { - const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); - const eventlist = ([*]posix.Kevent)(undefined)[0..0]; - _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch { - // fine, we didn't need it anyway - _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); - self.available_eventfd_resume_nodes.push(resume_stack_node); - resume handle; - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - continue :start_over; - }; - }, - builtin.Os.linux => { - // the pending count is already accounted for - const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET; - self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch { - // fine, we didn't need it anyway - _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); - self.available_eventfd_resume_nodes.push(resume_stack_node); - resume handle; - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - continue :start_over; - }; - }, - builtin.Os.windows => { - // this value is never dereferenced but we need it to be non-null so that - // the consumer code can decide whether to read the completion key. - // it has to do this for normal I/O, so we match that behavior here. - const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch { - // fine, we didn't need it anyway - _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); - self.available_eventfd_resume_nodes.push(resume_stack_node); - resume handle; - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - continue :start_over; - }; - }, - else => @compileError("unsupported OS"), + fn finishOneEvent(self: *Loop) void { + if (@atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst) == 1) { + // cause all the threads to stop + switch (builtin.os) { + builtin.Os.linux => { + // writing 8 bytes to an eventfd cannot fail + std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + return; + }, + builtin.Os.macosx => { + const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); + const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + // cannot fail because we already added it and this just enables it + _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable; + return; + }, + builtin.Os.windows => { + var i: usize = 0; + while (i < self.os_data.extra_thread_count) : (i += 1) { + while (true) { + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + break; } - } else { - // threads are too busy, can't add another eventfd to wake one up - _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); - resume handle; - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - continue :start_over; } - } - - const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst); - if (pending_event_count == 0) { - // cause all the threads to stop - switch (builtin.os) { - builtin.Os.linux => { - // writing 8 bytes to an eventfd cannot fail - std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; - return; - }, - builtin.Os.macosx => { - const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); - const eventlist = ([*]posix.Kevent)(undefined)[0..0]; - // cannot fail because we already added it and this just enables it - _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable; - return; - }, - builtin.Os.windows => { - var i: usize = 0; - while (i < self.os_data.extra_thread_count) : (i += 1) { - while (true) { - const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; - break; - } - } - return; - }, - else => @compileError("unsupported OS"), - } - } + return; + }, + else => @compileError("unsupported OS"), + } + } + } - _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + fn workerRun(self: *Loop) void { + while (true) { + while (true) { + const next_tick_node = self.next_tick_queue.get() orelse break; + self.dispatch(); + resume next_tick_node.data; + self.finishOneEvent(); } switch (builtin.os) { @@ -519,7 +512,7 @@ pub const Loop = struct { } resume handle; if (resume_node_id == ResumeNode.Id.EventFd) { - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + self.finishOneEvent(); } } }, @@ -541,7 +534,7 @@ pub const Loop = struct { } resume handle; if (resume_node_id == ResumeNode.Id.EventFd) { - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + self.finishOneEvent(); } } }, @@ -570,7 +563,7 @@ pub const Loop = struct { } resume handle; if (resume_node_id == ResumeNode.Id.EventFd) { - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + self.finishOneEvent(); } }, else => @compileError("unsupported OS"),