Skip to content

Commit

Permalink
Add Treiber stacks to conc.
Browse files Browse the repository at this point in the history
  • Loading branch information
ticki committed Jun 7, 2017
1 parent 7db92aa commit 6674468
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
8 changes: 7 additions & 1 deletion conc/src/atomic.rs
Expand Up @@ -127,7 +127,13 @@ impl<T> Atomic<T> {
/// # Safety
///
/// As this accepts a raw pointer, it is necessary to mark it as `unsafe`. To uphold the
/// invariants, ensure that `new` isn't used after this function has been called.
/// invariants, ensure that `new` isn't used after this function has been called, if it
/// succeeds (returns `Ok`).
///
/// # Memory leak
///
/// If it fails (returns `Err`), this function won't drop `new` at any point. The handling of
/// its destructor lies solely on the caller of the function.
pub unsafe fn compare_and_store_raw(&self, old: *const T, new: *mut T, ordering: atomic::Ordering)
-> Result<(), ()> {

Expand Down
53 changes: 41 additions & 12 deletions conc/src/sync/treiber.rs
@@ -1,13 +1,14 @@
use std::sync::atomic;
use Atomic;
use std::{mem, ptr};
use {Atomic, Guard};

pub struct Treiber<T> {
head: Atomic<Node<T>>,
}

struct Node<T> {
data: T,
next: *mut T,
next: *const Node<T>,
}

impl<T> Treiber<T> {
Expand All @@ -17,39 +18,67 @@ impl<T> Treiber<T> {
}
}

pub fn pop(&self, item: T) -> Option<::Guard<T>> {
pub fn pop(&self, item: T) -> Option<Guard<T>> {
// TODO: Use `catch {}` here when it lands.
let mut head = self.head.load(atomic::Ordering::Acquire);
while let Some(node) = head {
// TODO: This should be something that ignores the guard creation when the CAS
// fails, because it's expensive to do and not used anyway. It should be easy
// enough to implement, but I am struggling to come up with a good name for
// the method.
match self.head.compare_and_swap_raw(head, node.next, atomic::Ordering::Release) {
Ok(_) => return node.map(|x| x.data),
match unsafe {
self.head.compare_and_swap_raw(
&*node,
node.next as *mut Node<T>,
atomic::Ordering::Release
)
} {
Ok(_) => return Some(node.map(|x| &x.data)),
Err(new_head) => head = new_head,
}
}

None
}

pub fn push(&self, item: T) {
pub fn push(&self, item: T)
where T: 'static {
// Load the current head.
let mut current = self.head.load(atomic::Ordering::Acquire);
let mut current_ptr: Option<*const Node<T>>;

// TODO: Use `catch {}` here when it lands.
let mut head = Box::new(Node {
// Construct a node, which will be the new head.
let mut node = Box::new(Node {
data: item,
next: self.head.load(atomic::Ordering::Acquire),
// Placeholder; we will replace it with an actual value in the loop.
next: ptr::null(),
});

loop {
// Derive the nullable current pointer from the current head.
current_ptr = current.as_ref().map(Guard::as_raw);
// Construct the next-pointer of the new node from the current head.
node.next = current_ptr.unwrap_or(ptr::null());

// TODO: This should be something that ignores the guard creation when the CAS
// succeeds, because it's expensive to do and not used anyway. It should be easy
// enough to implement, but I am struggling to come up with a good name for the
// method.
match self.head.compare_and_swap_raw(head.next, Some(head), atomic::Ordering::Release) {

// CAS from the read pointer to the new head.
match self.head.compare_and_swap(current_ptr, Some(node), atomic::Ordering::Release) {
// If it succeeds, the item has been pushed.
Ok(_) => break,
Err((new_head, Some(node))) => {
head = node;
head.next = new_head;
// If it fails, we will retry the CAS with updated values.
Err((new_head, Some(node2))) => {
// Update the current head.
current = new_head;
// Put the box we gave back to the variable where it belongs.
node = node2;
},
// This should never be reached as we gave an argument which was unconditionally
// `Some`.
_ => unreachable!(),
}
}
Expand Down

0 comments on commit 6674468

Please sign in to comment.