Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functions to take/release just producer or consumer #78

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 257 additions & 14 deletions core/src/bbbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ use core::{
result::Result as CoreResult,
slice::from_raw_parts_mut,
sync::atomic::{
AtomicBool, AtomicUsize,
AtomicBool, AtomicU8, AtomicUsize,
Ordering::{AcqRel, Acquire, Release},
},
};

/// Bit to define producer taken.
const BIT_PRODUCER: u8 = 1 << 0;

/// Bit to define consumer taken.
const BIT_CONSUMER: u8 = 1 << 1;

#[derive(Debug)]
/// A backing structure for a BBQueue. Can be used to create either
/// a BBQueue or a split Producer/Consumer pair
Expand Down Expand Up @@ -47,8 +54,10 @@ pub struct BBBuffer<const N: usize> {
/// Is there an active write grant?
write_in_progress: AtomicBool,

/// Have we already split?
already_split: AtomicBool,
/// Whether we have split the producer and/or consumer parts off.
///
/// See the `BIT_PRODUCER` and `BIT_CONSUMER` bits which define what parts have been split off.
split_prod_cons: AtomicU8,
}

unsafe impl<const A: usize> Sync for BBBuffer<A> {}
Expand All @@ -63,7 +72,7 @@ impl<'a, const N: usize> BBBuffer<N> {
/// is placed at `static` scope within the `.bss` region, the explicit initialization
/// will be elided (as it is already performed as part of memory initialization)
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
/// while splitting.
///
/// ```rust
Expand All @@ -86,7 +95,11 @@ impl<'a, const N: usize> BBBuffer<N> {
/// # }
/// ```
pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> {
if atomic::swap(&self.already_split, true, AcqRel) {
// Set producer/consumer taken bit, error and reset if one was already set
let prev = atomic::fetch_or(&self.split_prod_cons, BIT_PRODUCER | BIT_CONSUMER, AcqRel);

if prev > 0 {
self.split_prod_cons.store(prev, Release);
return Err(Error::AlreadySplit);
}

Expand Down Expand Up @@ -123,14 +136,68 @@ impl<'a, const N: usize> BBBuffer<N> {
/// is placed at `static` scope within the `.bss` region, the explicit initialization
/// will be elided (as it is already performed as part of memory initialization)
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// section while splitting.
pub fn try_split_framed(&'a self) -> Result<(FrameProducer<'a, N>, FrameConsumer<'a, N>)> {
let (producer, consumer) = self.try_split()?;
Ok((FrameProducer { producer }, FrameConsumer { consumer }))
}

/// Attempt to release the Producer and Consumer
/// Attempt to take a `Producer` from the `BBBuffer` to gain access to the
/// buffer. If a producer has already been taken, an error will be returned.
///
/// NOTE: When taking the producer, the underlying buffer will be explicitly initialized
/// to zero. This may take a measurable amount of time, depending on the size
/// of the buffer. This is necessary to prevent undefined behavior. If the buffer
/// is placed at `static` scope within the `.bss` region, the explicit initialization
/// will be elided (as it is already performed as part of memory initialization)
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
/// while splitting.
pub fn try_take_producer(&'a self) -> Result<Producer<'a, N>> {
// Set producer taken bit, error if already set
if atomic::fetch_or(&self.split_prod_cons, BIT_PRODUCER, AcqRel) & BIT_PRODUCER > 0 {
return Err(Error::AlreadySplit);
}

unsafe {
// Explicitly zero the data to avoid undefined behavior.
// This is required, because we hand out references to the buffers,
// which mean that creating them as references is technically UB for now
let mu_ptr = self.buf.get();
(*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);

let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);

Ok(Producer {
bbq: nn1,
pd: PhantomData,
})
}
}

/// Attempt to take a `Consumer` from the `BBBuffer` to gain access to the
/// buffer. If a consumer has already been taken, an error will be returned.
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
/// while splitting.
pub fn try_take_consumer(&'a self) -> Result<Consumer<'a, N>> {
// Set producer taken bit, error if already set
if atomic::fetch_or(&self.split_prod_cons, BIT_CONSUMER, AcqRel) & BIT_CONSUMER > 0 {
return Err(Error::AlreadySplit);
}

unsafe {
let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);

Ok(Consumer {
bbq: nn1,
pd: PhantomData,
})
}
}

/// Attempt to release the `Producer` and `Consumer`
///
/// This re-initializes the buffer so it may be split in a different mode at a later
/// time. There must be no read or write grants active, or an error will be returned.
Expand Down Expand Up @@ -204,7 +271,7 @@ impl<'a, const N: usize> BBBuffer<N> {
self.last.store(0, Release);

// Mark the buffer as ready to be split
self.already_split.store(false, Release);
self.split_prod_cons.store(0, Release);

Ok(())
}
Expand All @@ -227,6 +294,154 @@ impl<'a, const N: usize> BBBuffer<N> {
(FrameProducer { producer }, FrameConsumer { consumer })
})
}

