Auto merge of #8 - japaric:atomic, r=<try>
use atomics where available

cc #5
cc @pftbest
homunkulus committed Oct 31, 2017
2 parents f7ca3b5 + f9a3dfc commit 4a527af
Showing 7 changed files with 174 additions and 20 deletions.
6 changes: 6 additions & 0 deletions blacklist.txt
@@ -0,0 +1,6 @@
# false positives from thread::spawn (?)
race:<alloc::arc::Arc<T>>::drop_slow
race:__GI___call_tls_dtors
race:alloc::heap::{{impl}}::dealloc
race:core::ptr::drop_in_place<core::option::Option<core::result::Result<(), alloc::boxed::Box<Any>>>>
race:core::ptr::drop_in_place<core::result::Result<(), alloc::boxed::Box<Any>>>
4 changes: 4 additions & 0 deletions ci/install.sh
@@ -11,7 +11,11 @@ main() {
rustup component list | grep 'rust-src.*installed' || \
rustup component add rust-src
;;
x86_64-unknown-linux-gnu)
;;
*)
# unhandled case
exit 1
;;
esac
}
15 changes: 14 additions & 1 deletion ci/script.sh
@@ -5,8 +5,21 @@ main() {
thumb*m-none-eabi)
xargo check --target $TARGET
;;
*)
x86_64-unknown-linux-gnu)
cargo check --target $TARGET

cargo test --target $TARGET
cargo test --target $TARGET --release

export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt"
export RUSTFLAGS="-Z sanitizer=thread"

cargo test --test tsan --target $TARGET
cargo test --test tsan --target $TARGET --release
;;
*)
# unhandled case
exit 1
;;
esac
}
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -144,7 +144,9 @@
//! is_send::<Vec<NotSend, [NotSend; 4]>>();
//! ```

#![cfg_attr(target_has_atomic = "ptr", feature(const_atomic_usize_new))]
#![deny(missing_docs)]
#![feature(cfg_target_has_atomic)]
#![feature(const_fn)]
#![feature(shared)]
#![feature(unsize)]
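The `cfg_attr` line is the key addition here: it requests the `const_atomic_usize_new` feature gate only on targets that actually have pointer-width atomics, so a `thumbv6m` build never asks for it. A minimal sketch of the same pattern on a 2017-era nightly (an illustrative crate root, not this crate's full lib.rs):

#![cfg_attr(target_has_atomic = "ptr", feature(const_atomic_usize_new))]
#![feature(cfg_target_has_atomic)]
#![no_std]

#[cfg(target_has_atomic = "ptr")]
use core::sync::atomic::AtomicUsize;

// On atomic-capable targets this is an atomic counter; elsewhere it is a
// plain integer that must be protected by other means (e.g. a critical
// section).
#[cfg(target_has_atomic = "ptr")]
static HITS: AtomicUsize = AtomicUsize::new(0); // const thanks to the gate

#[cfg(not(target_has_atomic = "ptr"))]
static mut HITS: usize = 0;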
85 changes: 70 additions & 15 deletions src/ring_buffer/mod.rs
@@ -2,6 +2,8 @@

use core::marker::{PhantomData, Unsize};
use core::ptr;
#[cfg(target_has_atomic = "ptr")]
use core::sync::atomic::{AtomicUsize, Ordering};

use untagged_option::UntaggedOption;

@@ -18,11 +20,16 @@ where
A: Unsize<[T]>,
{
_marker: PhantomData<[T]>,
buffer: UntaggedOption<A>,

// this is from where we dequeue items
head: usize,
#[cfg(target_has_atomic = "ptr")] head: AtomicUsize,
#[cfg(not(target_has_atomic = "ptr"))] head: usize,

// this is where we enqueue new items
tail: usize,
#[cfg(target_has_atomic = "ptr")] tail: AtomicUsize,
#[cfg(not(target_has_atomic = "ptr"))] tail: usize,

buffer: UntaggedOption<A>,
}
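One slot of the backing array is kept permanently empty: the queue uses `capacity() + 1` physical slots, so `head == tail` can only mean "empty" and `(tail + 1) % n == head` can only mean "full". A standalone sketch of that arithmetic, assuming a capacity of 4 (so n = 5):

fn main() {
    let n = 5; // capacity + 1 physical slots; one is always left empty
    let (mut head, mut tail) = (0usize, 0usize);

    assert_eq!(head, tail); // head == tail <=> empty

    for _ in 0..4 {
        tail = (tail + 1) % n; // enqueue four items
    }
    assert_eq!((tail + 1) % n, head); // a fifth enqueue would collide: full

    head = (head + 1) % n; // dequeue one item
    assert_ne!((tail + 1) % n, head); // room for one more again
}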

impl<T, A> RingBuffer<T, A>
@@ -35,7 +42,13 @@ where
RingBuffer {
_marker: PhantomData,
buffer: UntaggedOption::none(),
#[cfg(target_has_atomic = "ptr")]
head: AtomicUsize::new(0),
#[cfg(not(target_has_atomic = "ptr"))]
head: 0,
#[cfg(target_has_atomic = "ptr")]
tail: AtomicUsize::new(0),
#[cfg(not(target_has_atomic = "ptr"))]
tail: 0,
}
}
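The `dequeue` and `enqueue` hunks below take `&mut self`, which lets them call `AtomicUsize::get_mut` and treat the indices as plain integers: exclusive access already rules out concurrent observers, so no atomic instructions are needed. A minimal sketch of that idea (illustrative, not the crate's code):

use std::sync::atomic::AtomicUsize;

// With &mut access, `get_mut` yields a plain &mut usize; the increment
// compiles to an ordinary load and store, not an atomic RMW.
fn bump(counter: &mut AtomicUsize) -> usize {
    let n = counter.get_mut();
    *n += 1;
    *n
}

fn main() {
    let mut c = AtomicUsize::new(0);
    assert_eq!(bump(&mut c), 1);
}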
@@ -49,11 +62,22 @@ where
/// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&mut self) -> Option<T> {
let n = self.capacity() + 1;

#[cfg(target_has_atomic = "ptr")]
let head = self.head.get_mut();
#[cfg(not(target_has_atomic = "ptr"))]
let head = &mut self.head;

#[cfg(target_has_atomic = "ptr")]
let tail = self.tail.get_mut();
#[cfg(not(target_has_atomic = "ptr"))]
let tail = &mut self.tail;

let buffer: &[T] = unsafe { self.buffer.as_ref() };

if self.head != self.tail {
let item = unsafe { ptr::read(buffer.get_unchecked(self.head)) };
self.head = (self.head + 1) % n;
if *head != *tail {
let item = unsafe { ptr::read(buffer.get_unchecked(*head)) };
*head = (*head + 1) % n;
Some(item)
} else {
None
@@ -65,14 +89,25 @@ where
/// Returns `BufferFullError` if the queue is full
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
let n = self.capacity() + 1;

#[cfg(target_has_atomic = "ptr")]
let head = self.head.get_mut();
#[cfg(not(target_has_atomic = "ptr"))]
let head = &mut self.head;

#[cfg(target_has_atomic = "ptr")]
let tail = self.tail.get_mut();
#[cfg(not(target_has_atomic = "ptr"))]
let tail = &mut self.tail;

let buffer: &mut [T] = unsafe { self.buffer.as_mut() };

let next_tail = (self.tail + 1) % n;
if next_tail != self.head {
let next_tail = (*tail + 1) % n;
if next_tail != *head {
// NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
// use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
unsafe { ptr::write(buffer.get_unchecked_mut(self.tail), item) }
self.tail = next_tail;
unsafe { ptr::write(buffer.get_unchecked_mut(*tail), item) }
*tail = next_tail;
Ok(())
} else {
Err(BufferFullError)
@@ -81,10 +116,20 @@

/// Returns the number of elements in the queue
pub fn len(&self) -> usize {
if self.head > self.tail {
self.head - self.tail
#[cfg(target_has_atomic = "ptr")]
let head = self.head.load(Ordering::Relaxed);
#[cfg(not(target_has_atomic = "ptr"))]
let head = self.head;

#[cfg(target_has_atomic = "ptr")]
let tail = self.tail.load(Ordering::Relaxed);
#[cfg(not(target_has_atomic = "ptr"))]
let tail = self.tail;

if head > tail {
head - tail
} else {
self.tail - self.head
tail - head
}
}
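`len` only has `&self`, so it cannot use `get_mut` and settles for two `Relaxed` loads. `Relaxed` still guarantees that each load is a single atomic read, so the snapshot may be stale but never torn. A small illustrative sketch of that property (not the crate's code):

use std::sync::atomic::{AtomicUsize, Ordering};

// Relaxed makes no ordering promises relative to other memory, but the
// load itself is atomic: we always observe a value some thread stored.
fn snapshot(head: &AtomicUsize, tail: &AtomicUsize) -> (usize, usize) {
    (head.load(Ordering::Relaxed), tail.load(Ordering::Relaxed))
}

fn main() {
    let (h, t) = (AtomicUsize::new(3), AtomicUsize::new(1));
    assert_eq!(snapshot(&h, &t), (3, 1));
}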

@@ -176,9 +221,14 @@ where

fn next(&mut self) -> Option<&'a T> {
if self.index < self.len {
#[cfg(not(target_has_atomic = "ptr"))]
let head = self.rb.head;
#[cfg(target_has_atomic = "ptr")]
let head = self.rb.head.load(Ordering::Relaxed);

let buffer: &[T] = unsafe { self.rb.buffer.as_ref() };
let ptr = buffer.as_ptr();
let i = (self.rb.head + self.index) % (self.rb.capacity() + 1);
let i = (head + self.index) % (self.rb.capacity() + 1);
self.index += 1;
Some(unsafe { &*ptr.offset(i as isize) })
} else {
@@ -196,10 +246,15 @@ where

fn next(&mut self) -> Option<&'a mut T> {
if self.index < self.len {
#[cfg(not(target_has_atomic = "ptr"))]
let head = self.rb.head;
#[cfg(target_has_atomic = "ptr")]
let head = self.rb.head.load(Ordering::Relaxed);

let capacity = self.rb.capacity() + 1;
let buffer: &mut [T] = unsafe { self.rb.buffer.as_mut() };
let ptr: *mut T = buffer.as_mut_ptr();
let i = (self.rb.head + self.index) % capacity;
let i = (head + self.index) % capacity;
self.index += 1;
Some(unsafe { &mut *ptr.offset(i as isize) })
} else {
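The NOTE(ptr::write) comments above deserve a closer look: a normal assignment through `get_unchecked_mut` would first drop the slot's previous contents, and in this queue that "previous contents" is uninitialized memory. A standalone sketch of the distinction (illustrative, not the crate's code):

use std::ptr;

// `*slot = item` would run the destructor of the old value before moving
// `item` in; `ptr::write` moves `item` in without reading or dropping the
// old bits, which is exactly what an uninitialized slot requires.
unsafe fn write_into<T>(slot: *mut T, item: T) {
    ptr::write(slot, item);
}

fn main() {
    let mut slot = String::new(); // stand-in for a queue slot
    unsafe { write_into(&mut slot, String::from("hi")) } // old value is not dropped
    assert_eq!(slot, "hi");
}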
62 changes: 58 additions & 4 deletions src/ring_buffer/spsc.rs
@@ -1,5 +1,7 @@
use core::ptr::{self, Shared};
use core::marker::Unsize;
#[cfg(target_has_atomic = "ptr")]
use core::sync::atomic::Ordering;

use BufferFullError;
use ring_buffer::RingBuffer;
@@ -10,8 +12,11 @@ where
{
/// Splits a statically allocated ring buffer into producer and consumer end points
///
/// *Warning* the current implementation only supports single core processors. It's also fine to
/// use both end points on the same core of a multi-core processor.
/// **Warning** the current single producer single consumer implementation only supports
/// multi-core systems where `cfg(target_has_atomic = "ptr")` holds for all the cores. For
/// example, a dual core system where one core is a Cortex-M0 and the other is a Cortex-M3
/// is not supported because Cortex-M0 (`thumbv6m-none-eabi`) doesn't satisfy
/// `cfg(target_has_atomic = "ptr")`. All single core systems are supported.
pub fn split(&'static mut self) -> (Producer<T, A>, Consumer<T, A>) {
(
Producer {
@@ -39,8 +44,30 @@ where
A: Unsize<[T]>,
{
/// Returns the item in the front of the queue, or `None` if the queue is empty
#[cfg(target_has_atomic = "ptr")]
pub fn dequeue(&mut self) -> Option<T> {
let rb = unsafe { self.rb.as_ref() };

let tail = rb.tail.load(Ordering::Relaxed);
let head = rb.head.load(Ordering::Acquire);

let n = rb.capacity() + 1;
let buffer: &[T] = unsafe { rb.buffer.as_ref() };

if head != tail {
let item = unsafe { ptr::read(buffer.get_unchecked(head)) };
rb.head.store((head + 1) % n, Ordering::Release);
Some(item)
} else {
None
}
}
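In the cross-thread case the correctness argument rests on release/acquire pairing between the index stores and loads: whatever was written before a `Release` store is visible after the `Acquire` load that observes it. A generic sketch of that handoff (illustrative names, not the crate's code):

use std::sync::atomic::{AtomicBool, Ordering};

static READY: AtomicBool = AtomicBool::new(false);
static mut PAYLOAD: u32 = 0;

fn producer() {
    unsafe { PAYLOAD = 42 }               // 1. write the data
    READY.store(true, Ordering::Release); // 2. then publish it
}

fn consumer() -> Option<u32> {
    if READY.load(Ordering::Acquire) { // observing the publication...
        Some(unsafe { PAYLOAD })       // ...guarantees the data is visible
    } else {
        None
    }
}

fn main() {
    producer();
    assert_eq!(consumer(), Some(42));
}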

/// Returns the item in the front of the queue, or `None` if the queue is empty
#[cfg(not(target_has_atomic = "ptr"))]
pub fn dequeue(&mut self) -> Option<T> {
let rb = unsafe { self.rb.as_mut() };

let n = rb.capacity() + 1;
let buffer: &[T] = unsafe { rb.buffer.as_ref() };

@@ -80,17 +107,44 @@ where
/// Adds an `item` to the end of the queue
///
/// Returns `BufferFullError` if the queue is full
#[cfg(target_has_atomic = "ptr")]
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
let rb = unsafe { self.rb.as_mut() };

let head = rb.head.load(Ordering::Relaxed);
let tail = rb.tail.load(Ordering::Acquire);

let n = rb.capacity() + 1;
let next_tail = (tail + 1) % n;

let buffer: &mut [T] = unsafe { rb.buffer.as_mut() };

if next_tail != head {
// NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
// use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
unsafe { ptr::write(buffer.get_unchecked_mut(tail), item) }
rb.tail.store(next_tail, Ordering::Release);
Ok(())
} else {
Err(BufferFullError)
}
}

/// Adds an `item` to the end of the queue
///
/// Returns `BufferFullError` if the queue is full
#[cfg(not(target_has_atomic = "ptr"))]
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
let rb = unsafe { self.rb.as_mut() };

let n = rb.capacity() + 1;
let buffer: &mut [T] = unsafe { rb.buffer.as_mut() };

let next_tail = (rb.tail + 1) % n;
// NOTE(volatile) the value of `head` can change at any time in the execution context of the
// producer, so we inform the compiler of this using a volatile load
if next_tail != unsafe { ptr::read_volatile(&rb.head) } {
// NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
// use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
// NOTE(ptr::write) see the other `enqueue` implementation above for details
unsafe { ptr::write(buffer.get_unchecked_mut(rb.tail), item) }
rb.tail = next_tail;
Ok(())
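A minimal usage sketch of `split` as it appears in this diff: the two end points can then live in different execution contexts, for example an interrupt handler and the main loop on a single-core device. The TSAN test below exercises the same API across threads.

extern crate heapless;

use heapless::RingBuffer;

static mut RB: RingBuffer<u8, [u8; 8]> = RingBuffer::new();

fn main() {
    // `split` takes a `&'static mut` reference and returns the producer
    // and consumer end points.
    let (mut tx, mut rx) = unsafe { RB.split() };

    tx.enqueue(42).unwrap();
    assert_eq!(rx.dequeue(), Some(42));
}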
20 changes: 20 additions & 0 deletions tests/tsan.rs
@@ -0,0 +1,20 @@
extern crate heapless;

use std::thread;

use heapless::RingBuffer;

#[test]
fn tsan() {
static mut RB: RingBuffer<i32, [i32; 4]> = RingBuffer::new();

unsafe { RB.split() }.0.enqueue(0).unwrap();

thread::spawn(|| {
unsafe { RB.split() }.0.enqueue(1).unwrap();
});

thread::spawn(|| {
unsafe { RB.split() }.1.dequeue().unwrap();
});
}
