Skip to content

Commit

Permalink
std.Thread.Mutex: change API to lock() and unlock()
Browse files Browse the repository at this point in the history
This is a breaking change. Before, usage looked like this:

```zig
const held = mutex.acquire();
defer held.release();
```

Now it looks like this:

```zig
mutex.lock();
defer mutex.unlock();
```

The `Held` type was an idea to make mutexes slightly safer by making it
more difficult to forget to release an aquired lock. However, this
ultimately caused more problems than it solved, when any data structures
needed to store a held mutex. Simplify everything by reducing the API
down to the primitives: lock() and unlock().

Closes ziglang#8051
Closes ziglang#8246
Closes ziglang#10105
  • Loading branch information
andrewrk authored and nuald committed Nov 13, 2021
1 parent ce68e9f commit 05d2475
Show file tree
Hide file tree
Showing 18 changed files with 140 additions and 179 deletions.
18 changes: 9 additions & 9 deletions lib/std/Progress.zig
Expand Up @@ -56,7 +56,7 @@ done: bool = true,
/// Protects the `refresh` function, as well as `node.recently_updated_child`.
/// Without this, callsites would call `Node.end` and then free `Node` memory
/// while it was still being accessed by the `refresh` function.
update_lock: std.Thread.Mutex = .{},
update_mutex: std.Thread.Mutex = .{},

