Skip to content

Commit

Permalink
Merge branch 'kprotty-event_lock'
Browse files Browse the repository at this point in the history
closes #3751
  • Loading branch information
andrewrk committed Nov 27, 2019
2 parents 8ecd6c4 + ff44581 commit 63300a2
Show file tree
Hide file tree
Showing 5 changed files with 520 additions and 228 deletions.
1 change: 1 addition & 0 deletions lib/std/c.zig
Expand Up @@ -220,6 +220,7 @@ pub extern "c" fn pthread_mutex_destroy(mutex: *pthread_mutex_t) c_int;

pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{};
pub extern "c" fn pthread_cond_wait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t) c_int;
pub extern "c" fn pthread_cond_timedwait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t, noalias abstime: *const timespec) c_int;
pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) c_int;
pub extern "c" fn pthread_cond_destroy(cond: *pthread_cond_t) c_int;

Expand Down
132 changes: 85 additions & 47 deletions lib/std/mutex.zig
@@ -1,13 +1,12 @@
const std = @import("std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const SpinLock = std.SpinLock;
const ThreadParker = std.ThreadParker;
const ResetEvent = std.ResetEvent;

/// Lock may be held only once. If the same thread
/// tries to acquire the same mutex twice, it deadlocks.
/// This type supports static initialization and is based off of Golang 1.13 runtime.lock_futex:
/// https://github.com/golang/go/blob/master/src/runtime/lock_futex.go
/// This type supports static initialization and is based off of Webkit's WTF Lock (via rust parking_lot)
/// https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
/// When an application is built in single threaded release mode, all the functions are
/// no-ops. In single threaded debug mode, there is deadlock detection.
pub const Mutex = if (builtin.single_threaded)
Expand Down Expand Up @@ -39,80 +38,119 @@ pub const Mutex = if (builtin.single_threaded)
}
else
struct {
state: State, // TODO: make this an enum
parker: ThreadParker,
state: usize,

const State = enum(u32) {
Unlocked,
Sleeping,
Locked,
};
const MUTEX_LOCK: usize = 1 << 0;
const QUEUE_LOCK: usize = 1 << 1;
const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);
const QueueNode = std.atomic.Stack(ResetEvent).Node;

/// number of iterations to spin yielding the cpu
const SPIN_CPU = 4;

/// number of iterations to perform in the cpu yield loop
/// number of iterations to spin in the cpu yield loop
const SPIN_CPU_COUNT = 30;

/// number of iterations to spin yielding the thread
const SPIN_THREAD = 1;

pub fn init() Mutex {
return Mutex{
.state = .Unlocked,
.parker = ThreadParker.init(),
};
return Mutex{ .state = 0 };
}

pub fn deinit(self: *Mutex) void {
self.parker.deinit();
self.* = undefined;
}

pub const Held = struct {
mutex: *Mutex,

pub fn release(self: Held) void {
switch (@atomicRmw(State, &self.mutex.state, .Xchg, .Unlocked, .Release)) {
.Locked => {},
.Sleeping => self.mutex.parker.unpark(@ptrCast(*const u32, &self.mutex.state)),
.Unlocked => unreachable, // unlocking an unlocked mutex
else => unreachable, // should never be anything else
// since MUTEX_LOCK is the first bit, we can use (.Sub) instead of (.And, ~MUTEX_LOCK).
// this is because .Sub may be implemented more efficiently than the latter
// (e.g. `lock xadd` vs `cmpxchg` loop on x86)
const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
if ((state & QUEUE_MASK) != 0 and (state & QUEUE_LOCK) == 0) {
self.mutex.releaseSlow(state);
}
}
};

pub fn acquire(self: *Mutex) Held {
// Try and speculatively grab the lock.
// If it fails, the state is either Locked or Sleeping
// depending on if theres a thread stuck sleeping below.
var state = @atomicRmw(State, &self.state, .Xchg, .Locked, .Acquire);
if (state == .Unlocked)
return Held{ .mutex = self };
// fast path close to SpinLock fast path
if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |current_state| {
self.acquireSlow(current_state);
}
return Held{ .mutex = self };
}

fn acquireSlow(self: *Mutex, current_state: usize) void {
var spin: usize = 0;
var state = current_state;
while (true) {
// try and acquire the lock using cpu spinning on failure
var spin: usize = 0;
while (spin < SPIN_CPU) : (spin += 1) {
var value = @atomicLoad(State, &self.state, .Monotonic);
while (value == .Unlocked)
value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
SpinLock.yield(SPIN_CPU_COUNT);

// try and acquire the lock if unlocked
if ((state & MUTEX_LOCK) == 0) {
state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
continue;
}

// spin only if the waiting queue isn't empty and when it hasn't spun too much already
if ((state & QUEUE_MASK) == 0 and spin < SPIN_CPU + SPIN_THREAD) {
if (spin < SPIN_CPU) {
std.SpinLock.yield(SPIN_CPU_COUNT);
} else {
std.os.sched_yield() catch std.time.sleep(0);
}
state = @atomicLoad(usize, &self.state, .Monotonic);
continue;
}

// try and acquire the lock using thread rescheduling on failure
spin = 0;
while (spin < SPIN_THREAD) : (spin += 1) {
var value = @atomicLoad(State, &self.state, .Monotonic);
while (value == .Unlocked)
value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
std.os.sched_yield() catch std.time.sleep(1);
// thread should block, try and add this event to the waiting queue
var node = QueueNode{
.next = @intToPtr(?*QueueNode, state & QUEUE_MASK),
.data = ResetEvent.init(),
};
defer node.data.deinit();
const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
// node is in the queue, wait until a `held.release()` wakes us up.
_ = node.data.wait(null) catch unreachable;
spin = 0;
state = @atomicLoad(usize, &self.state, .Monotonic);
continue;
};
}
}

fn releaseSlow(self: *Mutex, current_state: usize) void {
// grab the QUEUE_LOCK in order to signal a waiting queue node's event.
var state = current_state;
while (true) {
if ((state & QUEUE_LOCK) != 0 or (state & QUEUE_MASK) == 0)
return;
state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
}

while (true) {
// barrier needed to observe incoming state changes
defer @fence(.Acquire);

// the mutex is currently locked. try to unset the QUEUE_LOCK and let the locker wake up the next node.
// avoids waking up multiple sleeping threads which try to acquire the lock again which increases contention.
if ((state & MUTEX_LOCK) != 0) {
state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return;
continue;
}

// failed to acquire the lock, go to sleep until woken up by `Held.release()`
if (@atomicRmw(State, &self.state, .Xchg, .Sleeping, .Acquire) == .Unlocked)
return Held{ .mutex = self };
state = .Sleeping;
self.parker.park(@ptrCast(*const u32, &self.state), @enumToInt(State.Sleeping));
// try to pop the top node on the waiting queue stack to wake it up
// while at the same time unsetting the QUEUE_LOCK.
const node = @intToPtr(*QueueNode, state & QUEUE_MASK);
const new_state = @ptrToInt(node.next) | (state & MUTEX_LOCK);
state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
_ = node.data.set(false);
return;
};
}
}
};
Expand Down
180 changes: 0 additions & 180 deletions lib/std/parker.zig

This file was deleted.

0 comments on commit 63300a2

Please sign in to comment.