/// Attempt to release the `Producer`.
///
/// This re-initializes the buffer if the consumer was already released so it may be
/// split in a different mode at a later time. There must be no read or write grants
/// active, or an error will be returned.
///
/// The `Producer` ust be from THIS `BBBuffer`, or an error will be returned.
///
/// ```rust
/// # // bbqueue test shim!
/// # fn bbqtest() {
/// use bbqueue::BBBuffer;
///
/// // Create and split a new buffer
/// let buffer: BBBuffer<6> = BBBuffer::new();
/// let (prod, cons) = buffer.try_split().unwrap();
///
/// // Not possible to split twice
/// assert!(buffer.try_split().is_err());
///
/// // Release the producer and consumer
/// assert!(buffer.try_release(prod, cons).is_ok());
///
/// // Split the buffer in framed mode
/// let (fprod, fcons) = buffer.try_split_framed().unwrap();
/// # // bbqueue test shim!
/// # }
/// #
/// # fn main() {
/// # #[cfg(not(feature = "thumbv6"))]
/// # bbqtest();
/// # }
/// ```
pub fn try_release_producer(
&'a self,
prod: Producer<'a, N>,
) -> CoreResult<(), Producer<'a, N>> {
// Note: Re-entrancy is not possible because we require ownership
// of the producer, which are not cloneable. We also
// can assume the buffer has been split, because

// Is this our producer?
let our_prod = prod.bbq.as_ptr() as *const Self == self;

if !(our_prod) {
// Can't release, not our producer
return Err(prod);
}

let wr_in_progress = self.write_in_progress.load(Acquire);
let rd_in_progress = self.read_in_progress.load(Acquire);

if wr_in_progress || rd_in_progress {
// Can't release, active grant(s) in progress
return Err(prod);
}

// Drop the producer
drop(prod);

// Re-initialize the buffer if consumer is already released (not totally needed, but nice to do)
if self.split_prod_cons.load(Acquire) & BIT_CONSUMER == 0 {
self.write.store(0, Release);
self.read.store(0, Release);
self.reserve.store(0, Release);
self.last.store(0, Release);
}

// Mark the buffer as ready to retake producer
atomic::fetch_and(&self.split_prod_cons, !BIT_PRODUCER, Release);

Ok(())
}

/// Attempt to release the `Consumer`.
///
/// This re-initializes the buffer if the producer was already released so it may be
/// split in a different mode at a later time. There must be no read or write grants
/// active, or an error will be returned.
///
/// The `Consumer` must be from THIS `BBBuffer`, or an error will be returned.
///
/// ```rust
/// # // bbqueue test shim!
/// # fn bbqtest() {
/// use bbqueue::BBBuffer;
///
/// // Create and split a new buffer
/// let buffer: BBBuffer<6> = BBBuffer::new();
/// let (prod, cons) = buffer.try_split().unwrap();
///
/// // Not possible to split twice
/// assert!(buffer.try_split().is_err());
///
/// // Release the producer and consumer
/// assert!(buffer.try_release(prod, cons).is_ok());
///
/// // Split the buffer in framed mode
/// let (fprod, fcons) = buffer.try_split_framed().unwrap();
/// # // bbqueue test shim!
/// # }
/// #
/// # fn main() {
/// # #[cfg(not(feature = "thumbv6"))]
/// # bbqtest();
/// # }
/// ```
pub fn try_release_consumer(
&'a self,
cons: Consumer<'a, N>,
) -> CoreResult<(), Consumer<'a, N>> {
// Note: Re-entrancy is not possible because we require ownership
// of the consumer, which are not cloneable. We also
// can assume the buffer has been split, because

