From 05fed2a1e88cba0f63169c913aa5cc9f4993d4fa Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 28 Aug 2022 08:03:56 -0700 Subject: [PATCH] Add a OnceCell type (#27) --- Cargo.toml | 2 +- src/lib.rs | 2 + src/once_cell.rs | 805 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 808 insertions(+), 1 deletion(-) create mode 100644 src/once_cell.rs diff --git a/Cargo.toml b/Cargo.toml index 7658dc8..485fb9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,12 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] +futures-lite = "1.11.0" event-listener = "2.5.1" [dev-dependencies] async-channel = "1.5.0" fastrand = "1.4.0" -futures-lite = "1.11.0" [target.'cfg(any(target_arch = "wasm32", target_arch = "wasm64"))'.dev-dependencies] wasm-bindgen-test = "0.3" diff --git a/src/lib.rs b/src/lib.rs index 2159bb1..78a1ff5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,10 +11,12 @@ mod barrier; mod mutex; +mod once_cell; mod rwlock; mod semaphore; pub use barrier::{Barrier, BarrierWaitResult}; pub use mutex::{Mutex, MutexGuard, MutexGuardArc}; +pub use once_cell::OnceCell; pub use rwlock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard}; pub use semaphore::{Semaphore, SemaphoreGuard, SemaphoreGuardArc}; diff --git a/src/once_cell.rs b/src/once_cell.rs new file mode 100644 index 0000000..079b9b4 --- /dev/null +++ b/src/once_cell.rs @@ -0,0 +1,805 @@ +use std::cell::UnsafeCell; +use std::convert::Infallible; +use std::fmt; +use std::future::Future; +use std::mem::{forget, MaybeUninit}; +use std::pin::Pin; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +use event_listener::{Event, EventListener}; +use futures_lite::future; + +/// The current state of the `OnceCell`. +#[derive(Copy, Clone, PartialEq, Eq)] +#[repr(usize)] +enum State { + /// The `OnceCell` is uninitialized. + Uninitialized = 0, + /// The `OnceCell` is being initialized. + Initializing = 1, + /// The `OnceCell` is initialized. + Initialized = 2, +} + +impl From for State { + fn from(val: usize) -> Self { + match val { + 0 => State::Uninitialized, + 1 => State::Initializing, + 2 => State::Initialized, + _ => unreachable!("Invalid state"), + } + } +} + +impl From for usize { + fn from(val: State) -> Self { + val as usize + } +} + +/// A memory location that can be written to at most once. +/// +/// A `OnceCell` can be used to store a single value, and only once. However, +/// once the value is stored, it can be accessed directly through a reference +/// instead of needing an RAII guard like `Mutex` or `RwLock`. +/// +/// # Examples +/// +/// This structure is useful for a variety of patterns, most notably for one-time +/// initialization. +/// +/// ```rust +/// use async_lock::OnceCell; +/// +/// # struct Foobar; +/// +/// async fn very_expensive_initialization() -> Foobar { +/// // Imagine this is very expensive to initialize, +/// // for instance, it requires a network request or +/// // a database call. +/// # Foobar +/// } +/// +/// struct LazyFoobar { +/// inner: OnceCell, +/// } +/// +/// impl LazyFoobar { +/// fn new() -> Self { +/// Self { +/// inner: OnceCell::new(), +/// } +/// } +/// +/// async fn load(&self) -> &Foobar { +/// self.inner.get_or_init(|| async { +/// very_expensive_initialization().await +/// }).await +/// } +/// } +/// ``` +pub struct OnceCell { + /// Listeners waiting for a chance to initialize the cell. + /// + /// These are the users of get_or_init() and similar functions. + active_initializers: Event, + /// Listeners waiting for the cell to be initialized. + /// + /// These are the users of wait(). + passive_waiters: Event, + /// State associated with the cell. + state: AtomicUsize, + /// The value of the cell. + value: UnsafeCell>, +} + +unsafe impl Send for OnceCell {} +unsafe impl Sync for OnceCell {} + +impl OnceCell { + /// Create a new, uninitialized `OnceCell`. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// let cell = OnceCell::new(); + /// # cell.set_blocking(1); + /// ``` + pub const fn new() -> Self { + Self { + active_initializers: Event::new(), + passive_waiters: Event::new(), + state: AtomicUsize::new(State::Uninitialized as _), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Tell whether or not the cell is initialized. + /// + /// This may not always be accurate. For instance, it is possible for + /// another thread to initialize the cell between the time when this + /// function is called and the time when the result is actually used. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// # futures_lite::future::block_on(async { + /// let cell = OnceCell::new(); + /// assert!(!cell.is_initialized()); + /// cell.set(1).await; + /// assert!(cell.is_initialized()); + /// # }); + /// ``` + pub fn is_initialized(&self) -> bool { + State::from(self.state.load(Ordering::Acquire)) == State::Initialized + } + + /// Get a reference to the inner value, or `None` if the value + /// is not yet initialized. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// # futures_lite::future::block_on(async { + /// let cell = OnceCell::new(); + /// assert!(cell.get().is_none()); + /// cell.set(1).await; + /// assert_eq!(cell.get(), Some(&1)); + /// # }); + /// ``` + pub fn get(&self) -> Option<&T> { + if self.is_initialized() { + // SAFETY: We know that the value is initialized, so it is safe to + // read it. + Some(unsafe { self.get_unchecked() }) + } else { + None + } + } + + /// Get a mutable reference to the inner value, or `None` if the value + /// is not yet initialized. + /// + /// This function is useful for initializing the value inside the cell + /// when we still have a mutable reference to the cell. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// # futures_lite::future::block_on(async { + /// let mut cell = OnceCell::new(); + /// assert!(cell.get_mut().is_none()); + /// cell.set(1).await; + /// assert_eq!(cell.get_mut(), Some(&mut 1)); + /// *cell.get_mut().unwrap() = 2; + /// assert_eq!(cell.get(), Some(&2)); + /// # }); + /// ``` + pub fn get_mut(&mut self) -> Option<&mut T> { + if State::from(*self.state.get_mut()) == State::Initialized { + // SAFETY: We know that the value is initialized, so it is safe to + // read it. + Some(unsafe { &mut *self.value.get().cast() }) + } else { + None + } + } + + /// Take the value out of this `OnceCell`, moving it back to the uninitialized + /// state. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// # futures_lite::future::block_on(async { + /// let mut cell = OnceCell::new(); + /// cell.set(1).await; + /// assert_eq!(cell.take(), Some(1)); + /// assert!(!cell.is_initialized()); + /// # }); + /// ``` + pub fn take(&mut self) -> Option { + if State::from(*self.state.get_mut()) == State::Initialized { + // SAFETY: We know that the value is initialized, so it is safe to + // read it. + let value = unsafe { ptr::read(self.value.get().cast()) }; + *self.state.get_mut() = State::Uninitialized.into(); + Some(value) + } else { + None + } + } + + /// Convert this `OnceCell` into the inner value, if it is initialized. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// # futures_lite::future::block_on(async { + /// let cell = OnceCell::new(); + /// cell.set(1).await; + /// assert_eq!(cell.into_inner(), Some(1)); + /// # }); + /// ``` + pub fn into_inner(mut self) -> Option { + self.take() + } + + /// Wait for the cell to be initialized, and then return a reference to the + /// inner value. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// use std::sync::Arc; + /// use std::time::Duration; + /// use std::thread::{sleep, spawn}; + /// + /// let cell = Arc::new(OnceCell::new()); + /// let cell2 = cell.clone(); + /// + /// spawn(move || { + /// sleep(Duration::from_millis(5)); + /// cell2.set_blocking(1); + /// }); + /// + /// # futures_lite::future::block_on(async { + /// assert_eq!(cell.wait().await, &1); + /// # }); + /// ``` + pub async fn wait(&self) -> &T { + // Fast path: see if the value is already initialized. + if let Some(value) = self.get() { + return value; + } + + // Slow path: wait for the value to be initialized. + let listener = self.passive_waiters.listen(); + + // Try again. + if let Some(value) = self.get() { + return value; + } + + listener.await; + debug_assert!(self.is_initialized()); + + // SAFETY: We know that the value is initialized, so it is safe to + // read it. + unsafe { self.get_unchecked() } + } + + /// Wait for the cell to be initialized, and then return a reference to the + /// inner value. + /// + /// # Blocking + /// + /// In contrast to the `wait` method, this method blocks the current thread of + /// execution instead of awaiting. + /// + /// This method should not be used in an asynchronous context. It is intended + /// to be used such that a `OnceCell` can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in deadlocks. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// use std::sync::Arc; + /// use std::time::Duration; + /// use std::thread::{sleep, spawn}; + /// + /// let cell = Arc::new(OnceCell::new()); + /// let cell2 = cell.clone(); + /// + /// spawn(move || { + /// sleep(Duration::from_millis(5)); + /// cell2.set_blocking(1); + /// }); + /// + /// assert_eq!(cell.wait_blocking(), &1); + /// ``` + pub fn wait_blocking(&self) -> &T { + // Fast path: see if the value is already initialized. + if let Some(value) = self.get() { + return value; + } + + // Slow path: wait for the value to be initialized. + let listener = self.passive_waiters.listen(); + + // Try again. + if let Some(value) = self.get() { + return value; + } + + listener.wait(); + debug_assert!(self.is_initialized()); + + // SAFETY: We know that the value is initialized, so it is safe to + // read it. + unsafe { self.get_unchecked() } + } + + /// Either get the value or initialize it with the given closure. + /// + /// The cell will not be initialized if the closure returns an error. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// # + /// # // Prevent explicit value errors. + /// # fn _explicit(_: &Result<&i32, ()>) {} + /// + /// # futures_lite::future::block_on(async { + /// let cell = OnceCell::new(); + /// + /// let result = cell.get_or_try_init(|| async { Err(()) }).await; + /// assert!(result.is_err()); + /// + /// let result = cell.get_or_try_init(|| async { Ok(1) }).await; + /// # _explicit(&result); + /// assert_eq!(result.unwrap(), &1); + /// + /// let result = cell.get_or_try_init(|| async { Err(()) }).await; + /// + /// assert_eq!(result.unwrap(), &1); + /// # }); + /// ``` + pub async fn get_or_try_init>>( + &self, + closure: impl FnOnce() -> Fut, + ) -> Result<&T, E> { + // Fast path: see if the value is already initialized. + if let Some(value) = self.get() { + return Ok(value); + } + + // Slow path: initialize the value. + self.initialize_or_wait(closure, &mut NonBlocking).await?; + debug_assert!(self.is_initialized()); + + // SAFETY: We know that the value is initialized, so it is safe to + // read it. + Ok(unsafe { self.get_unchecked() }) + } + + /// Either get the value or initialize it with the given closure. + /// + /// The cell will not be initialized if the closure returns an error. + /// + /// # Blocking + /// + /// In contrast to the `get_or_try_init` method, this method blocks the current thread of + /// execution instead of awaiting. + /// + /// This method should not be used in an asynchronous context. It is intended + /// to be used such that a `OnceCell` can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in deadlocks. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// # + /// # // Prevent explicit type errors. + /// # fn _explicit(_: &Result<&i32, ()>) {} + /// + /// let cell = OnceCell::new(); + /// + /// let result = cell.get_or_try_init_blocking(|| Err(())); + /// assert!(result.is_err()); + /// + /// let result = cell.get_or_try_init_blocking(|| Ok(1)); + /// # _explicit(&result); + /// assert_eq!(result.unwrap(), &1); + /// + /// let result = cell.get_or_try_init_blocking(|| Err(())); + /// + /// assert_eq!(result.unwrap(), &1); + /// ``` + pub fn get_or_try_init_blocking( + &self, + closure: impl FnOnce() -> Result, + ) -> Result<&T, E> { + // Fast path: see if the value is already initialized. + if let Some(value) = self.get() { + return Ok(value); + } + + // Slow path: initialize the value. + // The futures provided should never block, so we can use `now_or_never`. + now_or_never(self.initialize_or_wait(move || future::ready(closure()), &mut Blocking))?; + debug_assert!(self.is_initialized()); + + // SAFETY: We know that the value is initialized, so it is safe to + // read it. + Ok(unsafe { self.get_unchecked() }) + } + + /// Either get the value or initialize it with the given closure. + /// + /// Many tasks may call this function, but the value will only be set once + /// and only one closure will be invoked. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// # futures_lite::future::block_on(async { + /// let cell = OnceCell::new(); + /// assert_eq!(cell.get_or_init(|| async { 1 }).await, &1); + /// assert_eq!(cell.get_or_init(|| async { 2 }).await, &1); + /// # }); + /// ``` + pub async fn get_or_init>(&self, closure: impl FnOnce() -> Fut) -> &T { + match self + .get_or_try_init(move || async move { + let result: Result = Ok(closure().await); + result + }) + .await + { + Ok(value) => value, + Err(infallible) => match infallible {}, + } + } + + /// Either get the value or initialize it with the given closure. + /// + /// Many tasks may call this function, but the value will only be set once + /// and only one closure will be invoked. + /// + /// # Blocking + /// + /// In contrast to the `get_or_init` method, this method blocks the current thread of + /// execution instead of awaiting. + /// + /// This method should not be used in an asynchronous context. It is intended + /// to be used such that a `OnceCell` can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in deadlocks. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// let cell = OnceCell::new(); + /// assert_eq!(cell.get_or_init_blocking(|| 1), &1); + /// assert_eq!(cell.get_or_init_blocking(|| 2), &1); + /// ``` + pub fn get_or_init_blocking(&self, closure: impl FnOnce() -> T + Unpin) -> &T { + match self.get_or_try_init_blocking(move || { + let result: Result = Ok(closure()); + result + }) { + Ok(value) => value, + Err(infallible) => match infallible {}, + } + } + + /// Try to set the value of the cell. + /// + /// If the cell is already initialized, this method returns the original + /// value back. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// # futures_lite::future::block_on(async { + /// let cell = OnceCell::new(); + /// + /// assert_eq!(cell.set(1).await, Ok(&1)); + /// assert_eq!(cell.get(), Some(&1)); + /// assert_eq!(cell.set(2).await, Err(2)); + /// # }); + /// ``` + pub async fn set(&self, value: T) -> Result<&T, T> { + let mut value = Some(value); + self.get_or_init(|| async { value.take().unwrap() }).await; + + match value { + Some(value) => Err(value), + None => { + // SAFETY: value was taken, so we are initialized + Ok(unsafe { self.get_unchecked() }) + } + } + } + + /// Try to set the value of the cell. + /// + /// If the cell is already initialized, this method returns the original + /// value back. + /// + /// # Blocking + /// + /// In contrast to the `set` method, this method blocks the current thread of + /// execution instead of awaiting. + /// + /// This method should not be used in an asynchronous context. It is intended + /// to be used such that a `OnceCell` can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in deadlocks. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// let cell = OnceCell::new(); + /// + /// assert_eq!(cell.set_blocking(1), Ok(&1)); + /// assert_eq!(cell.get(), Some(&1)); + /// assert_eq!(cell.set_blocking(2), Err(2)); + /// ``` + pub fn set_blocking(&self, value: T) -> Result<&T, T> { + let mut value = Some(value); + self.get_or_init_blocking(|| value.take().unwrap()); + + match value { + Some(value) => Err(value), + None => { + // SAFETY: value was taken, so we are initialized + Ok(unsafe { self.get_unchecked() }) + } + } + } + + /// Wait for the cell to be initialized, optionally using a closure + /// to initialize the cell if it is not initialized yet. + #[cold] + async fn initialize_or_wait>, F: FnOnce() -> Fut>( + &self, + closure: F, + strategy: &mut impl Strategy, + ) -> Result<(), E> { + // The event listener we're currently waiting on. + let mut event_listener = None; + + let mut closure = Some(closure); + + loop { + // Check the current state of the cell. + let state = self.state.load(Ordering::Acquire); + + // Determine what we should do based on our state. + match state.into() { + State::Initialized => { + // The cell is initialized now, so we can return. + return Ok(()); + } + State::Initializing => { + // The cell is currently initializing, or the cell is uninitialized + // but we do not have the ability to initialize it. + // + // We need to wait the initialization to complete. + future::poll_fn(|cx| { + match event_listener.take() { + None => { + event_listener = Some(self.active_initializers.listen()); + } + Some(evl) => { + if let Err(evl) = strategy.poll(evl, cx) { + event_listener = Some(evl); + return Poll::Pending; + } + } + } + + Poll::Ready(()) + }) + .await; + } + State::Uninitialized => { + // Try to move the cell into the initializing state. + if self + .state + .compare_exchange( + State::Uninitialized.into(), + State::Initializing.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) + .is_err() + { + // The cell was initialized while we were trying to + // initialize it. + continue; + } + + // Now that we have an exclusive lock on the cell's value, + // we can try to initialize it. + let _guard = Guard(self); + let initializer = closure.take().unwrap(); + match (initializer)().await { + Ok(value) => { + // Write the value into the cell and update the state. + unsafe { + ptr::write(self.value.get().cast(), value); + } + forget(_guard); + self.state + .store(State::Initialized.into(), Ordering::Release); + + // Notify the listeners that the value is initialized. + self.active_initializers.notify_additional(std::usize::MAX); + self.passive_waiters.notify_additional(std::usize::MAX); + + return Ok(()); + } + Err(err) => { + // Update the state to indicate that the value is + // uninitialized. + drop(_guard); + + return Err(err); + } + } + } + } + } + + /// Set the cell's state back to `UNINITIALIZED on drop. + /// + /// If the closure panics, this ensures that the cell's state is set back to + /// `UNINITIALIZED` and that the next listener is notified. + struct Guard<'a, T>(&'a OnceCell); + + impl<'a, T> Drop for Guard<'a, T> { + fn drop(&mut self) { + self.0 + .state + .store(State::Uninitialized.into(), Ordering::Release); + + // Notify the next initializer that it's their turn. + self.0.active_initializers.notify(1); + } + } + } + + /// Get a reference to the inner value. + /// + /// # Safety + /// + /// The caller must ensure that the cell is initialized. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// # futures_lite::future::block_on(async { + /// let cell = OnceCell::new(); + /// cell.set(1).await; + /// + /// // SAFETY: We know that the value is initialized, so it is safe to + /// // read it. + /// assert_eq!(unsafe { cell.get_unchecked() }, &1); + /// # }); + /// ``` + pub unsafe fn get_unchecked(&self) -> &T { + // SAFETY: The caller asserts that the value is initialized + &*self.value.get().cast() + } +} + +impl From for OnceCell { + /// Create a new, initialized `OnceCell` from an existing value. + /// + /// # Example + /// + /// ```rust + /// use async_lock::OnceCell; + /// + /// let cell = OnceCell::from(42); + /// assert_eq!(cell.get(), Some(&42)); + /// ``` + fn from(value: T) -> Self { + Self { + active_initializers: Event::new(), + passive_waiters: Event::new(), + state: AtomicUsize::new(State::Initialized.into()), + value: UnsafeCell::new(MaybeUninit::new(value)), + } + } +} + +impl fmt::Debug for OnceCell { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + struct Inner<'a, T>(&'a OnceCell); + + impl fmt::Debug for Inner<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.state.load(Ordering::Acquire).into() { + State::Uninitialized => f.write_str(""), + State::Initializing => f.write_str(""), + State::Initialized => { + // SAFETY: "value" is initialized. + let value = unsafe { self.0.get_unchecked() }; + fmt::Debug::fmt(value, f) + } + } + } + } + + f.debug_tuple("OnceCell").field(&Inner(self)).finish() + } +} + +impl Drop for OnceCell { + fn drop(&mut self) { + drop(self.take()); + } +} + +/// Either return the result of a future now, or panic. +fn now_or_never(f: impl Future) -> T { + const NOOP_WAKER: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop); + + unsafe fn wake(_: *const ()) {} + unsafe fn wake_by_ref(_: *const ()) {} + unsafe fn clone(_: *const ()) -> RawWaker { + RawWaker::new(ptr::null(), &NOOP_WAKER) + } + unsafe fn drop(_: *const ()) {} + + futures_lite::pin!(f); + let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER)) }; + + // Poll the future exactly once. + let mut cx = Context::from_waker(&waker); + + match f.poll(&mut cx) { + Poll::Ready(value) => value, + Poll::Pending => unreachable!("future not ready"), + } +} + +/// The strategy for polling an `event_listener::EventListener`. +trait Strategy { + /// Poll the event listener. + fn poll(&mut self, evl: EventListener, ctx: &mut Context<'_>) -> Result<(), EventListener>; +} + +/// The strategy for blocking the current thread on an `EventListener`. +struct Blocking; + +impl Strategy for Blocking { + fn poll(&mut self, evl: EventListener, _: &mut Context<'_>) -> Result<(), EventListener> { + evl.wait(); + Ok(()) + } +} + +/// The strategy for polling an `EventListener` in an async context. +struct NonBlocking; + +impl Strategy for NonBlocking { + fn poll(&mut self, mut evl: EventListener, ctx: &mut Context<'_>) -> Result<(), EventListener> { + match Pin::new(&mut evl).poll(ctx) { + Poll::Pending => Err(evl), + Poll::Ready(()) => Ok(()), + } + } +}