From 9bce97a479e70f2d8e09047c9a0c93690cd8fd99 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 23 Nov 2019 10:22:16 -0600 Subject: [PATCH 1/6] Start on ResetEvent --- lib/std/c.zig | 1 + lib/std/reset_event.zig | 340 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 341 insertions(+) create mode 100644 lib/std/reset_event.zig diff --git a/lib/std/c.zig b/lib/std/c.zig index 19c3c8feb84e..9e433598525a 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -220,6 +220,7 @@ pub extern "c" fn pthread_mutex_destroy(mutex: *pthread_mutex_t) c_int; pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{}; pub extern "c" fn pthread_cond_wait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t) c_int; +pub extern "c" fn pthread_cond_timedwait(noalias: cond: *pthread_cond_t, noalias: mutex: *pthread_mutex_t, noalias abstime: *const timespec) c_int; pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) c_int; pub extern "c" fn pthread_cond_destroy(cond: *pthread_cond_t) c_int; diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig new file mode 100644 index 000000000000..35eab6ac13cb --- /dev/null +++ b/lib/std/reset_event.zig @@ -0,0 +1,340 @@ +const std = @import("std.zig"); +const builtin = @import("builtin"); +const testing = std.testing; +const assert = std.debug.assert; +const Backoff = std.SpinLock.Backoff; +const c = std.c; +const time = std.time; +const linux = std.os.linux; +const windows = std.os.windows; + +/// A resource object which supports blocking until signaled. +/// Once finished, the `deinit()` method should be called for correctness. +pub const ResetEvent = struct { + os_event: OsEvent, + + pub fn init() ResetEvent { + return ResetEvent{ .os_event = OsEvent.init() }; + } + + pub fn deinit(self: *ResetEvent) void { + self.os_event.deinit(); + self.* = undefined; + } + + /// Returns whether or not the event is currenetly set + pub fn isSet(self: *const ResetEvent) bool { + return self.os_event.isSet(); + } + + /// Sets the event if not already set and + /// wakes up AT LEAST one thread waiting the event. + /// Returns whether or not a thread was woken up. + pub fn set(self: *ResetEvent) bool { + return self.os_event.set(); + } + + /// Resets the event to its original, unset state. + /// Returns whether or not the event was currently set before un-setting. + pub fn reset(self: *ResetEvent) bool { + return self.os_event.reset(); + } + + const WaitError = error{ + /// The thread blocked longer than the maximum time specified. + TimedOut, + }; + + /// Wait for the event to be set by blocking the current thread. + /// Optionally provided timeout in nanoseconds which throws an + /// `error.TimedOut` if the thread blocked AT LEAST longer than specified. + /// Returns whether or not the thread blocked from the event being unset at the time of calling. + pub fn wait(self: *ResetEvent, timeout_ns: ?u64) WaitError!bool { + return self.os_event.wait(timeout_ns); + } +}; + +const OsEvent = if (builtin.single_threaded) DebugEvent else switch (builtin.os) { + .windows => WindowsEvent, + .linux => if (builtin.link_libc) PosixEvent else LinuxEvent, + else => if (builtin.link_libc) PosixEvent else SpinEvent, +}; + +const DebugEvent = struct { + is_set: @typeOf(set_init), + + const set_init = if (std.debug.runtime_safety) false else {}; + + pub fn init() DebugEvent { + return DebugEvent{ .is_set = set_init }; + } + + pub fn deinit(self: *DebugEvent) void { + self.* = undefined; + } + + pub fn isSet(self: *const DebugEvent) bool { + if (!std.debug.runtime_safety) + return true; + return self.is_set; + } + + pub fn set(self: *DebugEvent) bool { + if (std.debug.runtime_safety) + self.is_set = true; + return false; + } + + pub fn reset(self: *DebugEvent) bool { + if (!std.debug.runtime_safety) + return false; + const was_set = self.is_set; + self.is_set = false; + return was_set; + } + + pub fn wait(self: *DebugEvent, timeout: ?u64) ResetEvent.WaitError!bool { + if (std.debug.runtime_safety and !self.is_set) + @panic("deadlock detected"); + return ResetEvent.WaitError.TimedOut; + } +}; + +fn EventState(comptime TagType: type) type { + return enum(TagType) { + Empty, + Waiting, + Signaled, + }; +} + +const SpinEvent = struct { + state: State, + + const State = EventState(u8); + + pub fn init() SpinEvent { + return SpinEvent{ .state = .Empty }; + } + + pub fn deinit(self: *SpinEvent) void { + self.* = undefined; + } + + pub fn isSet(self: *const SpinEvent) bool { + return @atomicLoad(State, &self.state, .Acquire) == .Signaled; + } + + pub fn set(self: *SpinEvent) bool { + return @atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) == .Waiting; + } + + pub fn reset(self: *SpinEvent) bool { + return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; + } + + pub fn wait(self: *SpinEvent, timeout: ?u64) ResetEvent.WaitError!bool { + var state = @atomicLoad(State, &self.state, .Monotonic); + while (true) { + switch (state) { + .Empty => state = @cmpxchgWeak(State, &self.state, state, .Waiting, .Acquire, .Monotonic) orelse break, + .Waiting => break, + .Signaled => return false, + } + } + + // TODO: handle case for time.Timer.start() fails + var spin = Backoff.init(); + var timer = if (timeout == null) null else time.Timer.start() catch unreachable; + while (@atomicLoad(State, &self.state, .Monotonic) == .Waiting) { + spin.yield(); + if (timeout) |timeout_ns| { + if (timer.?.read() > timeout_ns) + return ResetEvent.WaitError.TimedOut; + } + } + return true; + } +}; + +const LinuxEvent = struct { + state: State, + + const State = EventState(i32); + + pub fn init() LinuxEvent { + return LinuxEvent{ .state = .Empty }; + } + + pub fn deinit(self: *LinuxEvent) void { + self.* = undefined; + } + + pub fn isSet(self: *const LinuxEvent) bool { + return @atomicLoad(State, &self.state, .Acquire) == .Signaled; + } + + pub fn set(self: *LinuxEvent) bool { + if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) != .Waiting) + return false; + const rc = linux.futex_wake(@ptrCast(*const i32, &self.state), linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); + assert(linux.getErrno(rc) == 0); + return true; + } + + pub fn reset(self: *LinuxEvent) bool { + return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; + } + + pub fn wait(self: *LinuxEvent, timeout: ?u64) ResetEvent.WaitError!bool { + var state = @atomicLoad(State, &self.state, .Monotonic); + while (true) { + switch (state) { + .Empty => state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse break, + .Waiting => break, + .Signaled => return false, + } + } + + var ts: linux.timespec = undefined; + var ts_ptr: ?*linux.timespec = null; + if (timeout) |timeout_ns| { + ts_ptr = &ts; + ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); + ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); + } + + while (@atomicLoad(State, &self.state, .Monotonic) == .Waiting) { + const rc = linux.futex_wait(@ptrCast(*const i32, &self.state), linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, @enumToInt(State.Waiting), ts_ptr); + switch (linux.getErrno(rc)) { + 0, linux.EINTR => continue, + linux.EAGAIN => break, + linux.ETIMEDOUT => return ResetEvent.WaitError.TimedOut, + else => unreachable, + } + } + } +}; + +const PosixEvent = struct { + state: State, + cond: c.pthread_cond_t, + mutex: c.pthread_mutex_t, + + const State = EventState(u8); + + pub fn init() PosixEvent { + return PosixEvent{ + .state = .Empty, + .cond = c.PTHREAD_COND_INITIALIZER, + .mutex = c.PTHREAD_MUTEX_INITIALIZER, + }; + } + + pub fn deinit(self: *PosixEvent) void { + // On dragonfly, the destroy functions return EINVAL if they were initialized statically. + const retm = c.pthread_mutex_destroy(&self.mutex); + assert(retm == 0 or retm == (if (builtin.os == .dragonfly) std.os.EINVAL else 0)); + const retc = c.pthread_cond_destroy(&self.cond); + assert(retc == 0 or retc == (if (builtin.os == .dragonfly) std.os.EINVAL else 0)); + self.* = undefined; + } + + pub fn isSet(self: *const PosixEvent) bool { + assert(c.pthread_mutex_lock(&self.mutex) == 0); + defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); + + return self.state == .Signaled; + } + + pub fn set(self: *PosixEvent) bool { + assert(c.pthread_mutex_lock(&self.mutex) == 0); + defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); + + const woken = self.state == .Waiting; + self.state = .Signaled; + return woken; + } + + pub fn reset(self: *PosixEvent) bool { + assert(c.pthread_mutex_lock(&self.mutex) == 0); + defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); + + const was_set = self.state == .Signaled; + self.state = .Empty; + return was_set; + } + + pub fn wait(self: *PosixEvent, timeout: ?u64) ResetEvent.WaitError!bool { + assert(c.pthread_mutex_lock(&self.mutex) == 0); + defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); + + if (self.state == .Signaled) + return false; + + var ts: std.os.timespec = undefined; + var ts_ptr = &ts; + if (timeout) |timeout_ns| { + var tv: std.os.timeval = undefined; + assert(c.gettimeofday(&tv, null) == 0); + ts.tv_sec = @intCast(isize, tv.tv_sec + (timeout_ns / time.ns_per_s)); + ts.tv_nsec = @intCast(isize, (tv.tv_usec * time.microsecond) + (timeout_ns % time.ns_per_s)); + } + + self.state = .Waiting; + while (self.state == .Waiting) { + const rc = switch (timeout == null) { + true => c.pthread_cond_wait(&self.cond, &self.mutex), + else => c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ptr), + }; + assert(rc == 0); + } + } +}; + +const WindowsEvent = struct { + state: State, + + const State = EventState(u32); + + pub fn init() WindowsEvent { + return WindowsEvent{ .state = .Empty }; + } + + pub fn deinit(self: *WindowsEvent) void { + self.* = undefined; + } + + pub fn isSet(self: *const WindowsEvent) bool { + return @atomicLoad(State, &self.state, .Acquire) == .Signaled; + } + + pub fn set(self: *WindowsEvent) bool { + if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) != .Waiting) + return false; + + if (getEventHandle()) |handle| { + const key = @ptrCast(*const c_void, &self.state); + const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == 0); + } + return true; + } + + pub fn reset(self: *WindowsEvent) bool { + return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; + } + + pub fn wait(self: *WindowsEvent, timeout: ?u64) ResetEvent.WaitError!bool { + var state = @atomicLoad(State, &self.state, .Monotonic); + while (true) { + switch (state) { + .Empty => state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse break, + .Waiting => break, + .Signaled => return false, + } + } + + const timeout_ms = if (timeout @intCast(windows.LARGE_INTEGER, ) + } +}; From ef208fee3cf7eb25ae08e1a896eba58b91e56d50 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 23 Nov 2019 14:04:31 -0600 Subject: [PATCH 2/6] Definition fixups & ResetEvent test cases --- lib/std/c.zig | 2 +- lib/std/reset_event.zig | 386 ++++++++++++++++++++++++---------------- 2 files changed, 237 insertions(+), 151 deletions(-) diff --git a/lib/std/c.zig b/lib/std/c.zig index 9e433598525a..9e70ff988d7a 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -220,7 +220,7 @@ pub extern "c" fn pthread_mutex_destroy(mutex: *pthread_mutex_t) c_int; pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{}; pub extern "c" fn pthread_cond_wait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t) c_int; -pub extern "c" fn pthread_cond_timedwait(noalias: cond: *pthread_cond_t, noalias: mutex: *pthread_mutex_t, noalias abstime: *const timespec) c_int; +pub extern "c" fn pthread_cond_timedwait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t, noalias abstime: *const timespec) c_int; pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) c_int; pub extern "c" fn pthread_cond_destroy(cond: *pthread_cond_t) c_int; diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index 35eab6ac13cb..59399d5e783b 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -4,9 +4,10 @@ const testing = std.testing; const assert = std.debug.assert; const Backoff = std.SpinLock.Backoff; const c = std.c; +const os = std.os; const time = std.time; -const linux = std.os.linux; -const windows = std.os.windows; +const linux = os.linux; +const windows = os.windows; /// A resource object which supports blocking until signaled. /// Once finished, the `deinit()` method should be called for correctness. @@ -23,15 +24,15 @@ pub const ResetEvent = struct { } /// Returns whether or not the event is currenetly set - pub fn isSet(self: *const ResetEvent) bool { + pub fn isSet(self: *ResetEvent) bool { return self.os_event.isSet(); } /// Sets the event if not already set and /// wakes up AT LEAST one thread waiting the event. /// Returns whether or not a thread was woken up. - pub fn set(self: *ResetEvent) bool { - return self.os_event.set(); + pub fn set(self: *ResetEvent, auto_reset: bool) bool { + return self.os_event.set(auto_reset); } /// Resets the event to its original, unset state. @@ -73,15 +74,15 @@ const DebugEvent = struct { self.* = undefined; } - pub fn isSet(self: *const DebugEvent) bool { + pub fn isSet(self: *DebugEvent) bool { if (!std.debug.runtime_safety) return true; return self.is_set; } - pub fn set(self: *DebugEvent) bool { + pub fn set(self: *DebugEvent, auto_reset: bool) bool { if (std.debug.runtime_safety) - self.is_set = true; + self.is_set = !auto_reset; return false; } @@ -100,102 +101,87 @@ const DebugEvent = struct { } }; -fn EventState(comptime TagType: type) type { - return enum(TagType) { - Empty, - Waiting, - Signaled, - }; -} +fn AtomicEvent(comptime FutexImpl: type) type { + return struct { + state: u32, -const SpinEvent = struct { - state: State, + const IS_SET: u32 = 1 << 0; + const WAIT_MASK = ~IS_SET; - const State = EventState(u8); + pub const Self = @This(); + pub const Futex = FutexImpl; - pub fn init() SpinEvent { - return SpinEvent{ .state = .Empty }; - } + pub fn init() Self { + return Self{ .state = 0 }; + } - pub fn deinit(self: *SpinEvent) void { - self.* = undefined; - } + pub fn deinit(self: *Self) void { + self.* = undefined; + } - pub fn isSet(self: *const SpinEvent) bool { - return @atomicLoad(State, &self.state, .Acquire) == .Signaled; - } + pub fn isSet(self: *const Self) bool { + const state = @atomicLoad(u32, &self.state, .Acquire); + return (state & IS_SET) != 0; + } - pub fn set(self: *SpinEvent) bool { - return @atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) == .Waiting; - } + pub fn reset(self: *Self) bool { + const old_state = @atomicRmw(u32, &self.state, .Xchg, 0, .Monotonic); + return (old_state & IS_SET) != 0; + } - pub fn reset(self: *SpinEvent) bool { - return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; - } + pub fn set(self: *Self, auto_reset: bool) bool { + const new_state = if (auto_reset) 0 else IS_SET; + const old_state = @atomicRmw(u32, &self.state, .Xchg, new_state, .Release); + if ((old_state & WAIT_MASK) == 0) { + return false; + } - pub fn wait(self: *SpinEvent, timeout: ?u64) ResetEvent.WaitError!bool { - var state = @atomicLoad(State, &self.state, .Monotonic); - while (true) { - switch (state) { - .Empty => state = @cmpxchgWeak(State, &self.state, state, .Waiting, .Acquire, .Monotonic) orelse break, - .Waiting => break, - .Signaled => return false, + Futex.wake(&self.state); + return true; + } + + pub fn wait(self: *Self, timeout: ?u64) ResetEvent.WaitError!bool { + var dummy_value: u32 = undefined; + const wait_token = @truncate(u32, @ptrToInt(&dummy_value)); + + var state = @atomicLoad(u32, &self.state, .Monotonic); + while (true) { + if ((state & IS_SET) != 0) + return false; + state = @cmpxchgWeak(u32, &self.state, state, wait_token, .Acquire, .Monotonic) orelse break; } + + try Futex.wait(&self.state, wait_token, timeout); + return true; } + }; +} + +const SpinEvent = AtomicEvent(struct { + fn wake(ptr: *const u32) void {} - // TODO: handle case for time.Timer.start() fails + fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { + // TODO: handle platforms where time.Timer.start() fails var spin = Backoff.init(); var timer = if (timeout == null) null else time.Timer.start() catch unreachable; - while (@atomicLoad(State, &self.state, .Monotonic) == .Waiting) { + while (@atomicLoad(u32, ptr, .Acquire) == expected) { spin.yield(); if (timeout) |timeout_ns| { if (timer.?.read() > timeout_ns) return ResetEvent.WaitError.TimedOut; } } - return true; - } -}; - -const LinuxEvent = struct { - state: State, - - const State = EventState(i32); - - pub fn init() LinuxEvent { - return LinuxEvent{ .state = .Empty }; - } - - pub fn deinit(self: *LinuxEvent) void { - self.* = undefined; - } - - pub fn isSet(self: *const LinuxEvent) bool { - return @atomicLoad(State, &self.state, .Acquire) == .Signaled; } +}); - pub fn set(self: *LinuxEvent) bool { - if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) != .Waiting) - return false; - const rc = linux.futex_wake(@ptrCast(*const i32, &self.state), linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); +const LinuxEvent = AtomicEvent(struct { + fn wake(ptr: *const u32) void { + const key = @ptrCast(*const i32, ptr); + const rc = linux.futex_wake(key, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); assert(linux.getErrno(rc) == 0); - return true; } - pub fn reset(self: *LinuxEvent) bool { - return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; - } - - pub fn wait(self: *LinuxEvent, timeout: ?u64) ResetEvent.WaitError!bool { - var state = @atomicLoad(State, &self.state, .Monotonic); - while (true) { - switch (state) { - .Empty => state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse break, - .Waiting => break, - .Signaled => return false, - } - } - + fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { var ts: linux.timespec = undefined; var ts_ptr: ?*linux.timespec = null; if (timeout) |timeout_ns| { @@ -204,28 +190,94 @@ const LinuxEvent = struct { ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); } - while (@atomicLoad(State, &self.state, .Monotonic) == .Waiting) { - const rc = linux.futex_wait(@ptrCast(*const i32, &self.state), linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, @enumToInt(State.Waiting), ts_ptr); + const key = @ptrCast(*const i32, ptr); + const key_expect = @bitCast(i32, expected); + while (@atomicLoad(i32, key, .Acquire) == key_expect) { + const rc = linux.futex_wait(key, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, key_expect, ts_ptr); switch (linux.getErrno(rc)) { - 0, linux.EINTR => continue, - linux.EAGAIN => break, + 0, linux.EAGAIN => break, + linux.EINTR => continue, linux.ETIMEDOUT => return ResetEvent.WaitError.TimedOut, else => unreachable, } } } -}; +}); + +const WindowsEvent = AtomicEvent(struct { + fn wake(ptr: *const u32) void { + if (getEventHandle()) |handle| { + const key = @ptrCast(*const c_void, ptr); + const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == 0); + } + } + + fn wait(ptr: *const u32, expected: u32, timeout: ?u64) ResetEvent.WaitError!void { + // fallback to spinlock if NT Keyed Events arent available + const handle = getEventHandle() orelse { + return SpinEvent.Futex.wait(ptr, expected, timeout); + }; + + var timeout_ptr: ?*windows.LARGE_INTEGER = null; + var timeout_value: windows.LARGE_INTEGER = undefined; + if (timeout) |timeout_ns| { + timeout_ptr = &timeout_value; + timeout_value = @intCast(windows.LARGE_INTEGER, @divFloor(timeout_ns, time.millisecond)); + } + + const key = @ptrCast(*const c_void, ptr); + while (@atomicLoad(u32, ptr, .Acquire) == expected) { + const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); + assert(rc == 0); + } + } + + var keyed_state = State.Uninitialized; + var keyed_handle: ?windows.HANDLE = null; + + const State = enum(u8) { + Uninitialized, + Intializing, + Initialized, + }; + + fn getEventHandle() ?windows.HANDLE { + var spin = Backoff.init(); + var state = @atomicLoad(State, &keyed_state, .Monotonic); + + while (true) { + switch (state) { + .Initialized => { + return keyed_handle; + }, + .Intializing => { + spin.yield(); + state = @atomicLoad(State, &keyed_state, .Acquire); + }, + .Uninitialized => state = @cmpxchgWeak(State, &keyed_state, state, .Intializing, .Acquire, .Monotonic) orelse { + var handle: windows.HANDLE = undefined; + const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; + if (windows.ntdll.NtCreateKeyedEvent(&handle, access_mask, null, 0) == 0) + keyed_handle = handle; + @atomicStore(State, &keyed_state, .Initialized, .Release); + return keyed_handle; + }, + } + } + } +}); const PosixEvent = struct { - state: State, + state: u32, cond: c.pthread_cond_t, mutex: c.pthread_mutex_t, - const State = EventState(u8); + const IS_SET: u32 = 1; pub fn init() PosixEvent { return PosixEvent{ - .state = .Empty, + .state = .0, .cond = c.PTHREAD_COND_INITIALIZER, .mutex = c.PTHREAD_MUTEX_INITIALIZER, }; @@ -234,107 +286,141 @@ const PosixEvent = struct { pub fn deinit(self: *PosixEvent) void { // On dragonfly, the destroy functions return EINVAL if they were initialized statically. const retm = c.pthread_mutex_destroy(&self.mutex); - assert(retm == 0 or retm == (if (builtin.os == .dragonfly) std.os.EINVAL else 0)); + assert(retm == 0 or retm == (if (builtin.os == .dragonfly) os.EINVAL else 0)); const retc = c.pthread_cond_destroy(&self.cond); - assert(retc == 0 or retc == (if (builtin.os == .dragonfly) std.os.EINVAL else 0)); - self.* = undefined; + assert(retc == 0 or retc == (if (builtin.os == .dragonfly) os.EINVAL else 0)); } - pub fn isSet(self: *const PosixEvent) bool { + pub fn isSet(self: *PosixEvent) bool { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - return self.state == .Signaled; + return self.state == IS_SET; } - pub fn set(self: *PosixEvent) bool { + pub fn reset(self: *PosixEvent) bool { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - const woken = self.state == .Waiting; - self.state = .Signaled; - return woken; + const was_set = self.state == IS_SET; + self.state = 0; + return was_set; } - pub fn reset(self: *PosixEvent) bool { + pub fn set(self: *PosixEvent, auto_reset: bool) bool { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - const was_set = self.state == .Signaled; - self.state = .Empty; - return was_set; + const had_waiter = self.state > IS_SET; + self.state = if (auto_reset) 0 else IS_SET; + if (had_waiter) { + assert(c.pthread_cond_signal(&self.cond) == 0); + } + return had_waiter; } pub fn wait(self: *PosixEvent, timeout: ?u64) ResetEvent.WaitError!bool { assert(c.pthread_mutex_lock(&self.mutex) == 0); defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - if (self.state == .Signaled) + if (self.state == IS_SET) return false; - var ts: std.os.timespec = undefined; + var ts: os.timespec = undefined; var ts_ptr = &ts; if (timeout) |timeout_ns| { - var tv: std.os.timeval = undefined; + var tv: os.timeval = undefined; assert(c.gettimeofday(&tv, null) == 0); - ts.tv_sec = @intCast(isize, tv.tv_sec + (timeout_ns / time.ns_per_s)); - ts.tv_nsec = @intCast(isize, (tv.tv_usec * time.microsecond) + (timeout_ns % time.ns_per_s)); + ts.tv_sec = tv.tv_sec + @intCast(isize, timeout_ns / time.ns_per_s); + ts.tv_nsec = (tv.tv_usec * time.microsecond) + @intCast(isize, timeout_ns % time.ns_per_s); } - self.state = .Waiting; - while (self.state == .Waiting) { + var dummy_value: u32 = undefined; + var wait_token = @truncate(u32, @ptrToInt(&dummy_value)); + self.state = wait_token; + + while (self.state == wait_token) { const rc = switch (timeout == null) { true => c.pthread_cond_wait(&self.cond, &self.mutex), else => c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ptr), }; - assert(rc == 0); + // TODO: rc appears to be the positive error code making os.errno() always return 0 on linux + switch (std.math.max(@as(c_int, os.errno(rc)), rc)) { + 0 => {}, + os.ETIMEDOUT => return ResetEvent.WaitError.TimedOut, + os.EINVAL => unreachable, + os.EPERM => unreachable, + else => unreachable, + } } + return true; } }; -const WindowsEvent = struct { - state: State, - - const State = EventState(u32); - - pub fn init() WindowsEvent { - return WindowsEvent{ .state = .Empty }; - } - - pub fn deinit(self: *WindowsEvent) void { - self.* = undefined; - } - - pub fn isSet(self: *const WindowsEvent) bool { - return @atomicLoad(State, &self.state, .Acquire) == .Signaled; - } - - pub fn set(self: *WindowsEvent) bool { - if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) != .Waiting) - return false; - - if (getEventHandle()) |handle| { - const key = @ptrCast(*const c_void, &self.state); - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == 0); +test "std.ResetEvent" { + // TODO + if (builtin.single_threaded) + return error.SkipZigTest; + + var event = ResetEvent.init(); + defer event.deinit(); + + // test event setting + testing.expect(event.isSet() == false); + testing.expect(event.set(false) == false); + testing.expect(event.isSet() == true); + + // test event resetting + testing.expect(event.reset() == true); + testing.expect(event.isSet() == false); + testing.expect(event.reset() == false); + + // test waiting timeout + const delay = 100 * time.millisecond; + var timer = time.Timer.start() catch unreachable; + testing.expectError(ResetEvent.WaitError.TimedOut, event.wait(delay)); + const elapsed = timer.read(); + testing.expect(elapsed >= delay and elapsed < delay * 2); + + // test cross thread signaling + const Context = struct { + event: ResetEvent, + value: u128, + + fn receiver(self: *@This()) void { + // wait for the sender to notify us with updated value + assert(self.value == 0); + assert((self.event.wait(1 * time.second) catch unreachable) == true); + assert(self.value == 1); + + // wait for sender to sleep, then notify it of new value + time.sleep(50 * time.millisecond); + self.value = 2; + assert(self.event.set(false) == true); } - return true; - } - pub fn reset(self: *WindowsEvent) bool { - return @atomicRmw(State, &self.state, .Xchg, .Empty, .Monotonic) == .Signaled; - } + fn sender(self: *@This()) !void { + // wait for the receiver() to start wait()'ing + time.sleep(50 * time.millisecond); - pub fn wait(self: *WindowsEvent, timeout: ?u64) ResetEvent.WaitError!bool { - var state = @atomicLoad(State, &self.state, .Monotonic); - while (true) { - switch (state) { - .Empty => state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse break, - .Waiting => break, - .Signaled => return false, - } + // update value to 1 and notify the receiver() + assert(self.value == 0); + self.value = 1; + assert(self.event.set(true) == true); + + // wait for the receiver to update the value & notify us + assert((try self.event.wait(1 * time.second)) == true); + assert(self.value == 2); } + }; - const timeout_ms = if (timeout @intCast(windows.LARGE_INTEGER, ) - } -}; + _ = event.reset(); + var context = Context{ + .event = event, + .value = 0, + }; + + var receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + try context.sender(); +} \ No newline at end of file From a0955990dc2c8df42879e33b308ca177ba2c771a Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 23 Nov 2019 15:50:08 -0600 Subject: [PATCH 3/6] fix ResetEvent windows bugs --- lib/std/reset_event.zig | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index 59399d5e783b..f3194b0a7d9f 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -219,17 +219,23 @@ const WindowsEvent = AtomicEvent(struct { return SpinEvent.Futex.wait(ptr, expected, timeout); }; + // NT uses timeouts in units of 100ns with negative value being relative var timeout_ptr: ?*windows.LARGE_INTEGER = null; var timeout_value: windows.LARGE_INTEGER = undefined; if (timeout) |timeout_ns| { timeout_ptr = &timeout_value; - timeout_value = @intCast(windows.LARGE_INTEGER, @divFloor(timeout_ns, time.millisecond)); + timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); } - const key = @ptrCast(*const c_void, ptr); - while (@atomicLoad(u32, ptr, .Acquire) == expected) { + // NtWaitForKeyedEvent doesnt have spurious wake-ups + if (@atomicLoad(u32, ptr, .Acquire) == expected) { + const key = @ptrCast(*const c_void, ptr); const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); - assert(rc == 0); + switch (rc) { + 0 => {}, + windows.WAIT_TIMEOUT => return ResetEvent.WaitError.TimedOut, + else => unreachable, + } } } @@ -377,10 +383,13 @@ test "std.ResetEvent" { // test waiting timeout const delay = 100 * time.millisecond; + const error_margin = 50 * time.millisecond; + var timer = time.Timer.start() catch unreachable; testing.expectError(ResetEvent.WaitError.TimedOut, event.wait(delay)); const elapsed = timer.read(); - testing.expect(elapsed >= delay and elapsed < delay * 2); + testing.expect(elapsed >= delay - error_margin); + testing.expect(elapsed <= delay + error_margin); // test cross thread signaling const Context = struct { From ca2d566ec85bee81396f64325844cc760b3cf870 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 23 Nov 2019 16:24:01 -0600 Subject: [PATCH 4/6] replace ThreadParker with ResetEvent + WordLock mutex --- lib/std/mutex.zig | 132 +++++++++++++++++++++------------ lib/std/parker.zig | 180 --------------------------------------------- lib/std/std.zig | 2 +- 3 files changed, 86 insertions(+), 228 deletions(-) delete mode 100644 lib/std/parker.zig diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig index e8f83a4a176d..39cfab19ce08 100644 --- a/lib/std/mutex.zig +++ b/lib/std/mutex.zig @@ -1,13 +1,12 @@ const std = @import("std.zig"); const builtin = @import("builtin"); const testing = std.testing; -const SpinLock = std.SpinLock; -const ThreadParker = std.ThreadParker; +const ResetEvent = std.ResetEvent; /// Lock may be held only once. If the same thread /// tries to acquire the same mutex twice, it deadlocks. -/// This type supports static initialization and is based off of Golang 1.13 runtime.lock_futex: -/// https://github.com/golang/go/blob/master/src/runtime/lock_futex.go +/// This type supports static initialization and is based off of Webkit's WTF Lock (via rust parking_lot) +/// https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs /// When an application is built in single threaded release mode, all the functions are /// no-ops. In single threaded debug mode, there is deadlock detection. pub const Mutex = if (builtin.single_threaded) @@ -39,80 +38,119 @@ pub const Mutex = if (builtin.single_threaded) } else struct { - state: State, // TODO: make this an enum - parker: ThreadParker, + state: usize, - const State = enum(u32) { - Unlocked, - Sleeping, - Locked, - }; + const MUTEX_LOCK: usize = 1 << 0; + const QUEUE_LOCK: usize = 1 << 1; + const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK); + const QueueNode = std.atomic.Stack(ResetEvent).Node; /// number of iterations to spin yielding the cpu const SPIN_CPU = 4; - /// number of iterations to perform in the cpu yield loop + /// number of iterations to spin in the cpu yield loop const SPIN_CPU_COUNT = 30; /// number of iterations to spin yielding the thread const SPIN_THREAD = 1; pub fn init() Mutex { - return Mutex{ - .state = .Unlocked, - .parker = ThreadParker.init(), - }; + return Mutex{ .state = 0 }; } pub fn deinit(self: *Mutex) void { - self.parker.deinit(); + self.* = undefined; } pub const Held = struct { mutex: *Mutex, pub fn release(self: Held) void { - switch (@atomicRmw(State, &self.mutex.state, .Xchg, .Unlocked, .Release)) { - .Locked => {}, - .Sleeping => self.mutex.parker.unpark(@ptrCast(*const u32, &self.mutex.state)), - .Unlocked => unreachable, // unlocking an unlocked mutex - else => unreachable, // should never be anything else + // since MUTEX_LOCK is the first bit, we can use (.Sub) instead of (.And, ~MUTEX_LOCK). + // this is because .Sub may be implemented more efficiently than the latter + // (e.g. `lock xadd` vs `cmpxchg` loop on x86) + const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release); + if ((state & QUEUE_MASK) != 0 and (state & QUEUE_LOCK) == 0) { + self.mutex.releaseSlow(state); } } }; pub fn acquire(self: *Mutex) Held { - // Try and speculatively grab the lock. - // If it fails, the state is either Locked or Sleeping - // depending on if theres a thread stuck sleeping below. - var state = @atomicRmw(State, &self.state, .Xchg, .Locked, .Acquire); - if (state == .Unlocked) - return Held{ .mutex = self }; + // fast path close to SpinLock fast path + if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |current_state| { + self.acquireSlow(current_state); + } + return Held{ .mutex = self }; + } + fn acquireSlow(self: *Mutex, current_state: usize) void { + var spin: usize = 0; + var state = current_state; while (true) { - // try and acquire the lock using cpu spinning on failure - var spin: usize = 0; - while (spin < SPIN_CPU) : (spin += 1) { - var value = @atomicLoad(State, &self.state, .Monotonic); - while (value == .Unlocked) - value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self }; - SpinLock.yield(SPIN_CPU_COUNT); + + // try and acquire the lock if unlocked + if ((state & MUTEX_LOCK) == 0) { + state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return; + continue; + } + + // spin only if the waiting queue isn't empty and when it hasn't spun too much already + if ((state & QUEUE_MASK) == 0 and spin < SPIN_CPU + SPIN_THREAD) { + if (spin < SPIN_CPU) { + std.SpinLock.yield(SPIN_CPU_COUNT); + } else { + std.os.sched_yield() catch std.time.sleep(0); + } + state = @atomicLoad(usize, &self.state, .Monotonic); + continue; } - // try and acquire the lock using thread rescheduling on failure - spin = 0; - while (spin < SPIN_THREAD) : (spin += 1) { - var value = @atomicLoad(State, &self.state, .Monotonic); - while (value == .Unlocked) - value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self }; - std.os.sched_yield() catch std.time.sleep(1); + // thread should block, try and add this event to the waiting queue + var node = QueueNode{ + .next = @intToPtr(?*QueueNode, state & QUEUE_MASK), + .data = ResetEvent.init(), + }; + defer node.data.deinit(); + const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK); + state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse { + // node is in the queue, wait until a `held.release()` wakes us up. + _ = node.data.wait(null) catch unreachable; + spin = 0; + state = @atomicLoad(usize, &self.state, .Monotonic); + continue; + }; + } + } + + fn releaseSlow(self: *Mutex, current_state: usize) void { + // grab the QUEUE_LOCK in order to signal a waiting queue node's event. + var state = current_state; + while (true) { + if ((state & QUEUE_LOCK) != 0 or (state & QUEUE_MASK) == 0) + return; + state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break; + } + + while (true) { + // barrier needed to observe incoming state changes + defer @fence(.Acquire); + + // the mutex is currently locked. try to unset the QUEUE_LOCK and let the locker wake up the next node. + // avoids waking up multiple sleeping threads which try to acquire the lock again which increases contention. + if ((state & MUTEX_LOCK) != 0) { + state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return; + continue; } - // failed to acquire the lock, go to sleep until woken up by `Held.release()` - if (@atomicRmw(State, &self.state, .Xchg, .Sleeping, .Acquire) == .Unlocked) - return Held{ .mutex = self }; - state = .Sleeping; - self.parker.park(@ptrCast(*const u32, &self.state), @enumToInt(State.Sleeping)); + // try to pop the top node on the waiting queue stack to wake it up + // while at the same time unsetting the QUEUE_LOCK. + const node = @intToPtr(*QueueNode, state & QUEUE_MASK); + const new_state = @ptrToInt(node.next) | (state & MUTEX_LOCK); + state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse { + _ = node.data.set(false); + return; + }; } } }; diff --git a/lib/std/parker.zig b/lib/std/parker.zig deleted file mode 100644 index 4ba0100b9ed7..000000000000 --- a/lib/std/parker.zig +++ /dev/null @@ -1,180 +0,0 @@ -const std = @import("std.zig"); -const builtin = @import("builtin"); -const time = std.time; -const testing = std.testing; -const assert = std.debug.assert; -const SpinLock = std.SpinLock; -const linux = std.os.linux; -const windows = std.os.windows; - -pub const ThreadParker = switch (builtin.os) { - .linux => if (builtin.link_libc) PosixParker else LinuxParker, - .windows => WindowsParker, - else => if (builtin.link_libc) PosixParker else SpinParker, -}; - -const SpinParker = struct { - pub fn init() SpinParker { - return SpinParker{}; - } - pub fn deinit(self: *SpinParker) void {} - - pub fn unpark(self: *SpinParker, ptr: *const u32) void {} - - pub fn park(self: *SpinParker, ptr: *const u32, expected: u32) void { - var backoff = SpinLock.Backoff.init(); - while (@atomicLoad(u32, ptr, .Acquire) == expected) - backoff.yield(); - } -}; - -const LinuxParker = struct { - pub fn init() LinuxParker { - return LinuxParker{}; - } - pub fn deinit(self: *LinuxParker) void {} - - pub fn unpark(self: *LinuxParker, ptr: *const u32) void { - const rc = linux.futex_wake(@ptrCast(*const i32, ptr), linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); - assert(linux.getErrno(rc) == 0); - } - - pub fn park(self: *LinuxParker, ptr: *const u32, expected: u32) void { - const value = @intCast(i32, expected); - while (@atomicLoad(u32, ptr, .Acquire) == expected) { - const rc = linux.futex_wait(@ptrCast(*const i32, ptr), linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, value, null); - switch (linux.getErrno(rc)) { - 0, linux.EAGAIN => return, - linux.EINTR => continue, - linux.EINVAL => unreachable, - else => continue, - } - } - } -}; - -const WindowsParker = struct { - waiters: u32, - - pub fn init() WindowsParker { - return WindowsParker{ .waiters = 0 }; - } - pub fn deinit(self: *WindowsParker) void {} - - pub fn unpark(self: *WindowsParker, ptr: *const u32) void { - const key = @ptrCast(*const c_void, ptr); - const handle = getEventHandle() orelse return; - - var waiting = @atomicLoad(u32, &self.waiters, .Monotonic); - while (waiting != 0) { - waiting = @cmpxchgWeak(u32, &self.waiters, waiting, waiting - 1, .Acquire, .Monotonic) orelse { - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == 0); - return; - }; - } - } - - pub fn park(self: *WindowsParker, ptr: *const u32, expected: u32) void { - var spin = SpinLock.Backoff.init(); - const ev_handle = getEventHandle(); - const key = @ptrCast(*const c_void, ptr); - - while (@atomicLoad(u32, ptr, .Monotonic) == expected) { - if (ev_handle) |handle| { - _ = @atomicRmw(u32, &self.waiters, .Add, 1, .Release); - const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == 0); - } else { - spin.yield(); - } - } - } - - var event_handle = std.lazyInit(windows.HANDLE); - - fn getEventHandle() ?windows.HANDLE { - if (event_handle.get()) |handle_ptr| - return handle_ptr.*; - defer event_handle.resolve(); - - const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; - if (windows.ntdll.NtCreateKeyedEvent(&event_handle.data, access_mask, null, 0) != 0) - return null; - return event_handle.data; - } -}; - -const PosixParker = struct { - cond: c.pthread_cond_t, - mutex: c.pthread_mutex_t, - - const c = std.c; - - pub fn init() PosixParker { - return PosixParker{ - .cond = c.PTHREAD_COND_INITIALIZER, - .mutex = c.PTHREAD_MUTEX_INITIALIZER, - }; - } - - pub fn deinit(self: *PosixParker) void { - // On dragonfly, the destroy functions return EINVAL if they were initialized statically. - const retm = c.pthread_mutex_destroy(&self.mutex); - assert(retm == 0 or retm == (if (builtin.os == .dragonfly) os.EINVAL else 0)); - const retc = c.pthread_cond_destroy(&self.cond); - assert(retc == 0 or retc == (if (builtin.os == .dragonfly) os.EINVAL else 0)); - } - - pub fn unpark(self: *PosixParker, ptr: *const u32) void { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - assert(c.pthread_cond_signal(&self.cond) == 0); - } - - pub fn park(self: *PosixParker, ptr: *const u32, expected: u32) void { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - while (@atomicLoad(u32, ptr, .Acquire) == expected) - assert(c.pthread_cond_wait(&self.cond, &self.mutex) == 0); - } -}; - -test "std.ThreadParker" { - if (builtin.single_threaded) - return error.SkipZigTest; - - const Context = struct { - parker: ThreadParker, - data: u32, - - fn receiver(self: *@This()) void { - self.parker.park(&self.data, 0); // receives 1 - assert(@atomicRmw(u32, &self.data, .Xchg, 2, .SeqCst) == 1); // sends 2 - self.parker.unpark(&self.data); // wakes up waiters on 2 - self.parker.park(&self.data, 2); // receives 3 - assert(@atomicRmw(u32, &self.data, .Xchg, 4, .SeqCst) == 3); // sends 4 - self.parker.unpark(&self.data); // wakes up waiters on 4 - } - - fn sender(self: *@This()) void { - assert(@atomicRmw(u32, &self.data, .Xchg, 1, .SeqCst) == 0); // sends 1 - self.parker.unpark(&self.data); // wakes up waiters on 1 - self.parker.park(&self.data, 1); // receives 2 - assert(@atomicRmw(u32, &self.data, .Xchg, 3, .SeqCst) == 2); // sends 3 - self.parker.unpark(&self.data); // wakes up waiters on 3 - self.parker.park(&self.data, 3); // receives 4 - } - }; - - var context = Context{ - .parker = ThreadParker.init(), - .data = 0, - }; - defer context.parker.deinit(); - - var receiver = try std.Thread.spawn(&context, Context.receiver); - defer receiver.wait(); - - context.sender(); -} diff --git a/lib/std/std.zig b/lib/std/std.zig index 83b7ed6e9436..09db48960450 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -16,6 +16,7 @@ pub const PackedIntSlice = @import("packed_int_array.zig").PackedIntSlice; pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; pub const Progress = @import("progress.zig").Progress; +pub const ResetEvent = @import("reset_event.zig").ResetEvent; pub const SegmentedList = @import("segmented_list.zig").SegmentedList; pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList; pub const SpinLock = @import("spinlock.zig").SpinLock; @@ -23,7 +24,6 @@ pub const StringHashMap = @import("hash_map.zig").StringHashMap; pub const TailQueue = @import("linked_list.zig").TailQueue; pub const Target = @import("target.zig").Target; pub const Thread = @import("thread.zig").Thread; -pub const ThreadParker = @import("parker.zig").ThreadParker; pub const atomic = @import("atomic.zig"); pub const base64 = @import("base64.zig"); From 056b5a26c960923d7afe9807255a32b85d99cacc Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 24 Nov 2019 20:28:29 -0600 Subject: [PATCH 5/6] ResetEvent: get abstime based on std.time --- lib/std/reset_event.zig | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index f3194b0a7d9f..ffa73941c74e 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -333,12 +333,20 @@ const PosixEvent = struct { return false; var ts: os.timespec = undefined; - var ts_ptr = &ts; if (timeout) |timeout_ns| { - var tv: os.timeval = undefined; - assert(c.gettimeofday(&tv, null) == 0); - ts.tv_sec = tv.tv_sec + @intCast(isize, timeout_ns / time.ns_per_s); - ts.tv_nsec = (tv.tv_usec * time.microsecond) + @intCast(isize, timeout_ns % time.ns_per_s); + var timeout_abs = timeout_ns; + if (comptime std.Target.current.isDarwin()) { + var tv: os.darwin.timeval = undefined; + assert(os.darwin.gettimeofday(&tv, null) == 0); + timeout_abs += @intCast(u64, tv.tv_sec) * time.second; + timeout_abs += @intCast(u64, tv.tv_usec) * time.microsecond; + } else { + os.clock_gettime(os.CLOCK_REALTIME, &ts) catch unreachable; + timeout_abs += @intCast(u64, ts.tv_sec) * time.second; + timeout_abs += @intCast(u64, ts.tv_nsec); + } + ts.tv_sec = @intCast(@typeOf(ts.tv_sec), @divFloor(timeout_abs, time.second)); + ts.tv_nsec = @intCast(@typeOf(ts.tv_nsec), @mod(timeout_abs, time.second)); } var dummy_value: u32 = undefined; @@ -348,7 +356,7 @@ const PosixEvent = struct { while (self.state == wait_token) { const rc = switch (timeout == null) { true => c.pthread_cond_wait(&self.cond, &self.mutex), - else => c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ptr), + else => c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts), }; // TODO: rc appears to be the positive error code making os.errno() always return 0 on linux switch (std.math.max(@as(c_int, os.errno(rc)), rc)) { From ff445814cbf909db79193ce5815279eb074246fe Mon Sep 17 00:00:00 2001 From: kprotty Date: Tue, 26 Nov 2019 13:04:18 -0600 Subject: [PATCH 6/6] remove wait timeout test cases --- lib/std/reset_event.zig | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index ffa73941c74e..e408c0d0ac83 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -389,16 +389,6 @@ test "std.ResetEvent" { testing.expect(event.isSet() == false); testing.expect(event.reset() == false); - // test waiting timeout - const delay = 100 * time.millisecond; - const error_margin = 50 * time.millisecond; - - var timer = time.Timer.start() catch unreachable; - testing.expectError(ResetEvent.WaitError.TimedOut, event.wait(delay)); - const elapsed = timer.read(); - testing.expect(elapsed >= delay - error_margin); - testing.expect(elapsed <= delay + error_margin); - // test cross thread signaling const Context = struct { event: ResetEvent,