// Is this our consumer?
let our_cons = cons.bbq.as_ptr() as *const Self == self;

if !(our_cons) {
// Can't release, not our consumer
return Err(cons);
}

let wr_in_progress = self.write_in_progress.load(Acquire);
let rd_in_progress = self.read_in_progress.load(Acquire);

if wr_in_progress || rd_in_progress {
// Can't release, active grant(s) in progress
return Err(cons);
}

// Drop the consumer
drop(cons);

// Re-initialize the buffer if producer is already released (not totally needed, but nice to do)
if self.split_prod_cons.load(Acquire) & BIT_PRODUCER == 0 {
self.write.store(0, Release);
self.read.store(0, Release);
self.reserve.store(0, Release);
self.last.store(0, Release);
}

// Mark the buffer as ready to retake consumer
atomic::fetch_and(&self.split_prod_cons, !BIT_CONSUMER, Release);

Ok(())
}
}

impl<const A: usize> BBBuffer<A> {
Expand Down Expand Up @@ -280,7 +495,7 @@ impl<const A: usize> BBBuffer<A> {
write_in_progress: AtomicBool::new(false),

// We haven't split at the start
already_split: AtomicBool::new(false),
split_prod_cons: AtomicU8::new(0),
}
}
}
Expand Down Expand Up @@ -744,7 +959,7 @@ impl<'a, const N: usize> GrantW<'a, N> {
/// If `used` is larger than the given grant, the maximum amount will
/// be commited
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// section while committing.
pub fn commit(mut self, used: usize) {
self.commit_inner(used);
Expand Down Expand Up @@ -861,7 +1076,7 @@ impl<'a, const N: usize> GrantR<'a, N> {
/// If `used` is larger than the given grant, the full grant will
/// be released.
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// section while releasing.
pub fn release(mut self, used: usize) {
// Saturate the grant release
Expand Down Expand Up @@ -969,7 +1184,7 @@ impl<'a, const N: usize> SplitGrantR<'a, N> {
/// If `used` is larger than the given grant, the full grant will
/// be released.
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// section while releasing.
pub fn release(mut self, used: usize) {
// Saturate the grant release
Expand Down Expand Up @@ -1105,7 +1320,7 @@ impl<'a, const N: usize> DerefMut for GrantR<'a, N> {
#[cfg(feature = "thumbv6")]
mod atomic {
use core::sync::atomic::{
AtomicBool, AtomicUsize,
AtomicBool, AtomicU8, AtomicUsize,
Ordering::{self, Acquire, Release},
};
use cortex_m::interrupt::free;
Expand All @@ -1128,6 +1343,24 @@ mod atomic {
})
}

#[inline(always)]
pub fn fetch_and(atomic: &AtomicU8, val: u8, _order: Ordering) -> u8 {
free(|_| {
let prev = atomic.load(Acquire);
atomic.store(prev & val, Release);
prev
})
}

#[inline(always)]
pub fn fetch_or(atomic: &AtomicU8, val: u8, _order: Ordering) -> u8 {
free(|_| {
let prev = atomic.load(Acquire);
atomic.store(prev | val, Release);
prev
})
}

#[inline(always)]
pub fn swap(atomic: &AtomicBool, val: bool, _order: Ordering) -> bool {
free(|_| {
Expand All @@ -1140,7 +1373,7 @@ mod atomic {

#[cfg(not(feature = "thumbv6"))]
mod atomic {
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use core::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};

#[inline(always)]
pub fn fetch_add(atomic: &AtomicUsize, val: usize, order: Ordering) -> usize {
Expand All @@ -1152,6 +1385,16 @@ mod atomic {
atomic.fetch_sub(val, order)
}

#[inline(always)]
pub fn fetch_and(atomic: &AtomicU8, val: u8, order: Ordering) -> u8 {
atomic.fetch_and(val, order)
}

#[inline(always)]
pub fn fetch_or(atomic: &AtomicU8, val: u8, order: Ordering) -> u8 {
atomic.fetch_or(val, order)
}

#[inline(always)]
pub fn swap(atomic: &AtomicBool, val: bool, order: Ordering) -> bool {
atomic.swap(val, order)
Expand Down
Loading