/// Keeps track of how many columns in the terminal have been output, so that
/// we can move the cursor back later.
Expand Down Expand Up @@ -103,14 +103,14 @@ pub const Node = struct {
self.context.maybeRefresh();
if (self.parent) |parent| {
{
const held = self.context.update_lock.acquire();
defer held.release();
self.context.update_mutex.lock();
defer self.context.update_mutex.unlock();
_ = @cmpxchgStrong(?*Node, &parent.recently_updated_child, self, null, .Monotonic, .Monotonic);
}
parent.completeOne();
} else {
const held = self.context.update_lock.acquire();
defer held.release();
self.context.update_mutex.lock();
defer self.context.update_mutex.unlock();
self.context.done = true;
self.context.refreshWithHeldLock();
}
Expand Down Expand Up @@ -170,8 +170,8 @@ pub fn start(self: *Progress, name: []const u8, estimated_total_items: usize) !*
pub fn maybeRefresh(self: *Progress) void {
const now = self.timer.read();
if (now < self.initial_delay_ns) return;
const held = self.update_lock.tryAcquire() orelse return;
defer held.release();
if (!self.update_mutex.tryLock()) return;
defer self.update_mutex.unlock();
// TODO I have observed this to happen sometimes. I think we need to follow Rust's
// lead and guarantee monotonically increasing times in the std lib itself.
if (now < self.prev_refresh_timestamp) return;
Expand All @@ -181,8 +181,8 @@ pub fn maybeRefresh(self: *Progress) void {

/// Updates the terminal and resets `self.next_refresh_timestamp`. Thread-safe.
pub fn refresh(self: *Progress) void {
const held = self.update_lock.tryAcquire() orelse return;
defer held.release();
if (!self.update_mutex.tryLock()) return;
defer self.update_mutex.unlock();

return self.refreshWithHeldLock();
}
Expand Down
12 changes: 6 additions & 6 deletions lib/std/Thread/Condition.zig
Expand Up @@ -145,8 +145,8 @@ pub const AtomicCondition = struct {
var waiter = QueueList.Node{ .data = .{} };

{
const held = cond.queue_mutex.acquire();
defer held.release();
cond.queue_mutex.lock();
defer cond.queue_mutex.unlock();

cond.queue_list.prepend(&waiter);
@atomicStore(bool, &cond.pending, true, .SeqCst);
Expand All @@ -162,8 +162,8 @@ pub const AtomicCondition = struct {
return;

const maybe_waiter = blk: {
const held = cond.queue_mutex.acquire();
defer held.release();
cond.queue_mutex.lock();
defer cond.queue_mutex.unlock();

const maybe_waiter = cond.queue_list.popFirst();
@atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst);
Expand All @@ -181,8 +181,8 @@ pub const AtomicCondition = struct {
@atomicStore(bool, &cond.pending, false, .SeqCst);

var waiters = blk: {
const held = cond.queue_mutex.acquire();
defer held.release();
cond.queue_mutex.lock();
defer cond.queue_mutex.unlock();

const waiters = cond.queue_list;
cond.queue_list = .{};
Expand Down
106 changes: 32 additions & 74 deletions lib/std/Thread/Mutex.zig
Expand Up @@ -8,13 +8,13 @@
//! Example usage:
//! var m = Mutex{};
//!
//! const lock = m.acquire();
//! defer lock.release();
//! m.lock();
//! defer m.release();
//! ... critical code
//!
//! Non-blocking:
//! if (m.tryAcquire) |lock| {
//! defer lock.release();
//! if (m.tryLock()) {
//! defer m.unlock();
//! // ... critical section
//! } else {
//! // ... lock not acquired
Expand All @@ -32,30 +32,22 @@ const linux = os.linux;
const testing = std.testing;
const StaticResetEvent = std.thread.StaticResetEvent;

/// Try to acquire the mutex without blocking. Returns `null` if the mutex is
/// unavailable. Otherwise returns `Held`. Call `release` on `Held`, or use
/// releaseDirect().
pub fn tryAcquire(m: *Mutex) ?Held {
return m.impl.tryAcquire();
/// Try to acquire the mutex without blocking. Returns `false` if the mutex is
/// unavailable. Otherwise returns `true`. Call `unlock` on the mutex to release.
pub fn tryLock(m: *Mutex) bool {
return m.impl.tryLock();
}

/// Acquire the mutex. Deadlocks if the mutex is already
/// held by the calling thread.
pub fn acquire(m: *Mutex) Held {
return m.impl.acquire();
pub fn lock(m: *Mutex) void {
m.impl.lock();
}

/// Release the mutex. Prefer Held.release() if available.
pub fn releaseDirect(m: *Mutex) void {
return m.impl.releaseDirect();
pub fn unlock(m: *Mutex) void {
m.impl.unlock();
}

/// A held mutex handle. Call release to allow other threads to
/// take the mutex. Do not call release() more than once.
/// For more complex scenarios, this handle can be discarded
/// and Mutex.releaseDirect can be called instead.
pub const Held = Impl.Held;

const Impl = if (builtin.single_threaded)
Dummy
else if (builtin.os.tag == .windows)
Expand All @@ -65,32 +57,6 @@ else if (std.Thread.use_pthreads)
else
AtomicMutex;

fn HeldInterface(comptime MutexType: type) type {
return struct {
const Mixin = @This();
pub const Held = struct {
mutex: *MutexType,

pub fn release(held: Mixin.Held) void {
held.mutex.releaseDirect();
}
};

pub fn tryAcquire(m: *MutexType) ?Mixin.Held {
if (m.tryAcquireDirect()) {
return Mixin.Held{ .mutex = m };
} else {
return null;
}
}

pub fn acquire(m: *MutexType) Mixin.Held {
m.acquireDirect();
return Mixin.Held{ .mutex = m };
}
};
}

pub const AtomicMutex = struct {
state: State = .unlocked,

Expand All @@ -100,9 +66,7 @@ pub const AtomicMutex = struct {
waiting,
};

pub usingnamespace HeldInterface(@This());

fn tryAcquireDirect(m: *AtomicMutex) bool {
pub fn tryLock(m: *AtomicMutex) bool {
return @cmpxchgStrong(
State,
&m.state,
Expand All @@ -113,14 +77,14 @@ pub const AtomicMutex = struct {
) == null;
}

fn acquireDirect(m: *AtomicMutex) void {
pub fn lock(m: *AtomicMutex) void {
switch (@atomicRmw(State, &m.state, .Xchg, .locked, .Acquire)) {
.unlocked => {},
else => |s| m.lockSlow(s),
}
}

fn releaseDirect(m: *AtomicMutex) void {
pub fn unlock(m: *AtomicMutex) void {
switch (@atomicRmw(State, &m.state, .Xchg, .unlocked, .Release)) {
.unlocked => unreachable,
.locked => {},
Expand Down Expand Up @@ -202,18 +166,16 @@ pub const AtomicMutex = struct {
pub const PthreadMutex = struct {
pthread_mutex: std.c.pthread_mutex_t = .{},

pub usingnamespace HeldInterface(@This());

/// Try to acquire the mutex without blocking. Returns true if
/// the mutex is unavailable. Otherwise returns false. Call
/// release when done.
fn tryAcquireDirect(m: *PthreadMutex) bool {
pub fn tryLock(m: *PthreadMutex) bool {
return std.c.pthread_mutex_trylock(&m.pthread_mutex) == .SUCCESS;
}

/// Acquire the mutex. Will deadlock if the mutex is already
/// held by the calling thread.
fn acquireDirect(m: *PthreadMutex) void {
pub fn lock(m: *PthreadMutex) void {
switch (std.c.pthread_mutex_lock(&m.pthread_mutex)) {
.SUCCESS => {},
.INVAL => unreachable,
Expand All @@ -225,7 +187,7 @@ pub const PthreadMutex = struct {
}
}

fn releaseDirect(m: *PthreadMutex) void {
pub fn unlock(m: *PthreadMutex) void {
switch (std.c.pthread_mutex_unlock(&m.pthread_mutex)) {
.SUCCESS => return,
.INVAL => unreachable,
Expand All @@ -239,51 +201,47 @@ pub const PthreadMutex = struct {
/// This has the sematics as `Mutex`, however it does not actually do any
/// synchronization. Operations are safety-checked no-ops.
pub const Dummy = struct {
lock: @TypeOf(lock_init) = lock_init,

pub usingnamespace HeldInterface(@This());
locked: @TypeOf(lock_init) = lock_init,

const lock_init = if (std.debug.runtime_safety) false else {};

/// Try to acquire the mutex without blocking. Returns false if
/// the mutex is unavailable. Otherwise returns true.
fn tryAcquireDirect(m: *Dummy) bool {
pub fn tryLock(m: *Dummy) bool {
if (std.debug.runtime_safety) {
if (m.lock) return false;
m.lock = true;
if (m.locked) return false;
m.locked = true;
}
return true;
}

/// Acquire the mutex. Will deadlock if the mutex is already
/// held by the calling thread.
fn acquireDirect(m: *Dummy) void {
if (!m.tryAcquireDirect()) {
pub fn lock(m: *Dummy) void {
if (!m.tryLock()) {
@panic("deadlock detected");
}
}

fn releaseDirect(m: *Dummy) void {
pub fn unlock(m: *Dummy) void {
if (std.debug.runtime_safety) {
m.lock = false;
m.locked = false;
}
}
};

const WindowsMutex = struct {
pub const WindowsMutex = struct {
srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT,

pub usingnamespace HeldInterface(@This());

fn tryAcquireDirect(m: *WindowsMutex) bool {
pub fn tryLock(m: *WindowsMutex) bool {
return windows.kernel32.TryAcquireSRWLockExclusive(&m.srwlock) != windows.FALSE;
}

fn acquireDirect(m: *WindowsMutex) void {
pub fn lock(m: *WindowsMutex) void {
windows.kernel32.AcquireSRWLockExclusive(&m.srwlock);
}

fn releaseDirect(m: *WindowsMutex) void {
pub fn unlock(m: *WindowsMutex) void {
windows.kernel32.ReleaseSRWLockExclusive(&m.srwlock);
}
};
Expand Down Expand Up @@ -322,8 +280,8 @@ test "basic usage" {
fn worker(ctx: *TestContext) void {
var i: usize = 0;
while (i != TestContext.incr_count) : (i += 1) {
const held = ctx.mutex.acquire();
defer held.release();
ctx.mutex.lock();
defer ctx.mutex.unlock();

ctx.data += 1;
}
Expand Down
8 changes: 4 additions & 4 deletions lib/std/Thread/Semaphore.zig
Expand Up @@ -13,8 +13,8 @@ const Mutex = std.Thread.Mutex;
const Condition = std.Thread.Condition;

pub fn wait(sem: *Semaphore) void {
const held = sem.mutex.acquire();
defer held.release();
sem.mutex.lock();
defer sem.mutex.unlock();

while (sem.permits == 0)
sem.cond.wait(&sem.mutex);
Expand All @@ -25,8 +25,8 @@ pub fn wait(sem: *Semaphore) void {
}

pub fn post(sem: *Semaphore) void {
const held = sem.mutex.acquire();
defer held.release();
sem.mutex.lock();
defer sem.mutex.unlock();

sem.permits += 1;
sem.cond.signal();
Expand Down
24 changes: 12 additions & 12 deletions lib/std/atomic/queue.zig
Expand Up @@ -31,8 +31,8 @@ pub fn Queue(comptime T: type) type {
pub fn put(self: *Self, node: *Node) void {
node.next = null;

const held = self.mutex.acquire();
defer held.release();
self.mutex.lock();
defer self.mutex.unlock();

node.prev = self.tail;
self.tail = node;
Expand All @@ -48,8 +48,8 @@ pub fn Queue(comptime T: type) type {
/// It is safe to `get()` a node from the queue while another thread tries
/// to `remove()` the same node at the same time.
pub fn get(self: *Self) ?*Node {
const held = self.mutex.acquire();
defer held.release();
self.mutex.lock();
defer self.mutex.unlock();

const head = self.head orelse return null;
self.head = head.next;
Expand All @@ -67,8 +67,8 @@ pub fn Queue(comptime T: type) type {
pub fn unget(self: *Self, node: *Node) void {
node.prev = null;

const held = self.mutex.acquire();
defer held.release();
self.mutex.lock();
defer self.mutex.unlock();

const opt_head = self.head;
self.head = node;
Expand All @@ -84,8 +84,8 @@ pub fn Queue(comptime T: type) type {
/// It is safe to `remove()` a node from the queue while another thread tries
/// to `get()` the same node at the same time.
pub fn remove(self: *Self, node: *Node) bool {
const held = self.mutex.acquire();
defer held.release();
self.mutex.lock();
defer self.mutex.unlock();

if (node.prev == null and node.next == null and self.head != node) {
return false;
Expand All @@ -110,8 +110,8 @@ pub fn Queue(comptime T: type) type {
/// Note that in a multi-consumer environment a return value of `false`
/// does not mean that `get` will yield a non-`null` value!
pub fn isEmpty(self: *Self) bool {
const held = self.mutex.acquire();
defer held.release();
self.mutex.lock();
defer self.mutex.unlock();
return self.head == null;
}

Expand Down Expand Up @@ -144,8 +144,8 @@ pub fn Queue(comptime T: type) type {
}
}
};
const held = self.mutex.acquire();
defer held.release();
self.mutex.lock();
defer self.mutex.unlock();

try stream.print("head: ", .{});
try S.dumpRecursive(stream, self.head, 0, 4);
Expand Down

0 comments on commit 05d2475

Please sign in to comment.