From b7410357a7505b9dd813164e091ff27ae753c30c Mon Sep 17 00:00:00 2001 From: geo-ant <54497890+geo-ant@users.noreply.github.com> Date: Fri, 5 Nov 2021 09:43:57 +0100 Subject: [PATCH 1/3] fix compilation error with AtomicCondition and add a test case --- lib/std/Thread/Condition.zig | 65 +++++++++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig index 658817fc08c3..4a69aeb7c4c8 100644 --- a/lib/std/Thread/Condition.zig +++ b/lib/std/Thread/Condition.zig @@ -106,7 +106,7 @@ pub const AtomicCondition = struct { .linux => { switch (linux.getErrno(linux.futex_wait( &cond.futex, - linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT, + linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT, 0, null, ))) { @@ -128,7 +128,7 @@ pub const AtomicCondition = struct { .linux => { switch (linux.getErrno(linux.futex_wake( &cond.futex, - linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE, + linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE, 1, ))) { .SUCCESS => {}, @@ -152,9 +152,9 @@ pub const AtomicCondition = struct { @atomicStore(bool, &cond.pending, true, .SeqCst); } - mutex.unlock(); + mutex.releaseDirect(); waiter.data.wait(); - mutex.lock(); + _ = mutex.acquire(); } pub fn signal(cond: *AtomicCondition) void { @@ -193,3 +193,60 @@ pub const AtomicCondition = struct { waiter.data.notify(); } }; + +// verify that the condition variable unblocks when signalled +test "AtomicCondition" { + var wait_thread_alive = std.atomic.Atomic(bool).init(false); + var wait_thread_finished = std.atomic.Atomic(bool).init(false); + var run_condition = std.atomic.Atomic(bool).init(false); + var condvar = Condition{}; + const test_thread = try std.Thread.spawn(.{}, conditionWaitThread, .{ &wait_thread_alive, &wait_thread_finished, &run_condition, &condvar }); + test_thread.detach(); + + // we give the waiting thread generous time to become alive, but + // in case it does not, we fail here rather than hang indefinitely + try waitUntilTrue(&wait_thread_alive,10); + + // this does not really tell us much, but we might as well check it + try std.testing.expect(!wait_thread_finished.load(.SeqCst)); + + run_condition.store(true, .SeqCst); + condvar.signal(); + + // similar as above we let the thread indicate it is finished or fail + try waitUntilTrue(&wait_thread_finished,10); +} + +/// a primitive helper method that blocks until an atomic boolean becomes true +/// if the bool does not become true within the given amount of max seconds, this function failes +fn waitUntilTrue(boolean: *std.atomic.Atomic(bool), max_wait_time_seconds: u32) !void { + var current_time : std.os.timespec = undefined; + try std.os.clock_gettime(std.os.CLOCK.REALTIME, ¤t_time); + const start_time = current_time; + while (!boolean.load(.SeqCst)) { + std.os.nanosleep(1, 250 * 1000); + try std.os.clock_gettime(std.os.CLOCK.REALTIME, ¤t_time); + if(current_time.tv_sec - start_time.tv_sec > max_wait_time_seconds) { + return error.Timeout; + } + } +} + +/// a helper thread for testing the condition variable. Both wait_... variables must be false when passed to the thread. +/// when the thread starts it +fn conditionWaitThread(wait_thread_alive: *std.atomic.Atomic(bool), wait_thread_finished: *std.atomic.Atomic(bool), run_condition: *std.atomic.Atomic(bool), condvar: *Condition) void { + std.debug.assert(!wait_thread_alive.load(.SeqCst)); + std.debug.assert(!wait_thread_finished.load(.SeqCst)); + + // indicate this thread has started up + wait_thread_alive.store(true, .SeqCst); + var mutex = std.Thread.Mutex{}; + const held = mutex.acquire(); + defer held.release(); + // wait for the condition variable + while (!run_condition.load(.SeqCst)) { + condvar.wait(&mutex); + } + // and indicate that this thread has completed + wait_thread_finished.store(true, .SeqCst); +} \ No newline at end of file From 8c9249164a4bff9f62b8fc27e4be37247e912445 Mon Sep 17 00:00:00 2001 From: geo-ant <54497890+geo-ant@users.noreply.github.com> Date: Fri, 5 Nov 2021 12:52:25 +0100 Subject: [PATCH 2/3] make test AtomicCondition test execute only multithreaded build --- lib/std/Thread/Condition.zig | 46 +++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig index 4a69aeb7c4c8..5c342082ab0c 100644 --- a/lib/std/Thread/Condition.zig +++ b/lib/std/Thread/Condition.zig @@ -196,37 +196,39 @@ pub const AtomicCondition = struct { // verify that the condition variable unblocks when signalled test "AtomicCondition" { - var wait_thread_alive = std.atomic.Atomic(bool).init(false); - var wait_thread_finished = std.atomic.Atomic(bool).init(false); - var run_condition = std.atomic.Atomic(bool).init(false); - var condvar = Condition{}; - const test_thread = try std.Thread.spawn(.{}, conditionWaitThread, .{ &wait_thread_alive, &wait_thread_finished, &run_condition, &condvar }); - test_thread.detach(); - - // we give the waiting thread generous time to become alive, but - // in case it does not, we fail here rather than hang indefinitely - try waitUntilTrue(&wait_thread_alive,10); - - // this does not really tell us much, but we might as well check it - try std.testing.expect(!wait_thread_finished.load(.SeqCst)); - - run_condition.store(true, .SeqCst); - condvar.signal(); - - // similar as above we let the thread indicate it is finished or fail - try waitUntilTrue(&wait_thread_finished,10); + if (!builtin.single_threaded) { + var wait_thread_alive = std.atomic.Atomic(bool).init(false); + var wait_thread_finished = std.atomic.Atomic(bool).init(false); + var run_condition = std.atomic.Atomic(bool).init(false); + var condvar = Condition{}; + const test_thread = try std.Thread.spawn(.{}, conditionWaitThread, .{ &wait_thread_alive, &wait_thread_finished, &run_condition, &condvar }); + test_thread.detach(); + + // we give the waiting thread generous time to become alive, but + // in case it does not, we fail here rather than hang indefinitely + try waitUntilTrue(&wait_thread_alive, 10); + + // this does not really tell us much, but we might as well check it + try std.testing.expect(!wait_thread_finished.load(.SeqCst)); + + run_condition.store(true, .SeqCst); + condvar.signal(); + + // similar as above we let the thread indicate it is finished or fail + try waitUntilTrue(&wait_thread_finished, 10); + } } /// a primitive helper method that blocks until an atomic boolean becomes true /// if the bool does not become true within the given amount of max seconds, this function failes fn waitUntilTrue(boolean: *std.atomic.Atomic(bool), max_wait_time_seconds: u32) !void { - var current_time : std.os.timespec = undefined; + var current_time: std.os.timespec = undefined; try std.os.clock_gettime(std.os.CLOCK.REALTIME, ¤t_time); const start_time = current_time; while (!boolean.load(.SeqCst)) { std.os.nanosleep(1, 250 * 1000); try std.os.clock_gettime(std.os.CLOCK.REALTIME, ¤t_time); - if(current_time.tv_sec - start_time.tv_sec > max_wait_time_seconds) { + if (current_time.tv_sec - start_time.tv_sec > max_wait_time_seconds) { return error.Timeout; } } @@ -249,4 +251,4 @@ fn conditionWaitThread(wait_thread_alive: *std.atomic.Atomic(bool), wait_thread_ } // and indicate that this thread has completed wait_thread_finished.store(true, .SeqCst); -} \ No newline at end of file +} From 92dc5730428e75e7f9f27fec2fc6f065ea14de85 Mon Sep 17 00:00:00 2001 From: geo-ant <54497890+geo-ant@users.noreply.github.com> Date: Tue, 9 Nov 2021 20:43:00 +0100 Subject: [PATCH 3/3] replace low-tech test by more sophisticated tests of kprotty --- lib/std/Thread/Condition.zig | 138 ++++++++++++++++++++++------------- lib/std/os.zig | 60 ++++++++++++++- 2 files changed, 146 insertions(+), 52 deletions(-) diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig index 5c342082ab0c..b909ef6aec62 100644 --- a/lib/std/Thread/Condition.zig +++ b/lib/std/Thread/Condition.zig @@ -194,61 +194,97 @@ pub const AtomicCondition = struct { } }; -// verify that the condition variable unblocks when signalled -test "AtomicCondition" { - if (!builtin.single_threaded) { - var wait_thread_alive = std.atomic.Atomic(bool).init(false); - var wait_thread_finished = std.atomic.Atomic(bool).init(false); - var run_condition = std.atomic.Atomic(bool).init(false); - var condvar = Condition{}; - const test_thread = try std.Thread.spawn(.{}, conditionWaitThread, .{ &wait_thread_alive, &wait_thread_finished, &run_condition, &condvar }); - test_thread.detach(); - - // we give the waiting thread generous time to become alive, but - // in case it does not, we fail here rather than hang indefinitely - try waitUntilTrue(&wait_thread_alive, 10); - - // this does not really tell us much, but we might as well check it - try std.testing.expect(!wait_thread_finished.load(.SeqCst)); - - run_condition.store(true, .SeqCst); - condvar.signal(); - - // similar as above we let the thread indicate it is finished or fail - try waitUntilTrue(&wait_thread_finished, 10); - } -} +const testing = std.testing; + +test "Condition - wait/signal" { + if (builtin.single_threaded) return error.SkipZigTest; -/// a primitive helper method that blocks until an atomic boolean becomes true -/// if the bool does not become true within the given amount of max seconds, this function failes -fn waitUntilTrue(boolean: *std.atomic.Atomic(bool), max_wait_time_seconds: u32) !void { - var current_time: std.os.timespec = undefined; - try std.os.clock_gettime(std.os.CLOCK.REALTIME, ¤t_time); - const start_time = current_time; - while (!boolean.load(.SeqCst)) { - std.os.nanosleep(1, 250 * 1000); - try std.os.clock_gettime(std.os.CLOCK.REALTIME, ¤t_time); - if (current_time.tv_sec - start_time.tv_sec > max_wait_time_seconds) { - return error.Timeout; + const Context = struct { + lock: Mutex = .{}, + cond: Condition = .{}, + signaled: bool = false, + + fn doWait(self: *@This()) void { + const held = self.lock.acquire(); + defer held.release(); + + while (!self.signaled) { + self.cond.wait(&self.lock); + } } + + fn doSignal(self: *@This(), do_broadcast: bool) void { + const held = self.lock.acquire(); + defer held.release(); + + self.signaled = true; + switch (do_broadcast) { + true => self.cond.signal(), + else => self.cond.broadcast(), + } + } + }; + + for ([_]bool{ false, true }) |do_broadcast| { + var context = Context{}; + const wait_signal = try std.Thread.spawn(.{}, Context.doWait, .{&context}); + + context.doSignal(do_broadcast); + wait_signal.join(); } } -/// a helper thread for testing the condition variable. Both wait_... variables must be false when passed to the thread. -/// when the thread starts it -fn conditionWaitThread(wait_thread_alive: *std.atomic.Atomic(bool), wait_thread_finished: *std.atomic.Atomic(bool), run_condition: *std.atomic.Atomic(bool), condvar: *Condition) void { - std.debug.assert(!wait_thread_alive.load(.SeqCst)); - std.debug.assert(!wait_thread_finished.load(.SeqCst)); - - // indicate this thread has started up - wait_thread_alive.store(true, .SeqCst); - var mutex = std.Thread.Mutex{}; - const held = mutex.acquire(); - defer held.release(); - // wait for the condition variable - while (!run_condition.load(.SeqCst)) { - condvar.wait(&mutex); +test "Condition - producer / consumer" { + if (builtin.single_threaded) return error.SkipZigTest; + + const num_threads = 4; + const Context = struct { + lock: Mutex = .{}, + send: Condition = .{}, + recv: Condition = .{}, + value: usize = 0, + + fn doSend(self: *@This(), do_broadcast: bool) void { + const held = self.lock.acquire(); + defer held.release(); + + assert(self.value == 0); + self.value = 1; + switch (do_broadcast) { + true => self.recv.broadcast(), + else => self.recv.signal(), + } + + while (self.value != 0) { + self.send.wait(&self.lock); + } + } + + fn doRecv(self: *@This(), do_broadcast: bool) void { + const held = self.lock.acquire(); + defer held.release(); + + while (self.value == 0) { + self.recv.wait(&self.lock); + } + + self.value -= 1; + switch (do_broadcast) { + true => self.send.broadcast(), + else => self.send.signal(), + } + } + }; + + for ([_]bool{ true, false }) |do_broadcast| { + var context = Context{}; + var threads: [num_threads]std.Thread = undefined; + for (threads) |*t| t.* = try std.Thread.spawn(.{}, Context.doRecv, .{ &context, do_broadcast }); + defer for (threads) |t| t.join(); + + var i: usize = num_threads; + while (i > 0) : (i -= 1) { + context.doSend(do_broadcast); + } } - // and indicate that this thread has completed - wait_thread_finished.store(true, .SeqCst); } diff --git a/lib/std/os.zig b/lib/std/os.zig index f167e47e2a9b..ab8f1aebdd9f 100644 --- a/lib/std/os.zig +++ b/lib/std/os.zig @@ -209,6 +209,25 @@ pub const LOG = struct { pub const socket_t = if (builtin.os.tag == .windows) windows.ws2_32.SOCKET else fd_t; +/// a clock ID that identifies that clock to use for querying system time +pub const ClockId = if (builtin.os.tag == .windows) ClockIdWindows else ClockIdPosix; + +/// clock IDs for linux and the web assembly system interface +const ClockIdPosix = enum(@TypeOf(std.os.CLOCK.REALTIME)) { + /// A realtime clock representing the machine's best estimate of wall-clock time + /// this is possibly not a monotinic clock + Realtime = std.os.CLOCK.REALTIME, + /// A monotiic clock + Monotonic = std.os.CLOCK.MONOTONIC, +}; + +/// clock IDs on the windows platform +const ClockIdWindows = enum(@typeInfo(ClockIdPosix).Enum.tag_type) { + /// A realtime clock representing the machine's best estimate of wall-clock time + /// this is possibly not a monotinic clock + Realtime, +}; + /// See also `getenv`. Populated by startup code before main(). /// TODO this is a footgun because the value will be undefined when using `zig build-lib`. /// https://github.com/ziglang/zig/issues/4524 @@ -4804,8 +4823,47 @@ pub fn dl_iterate_phdr( pub const ClockGetTimeError = error{UnsupportedClock} || UnexpectedError; +pub fn clock_gettime2(clock_id: ClockId) ClockGetTimeError!timespec { + if (builtin.os.tag == .wasi and !builtin.link_libc) { + var ts: timespec = undefined; + switch (system.clock_time_get(@bitCast(u32, clock_id), 1, &ts)) { + .SUCCESS => { + return .{ + .tv_sec = @intCast(i64, ts / std.time.ns_per_s), + .tv_nsec = @intCast(isize, ts % std.time.ns_per_s), + }; + }, + .INVAL => return error.UnsupportedClock, + else => |err| return unexpectedErrno(err), + } + return; + } else if (builtin.os.tag == .windows) {} else if (builtin.os.tag == .windows) { + switch (clock_id) { + .Realtime => { + var ft: windows.FILETIME = undefined; + windows.kernel32.GetSystemTimeAsFileTime(&ft); + // FileTime has a granularity of 100 nanoseconds and uses the NTFS/Windows epoch. + const ft64 = (@as(u64, ft.dwHighDateTime) << 32) | ft.dwLowDateTime; + const ft_per_s = std.time.ns_per_s / 100; + return .{ + .tv_sec = @intCast(i64, ft64 / ft_per_s) + std.time.epoch.windows, + .tv_nsec = @intCast(c_long, ft64 % ft_per_s) * 100, + }; + }, + } + } else { + var tp : timespec = undefined; + switch (errno(system.clock_gettime(clock_id, &tp))) { + .SUCCESS => tp, + .FAULT => unreachable, + .INVAL => return error.UnsupportedClock, + else => |err| return unexpectedErrno(err), + } + } +} + /// TODO: change this to return the timespec as a return value -/// TODO: look into making clk_id an enum +/// TODO: look into making clk_id an e^um pub fn clock_gettime(clk_id: i32, tp: *timespec) ClockGetTimeError!void { if (builtin.os.tag == .wasi and !builtin.link_libc) { var ts: timestamp_t = undefined;