diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ffa32e6..6bc372c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -1,12 +1,19 @@ name: Rust on: - push: { branches: "main" } - pull_request: { branches: "*" } + push: { branches: ["main"] } + pull_request: { branches: ["*"] } jobs: build-and-test: strategy: matrix: - os: [ubuntu-latest, ubuntu-22.04-arm, windows-latest, windows-11-arm, macos-latest] + os: + [ + ubuntu-latest, + ubuntu-22.04-arm, + windows-latest, + windows-11-arm, + macos-latest, + ] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 @@ -26,4 +33,3 @@ jobs: run: cargo build --verbose - name: Run tests run: cargo nextest run --verbose - diff --git a/Cargo.toml b/Cargo.toml index 3dcc544..a0a41ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,16 @@ [package] -name = "wait_on_address" -description = "Cross-platform atomic wait and wake (aka futex) functionality." -repository = "https://github.com/DouglasDwyer/wait_on_address" -keywords = ["atomic", "futex"] +name = "ecmascript_futex" +description = "Cross-platform atomic wait and wake (aka futex) functionality using the ECMAScript Atomics memory model." +repository = "https://github.com/trynova/ecmascript_futex" +keywords = ["atomic", "futex", "ecmascript"] version = "0.1.1" edition = "2024" license = "BSD-2-Clause" categories = ["concurrency", "os", "no-std"] +[dependencies] +ecmascript_atomics = { version = "0.2.3" } + [target.'cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd", target_os = "macos"))'.dependencies] libc = { version = "0.2", default-features = false } @@ -20,4 +23,7 @@ wasm-bindgen = { version = "0.2.90", default-features = false } web-sys = { version = "0.3.24", default-features = false, features = [ "Window" ] } [build-dependencies] -rustversion = { version = "1.0.14", default-features = false } \ No newline at end of file +rustversion = { version = "1.0.14", default-features = false } + +[dev-dependencies] +ecmascript_atomics = { version = "0.2.3", features = ["alloc"] } diff --git a/README.md b/README.md index 2cbb637..a714d66 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,20 @@ -# wait_on_address +# ecmascript_futex -[![Crates.io](https://img.shields.io/crates/v/wait_on_address.svg)](https://crates.io/crates/wait_on_address) -[![Docs.rs](https://docs.rs/wait_on_address/badge.svg)](https://docs.rs/wait_on_address) +Cross platform library for implementing ECMAScript `Atomics.wait`, +`Atomics.wakeAsync`, and `Atomics.notify` (aka futex) functionality in Rust, +operating on ECMAScript memory as produced by the +[`ecmascript_atomics`](https://github.com/trynova/ecmascript_atomics) crate. +This crate is a fork of +[`wait_on_address`](https://github.com/DouglasDwyer/wait_on_address) which is +itself a fork of [`atomic-wait`](https://github.com/m-ou-se/atomic-wait). The +changes inherited and kept from `wait_on_address` are: -Cross platform atomic wait and wake (aka futex) functionality. This crate is a fork of [`atomic-wait`](https://github.com/m-ou-se/atomic-wait), and extends the original code with the following functionality: - -- Support for `AtomicI32`, `AtomicI64`, and `AtomicU64` - Support for waiting with a timeout - Support for `wasm32` on nightly using `std::arch` - Polyfill for all other platforms +The main + Natively-supported platforms: - Windows 8+, Windows Server 2012+ @@ -21,16 +26,18 @@ Natively-supported platforms: ## Usage ```rust -use std::{sync::atomic::AtomicU64, time::Duration}; -use wait_on_address::AtomicWait; +use core::time::Duration; +use ecmascript_atomics::{Racy, RacyBox}; +use ecmascript_futex::ECMAScriptAtomicWait; -let a = AtomicU64::new(0); +let a = RacyBox::new(0u64).unwrap(); +let a = a.as_slice().get(0).unwrap(); a.wait(1); // If the value is 1, wait. a.wait_timeout(2, Duration::from_millis(100)); // If the value is 2, wait at most 100 milliseconds -a.notify_one(); // Wake one waiting thread. +a.notify_many(1); // Wake one waiting thread. a.notify_all(); // Wake all waiting threads. ``` @@ -43,8 +50,11 @@ On FreeBSD, this uses the `_umtx_op` syscall. On Windows, this uses the `WaitOnAddress` and `WakeByAddress` APIs. -On macOS (and iOS and watchOS), this uses the `os_sync_wait_on_address` and `os_sync_wake_by_address` APIs. +On macOS (and iOS and watchOS), this uses the `os_sync_wait_on_address` and +`os_sync_wake_by_address` APIs. -On wasm32 with `nightly`, this uses `memory_atomic_wait32`, `memory_atomic_wait64`, and `memory_atomic_notify` instructions. +On wasm32 with `nightly`, this uses `memory_atomic_wait32`, +`memory_atomic_wait64`, and `memory_atomic_notify` instructions. -All other platforms with `std` support fall back to a fixed-size hashmap of `Condvar`s, similar to `libstdc++`'s implementation for `std::atomic`. \ No newline at end of file +All other platforms with `std` support fall back to a fixed-size hashmap of +`Condvar`s, similar to `libstdc++`'s implementation for `std::atomic`. diff --git a/rust_toolchain.toml b/rust_toolchain.toml new file mode 100644 index 0000000..73cb934 --- /dev/null +++ b/rust_toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "stable" +components = ["rustfmt", "clippy"] diff --git a/src/condvar_table.rs b/src/condvar_table.rs index 32e919c..42edc04 100644 --- a/src/condvar_table.rs +++ b/src/condvar_table.rs @@ -4,6 +4,8 @@ use std::{ time::Duration, }; +use crate::FutexError; + /// The number of OS synchronization primitives to use. const TABLE_SIZE: usize = 256; @@ -12,9 +14,14 @@ static TABLE: [TableEntry; TABLE_SIZE] = [TableEntry::DEFAULT; TABLE_SIZE]; /// Puts the current thread to sleep if `condition` evaluates to `true`. /// The thread will be woken after `timeout` if it is provided. -pub fn wait(ptr: *const (), condition: impl FnOnce() -> bool, timeout: Option) { +pub fn wait( + ptr: *const (), + condition: impl Fn() -> bool, + timeout: Option, +) -> Result<(), FutexError> { let entry = &TABLE[entry_for_ptr(ptr) as usize]; let mut guard = spin_lock(&entry.mutex); + let mut timedout = false; if condition() { if guard.waiting_count == 0 { guard.address = ptr; @@ -25,42 +32,58 @@ pub fn wait(ptr: *const (), condition: impl FnOnce() -> bool, timeout: Option usize { + if ptr.is_null() { + return 0; } + let entry = &TABLE[entry_for_ptr(ptr) as usize]; + let metadata = *spin_lock(&entry.mutex); + if 0 < metadata.waiting_count { + entry.condvar.notify_all(); + } + metadata.waiting_count } /// Wakes at least one thread waiting on `ptr`. -pub fn notify_one(ptr: *const ()) { - if !ptr.is_null() { - let entry = &TABLE[entry_for_ptr(ptr) as usize]; - let metadata = *spin_lock(&entry.mutex); - if 0 < metadata.waiting_count { - if metadata.address.is_null() { - entry.condvar.notify_all(); - } else if metadata.address == ptr { - entry.condvar.notify_one(); - } +pub fn notify_many(ptr: *const (), count: usize) -> usize { + if ptr.is_null() { + return 0; + } + let entry = &TABLE[entry_for_ptr(ptr) as usize]; + let metadata = *spin_lock(&entry.mutex); + if metadata.waiting_count == 0 { + 0 + } else if metadata.waiting_count < count || metadata.address.is_null() { + entry.condvar.notify_all(); + metadata.waiting_count + } else { + for _ in 0..count { + entry.condvar.notify_one(); } + count } } @@ -79,9 +102,9 @@ fn spin_lock(mutex: &Mutex) -> MutexGuard<'_, T> { /// Gets the entry index to use for the given address. fn entry_for_ptr(ptr: *const ()) -> u8 { let x_64 = ptr as u64; - let x_32 = (x_64 >> 32) as u32 | x_64 as u32; - let x_16 = (x_32 >> 16) as u16 | x_32 as u16; - (x_16 >> 8) as u8 | x_16 as u8 + let x_32 = (x_64 >> 32) as u32 ^ x_64 as u32; + let x_16 = (x_32 >> 16) as u16 ^ x_32 as u16; + (x_16 >> 8) as u8 ^ (x_16 >> 2) as u8 } /// Holds metadata that gets written while locking. diff --git a/src/fallback.rs b/src/fallback.rs index d1dbf3c..a71dee2 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -1,46 +1,53 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64, Ordering}, - time::Duration, -}; +use core::time::Duration; -use crate::{condvar_table, private::AtomicWaitImpl}; +use ecmascript_atomics::{Ordering, Racy}; -impl AtomicWaitImpl for AtomicU32 { +use crate::{FutexError, condvar_table, private::ECMAScriptAtomicWaitImpl}; + +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { condvar_table::wait( - self as *const _ as *const _, - || self.load(Ordering::Acquire) == value, + self.addr(), + || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } - fn notify_all(&self) { - condvar_table::notify_all(self as *const _ as *const _); + fn notify_all(&self) -> usize { + condvar_table::notify_all(self.addr()) } - fn notify_one(&self) { - condvar_table::notify_one(self as *const _ as *const _); + fn notify_many(&self, count: usize) -> usize { + condvar_table::notify_many(self.addr(), count) } } -impl AtomicWaitImpl for AtomicU64 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { condvar_table::wait( - self as *const _ as *const _, - || self.load(Ordering::Acquire) == value, + self.addr(), + || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } - fn notify_all(&self) { - condvar_table::notify_all(self as *const _ as *const _); + fn notify_all(&self) -> usize { + condvar_table::notify_all(self.addr()) } - fn notify_one(&self) { - condvar_table::notify_one(self as *const _ as *const _); + fn notify_many(&self, count: usize) -> usize { + condvar_table::notify_many(self.addr(), count) } } diff --git a/src/freebsd.rs b/src/freebsd.rs index afa5924..84a5e8e 100644 --- a/src/freebsd.rs +++ b/src/freebsd.rs @@ -1,15 +1,18 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; +use core::time::Duration; -use crate::private::AtomicWaitImpl; +use ecmascript_atomics::Racy; -impl AtomicWaitImpl for AtomicU32 { +use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; + +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { + let result = unsafe { if let Some(time) = timeout { let wait_timespec = libc::_umtx_time { _clockid: libc::CLOCK_MONOTONIC as u32, @@ -21,54 +24,88 @@ impl AtomicWaitImpl for AtomicU32 { }; libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAIT_UINT_PRIVATE, value as u64, size_of::() as *mut _, &wait_timespec as *const _ as *mut _, - ); + ) } else { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAIT_UINT_PRIVATE, value as u64, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) } }; + if result >= 0 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::EAGAIN { + Err(FutexError::NotEqual) + } else if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else if errno == libc::EINTR { + // We consider spurious interrupts to still be valid + // wakeups. + Ok(()) + } else { + Err(FutexError::Unknown) + } + } } - fn notify_all(&self) { - unsafe { + fn notify_all(&self) -> usize { + let result = unsafe { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAKE_PRIVATE, i32::MAX as libc::c_ulong, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) }; + if result > 0 { + result as usize + } else if result == 0 { + usize::MAX + } else { + 0 + } } - fn notify_one(&self) { - unsafe { + fn notify_many(&self, count: usize) -> usize { + let result = unsafe { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAKE_PRIVATE, 1 as libc::c_ulong, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) }; + if result > 0 { + (result as usize).min(count) + } else if result == 0 { + count + } else { + 0 + } } } -impl AtomicWaitImpl for AtomicU64 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { + let result = unsafe { if let Some(time) = timeout { let wait_timespec = libc::_umtx_time { _clockid: libc::CLOCK_MONOTONIC as u32, @@ -80,45 +117,75 @@ impl AtomicWaitImpl for AtomicU64 { }; libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAIT, value, size_of::() as *mut _, &wait_timespec as *const _ as *mut _, - ); + ) } else { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAIT, value, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) } }; + if result >= 0 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::EAGAIN { + Err(FutexError::NotEqual) + } else if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else if errno == libc::EINTR { + // We consider spurious interrupts to still be valid + // wakeups. + Ok(()) + } else { + Err(FutexError::Unknown) + } + } } - fn notify_all(&self) { - unsafe { + fn notify_all(&self) -> usize { + let result = unsafe { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAKE_PRIVATE, i32::MAX as libc::c_ulong, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) }; + if result > 0 { + result as usize + } else if result == 0 { + usize::MAX + } else { + 0 + } } - fn notify_one(&self) { - unsafe { + fn notify_many(&self, count: usize) -> usize { + let result = unsafe { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAKE_PRIVATE, 1 as libc::c_ulong, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) }; + if result > 0 { + (result as usize).min(count) + } else if result == 0 { + count + } else { + 0 + } } } diff --git a/src/lib.rs b/src/lib.rs index 119df60..c900087 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,10 +4,9 @@ feature(stdarch_wasm_atomic_wait) )] -use std::{ - sync::atomic::{AtomicI32, AtomicI64, AtomicU32, AtomicU64}, - time::Duration, -}; +use core::time::Duration; + +use ecmascript_atomics::Racy; #[cfg(any(target_os = "linux", target_os = "android"))] #[path = "linux.rs"] @@ -44,17 +43,33 @@ mod platform; /// A table of OS synchronization primitives for manually /// implementing futex functionality on unsupported platforms. -#[allow(unused)] +#[cfg(not(any( + target_os = "freebsd", + target_os = "macos", + target_os = "ios", + target_os = "watchos", + windows +)))] mod condvar_table; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FutexError { + /// The value was not equal and no sleep was performed. + NotEqual, + /// Timeout fired. + Timeout, + /// An unknown error occurred. + Unknown, +} + /// A type that supports atomic waits. -pub trait AtomicWait: private::AtomicWaitImpl { +pub trait ECMAScriptAtomicWait: private::ECMAScriptAtomicWaitImpl { /// If the value is `value`, wait until woken up. /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait(&self, value: Self::AtomicInner) { - private::AtomicWaitImpl::wait_timeout(self, value, None); + fn wait(&self, value: Self::AtomicInner) -> Result<(), FutexError> { + private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, None) } /// If the value is `value`, wait until timeout elapses @@ -62,105 +77,49 @@ pub trait AtomicWait: private::AtomicWaitImpl { /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Duration) { - private::AtomicWaitImpl::wait_timeout(self, value, Some(timeout)); + fn wait_timeout(&self, value: Self::AtomicInner, timeout: Duration) -> Result<(), FutexError> { + private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, Some(timeout)) } /// Wake one thread that is waiting on this atomic. - fn notify_one(&self) { - private::AtomicWaitImpl::notify_one(self); + fn notify_many(&self, count: usize) -> usize { + private::ECMAScriptAtomicWaitImpl::notify_many(self, count) } /// Wake all threads that are waiting on this atomic. - fn notify_all(&self) { - private::AtomicWaitImpl::notify_all(self); + fn notify_all(&self) -> usize { + private::ECMAScriptAtomicWaitImpl::notify_all(self) } } -impl AtomicWait for AtomicU32 {} -impl AtomicWait for AtomicU64 {} -impl AtomicWait for AtomicI32 {} -impl AtomicWait for AtomicI64 {} - -impl private::AtomicWaitImpl for AtomicI32 { - type AtomicInner = i32; - - fn notify_all(&self) { - unsafe { - private::AtomicWaitImpl::notify_all(std::mem::transmute::<&AtomicI32, &AtomicU32>( - self, - )); - } - } - - fn notify_one(&self) { - unsafe { - private::AtomicWaitImpl::notify_one(std::mem::transmute::<&AtomicI32, &AtomicU32>( - self, - )); - } - } - - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { - private::AtomicWaitImpl::wait_timeout( - std::mem::transmute::<&AtomicI32, &AtomicU32>(self), - value as u32, - timeout, - ); - } - } -} - -impl private::AtomicWaitImpl for AtomicI64 { - type AtomicInner = i64; - - fn notify_all(&self) { - unsafe { - private::AtomicWaitImpl::notify_all(std::mem::transmute::<&AtomicI64, &AtomicU64>( - self, - )); - } - } - - fn notify_one(&self) { - unsafe { - private::AtomicWaitImpl::notify_one(std::mem::transmute::<&AtomicI64, &AtomicU64>( - self, - )); - } - } - - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { - private::AtomicWaitImpl::wait_timeout( - std::mem::transmute::<&AtomicI64, &AtomicU64>(self), - value as u64, - timeout, - ); - } - } -} +impl ECMAScriptAtomicWait for Racy<'_, u32> {} +impl ECMAScriptAtomicWait for Racy<'_, u64> {} /// Private implementation details. mod private { - use std::time::Duration; + use core::time::Duration; + + use crate::FutexError; /// A trait that cannot be implemented by other crates. - pub trait AtomicWaitImpl { + pub trait ECMAScriptAtomicWaitImpl { /// The underlying integer type for the atomic. type AtomicInner; /// Wake all threads that are waiting on this atomic. - fn notify_all(&self); + fn notify_all(&self) -> usize; /// Wake one thread that is waiting on this atomic. - fn notify_one(&self); + fn notify_many(&self, count: usize) -> usize; /// If the value is `value`, wait until woken up. /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option); + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError>; } } diff --git a/src/linux.rs b/src/linux.rs index 86e4476..cbc64a0 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -1,23 +1,26 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64, Ordering}, - time::Duration, -}; +use core::time::Duration; -use crate::{condvar_table, private::AtomicWaitImpl}; +use ecmascript_atomics::{Ordering, Racy}; -impl AtomicWaitImpl for AtomicU32 { +use crate::{FutexError, condvar_table, private::ECMAScriptAtomicWaitImpl}; + +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { let wait_timespec = timeout.map(|x| libc::timespec { tv_sec: x.as_secs() as i64, tv_nsec: x.subsec_nanos() as i64, }); - libc::syscall( + let result = libc::syscall( libc::SYS_futex, - self as *const _, + self.addr(), libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG, value, wait_timespec @@ -25,48 +28,70 @@ impl AtomicWaitImpl for AtomicU32 { .map(|x| x as *const _) .unwrap_or(std::ptr::null()), ); + if result == 0 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::EAGAIN { + Err(FutexError::NotEqual) + } else if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else if errno == libc::EINTR { + // We consider spurious interrupts to still be valid + // wakeups. + Ok(()) + } else { + Err(FutexError::Unknown) + } + } } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { libc::syscall( libc::SYS_futex, - self as *const _, + self.addr(), libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, i32::MAX, - ); - }; + ) + .unsigned_abs() as usize + } } - fn notify_one(&self) { + fn notify_many(&self, count: usize) -> usize { unsafe { libc::syscall( libc::SYS_futex, - self as *const _, + self.addr(), libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, - 1i32, - ); - }; + count.min(i32::MAX as usize) as i32, + ) + .unsigned_abs() as usize + } } } -impl AtomicWaitImpl for AtomicU64 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { condvar_table::wait( - self as *const _ as *const _, - || self.load(Ordering::Acquire) == value, + self.addr(), + || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } - fn notify_all(&self) { - condvar_table::notify_all(self as *const _ as *const _); + fn notify_all(&self) -> usize { + condvar_table::notify_all(self.addr()) } - fn notify_one(&self) { - condvar_table::notify_one(self as *const _ as *const _); + fn notify_many(&self, count: usize) -> usize { + condvar_table::notify_many(self.addr(), count) } } diff --git a/src/macos.rs b/src/macos.rs index b578649..d3ab007 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -1,98 +1,154 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; +use core::time::Duration; -use crate::private::AtomicWaitImpl; +use ecmascript_atomics::Racy; -impl AtomicWaitImpl for AtomicU32 { +use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; + +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { + let result = unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( - self as *const _ as *mut _, + self.addr() as *mut libc::c_void, value as u64, - size_of::(), + size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, - libc::CLOCK_MONOTONIC, - time.as_nanos().min(u64::MAX as u128) as u64, - ); + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, + (time.as_nanos().min(u64::MAX as u128) as u64).max(1), + ) } else { libc::os_sync_wait_on_address( - self as *const _ as *mut _, + self.addr() as *mut libc::c_void, value as u64, - size_of::(), + size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, - ); + ) + } + }; + if result >= 0 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) } } } - fn notify_all(&self) { - unsafe { + fn notify_all(&self) -> usize { + let result = unsafe { libc::os_sync_wake_by_address_all( - self as *const _ as *mut _, - size_of::(), + self.addr() as *mut libc::c_void, + size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ); + ) }; + if result == 0 { + // At least one thread was woken up + usize::MAX + } else { + // No threads were woken up. + 0 + } } - fn notify_one(&self) { - unsafe { - libc::os_sync_wake_by_address_any( - self as *const _ as *mut _, - size_of::(), - libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ); - }; + fn notify_many(&self, count: usize) -> usize { + for i in 0..count { + let result = unsafe { + libc::os_sync_wake_by_address_any( + self.addr() as *mut libc::c_void, + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ) + }; + if result != 0 { + // No threads were woken up. + return i; + } + } + count } } -impl AtomicWaitImpl for AtomicU64 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { + let result = unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( - self as *const _ as *mut _, + self.addr() as *mut libc::c_void, value, - size_of::(), + size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, - libc::CLOCK_MONOTONIC, - time.as_nanos().min(u64::MAX as u128) as u64, - ); + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, + (time.as_nanos().min(u64::MAX as u128) as u64).max(1), + ) } else { libc::os_sync_wait_on_address( - self as *const _ as *mut _, + self.addr() as *mut libc::c_void, value, - size_of::(), + size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, - ); + ) + } + }; + if result >= 0 { + // Result indicates how many waiters remain. + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) } } } - fn notify_all(&self) { - unsafe { + fn notify_all(&self) -> usize { + let result = unsafe { libc::os_sync_wake_by_address_all( - self as *const _ as *mut _, - size_of::(), + self.addr() as *mut libc::c_void, + size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ); + ) }; + if result == 0 { + // At least one thread was woken up + usize::MAX + } else { + // No threads were woken up. + 0 + } } - fn notify_one(&self) { - unsafe { - libc::os_sync_wake_by_address_any( - self as *const _ as *mut _, - size_of::(), - libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ); - }; + fn notify_many(&self, count: usize) -> usize { + for i in 0..count { + let result = unsafe { + libc::os_sync_wake_by_address_any( + self.addr() as *mut libc::c_void, + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ) + }; + if result != 0 { + // No threads were woken up. + return i; + } + } + count } } diff --git a/src/wasm32.rs b/src/wasm32.rs index c08325e..0a09c6d 100644 --- a/src/wasm32.rs +++ b/src/wasm32.rs @@ -1,10 +1,8 @@ -use std::{ - hint::spin_loop, - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; +use std::{hint::spin_loop, time::Duration}; -use crate::private::AtomicWaitImpl; +use ecmascript_atomics::{Ordering, Racy}; + +use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; /// Whether this thread is allowed to block and use synchronization primitives. #[inline(always)] @@ -17,117 +15,149 @@ fn can_block() -> bool { } #[cfg(not(nightly))] -impl AtomicWaitImpl for AtomicU32 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { if can_block() { crate::condvar_table::wait( - self as *const _ as *const _, - || self.load(std::sync::atomic::Ordering::Acquire) == value, + self.addr(), + || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } else { spin_loop(); + Ok(()) } } - fn notify_all(&self) { - crate::condvar_table::notify_all(self as *const _ as *const _); + fn notify_all(&self) -> usize { + crate::condvar_table::notify_all(self.addr()) } - fn notify_one(&self) { - crate::condvar_table::notify_one(self as *const _ as *const _); + fn notify_many(&self, count: usize) -> usize { + crate::condvar_table::notify_many(self.addr(), count) } } #[cfg(not(nightly))] -impl AtomicWaitImpl for AtomicU64 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { if can_block() { crate::condvar_table::wait( - self as *const _ as *const _, - || self.load(std::sync::atomic::Ordering::Acquire) == value, + self.addr(), + || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } else { spin_loop(); + Ok(()) } } - fn notify_all(&self) { - crate::condvar_table::notify_all(self as *const _ as *const _); + fn notify_all(&self) -> usize { + crate::condvar_table::notify_all(self.addr()) } - fn notify_one(&self) { - crate::condvar_table::notify_one(self as *const _ as *const _); + fn notify_many(&self, count: usize) -> usize { + crate::condvar_table::notify_many(self.addr(), count) } } #[cfg(nightly)] -impl AtomicWaitImpl for AtomicU32 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { if can_block() { - std::arch::wasm32::memory_atomic_wait32( - self as *const _ as *mut _, + let result = std::arch::wasm32::memory_atomic_wait32( + self.addr(), value as i32, timeout .map(|x| x.as_nanos().min(i64::MAX as u128) as i64) .unwrap_or(i64::MAX), ); + if result == 0 { + Ok(()) + } else if result == 1 { + Err(FutexError::NotEqual) + } else if result == 2 { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) + } } else { spin_loop(); + Ok(()) } } } - fn notify_all(&self) { - unsafe { - std::arch::wasm32::memory_atomic_notify(self as *const _ as *mut _, u32::MAX); - }; + fn notify_all(&self) -> usize { + unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX) as usize } } - fn notify_one(&self) { - unsafe { - std::arch::wasm32::memory_atomic_notify(self as *const _ as *mut _, 1); - }; + fn notify_many(&self, count: usize) -> usize { + let count = u32::try_from(count).unwrap_or(u32::MAX); + unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), count) as usize } } } #[cfg(nightly)] -impl AtomicWaitImpl for AtomicU64 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { if can_block() { - std::arch::wasm32::memory_atomic_wait64( - self as *const _ as *mut _, + let result = std::arch::wasm32::memory_atomic_wait64( + self.addr(), value as i64, timeout .map(|x| x.as_nanos().min(i64::MAX as u128) as i64) .unwrap_or(i64::MAX), ); + if result == 0 { + Ok(()) + } else if result == 1 { + Err(FutexError::NotEqual) + } else if result == 2 { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) + } } else { spin_loop(); + Ok(()) } } } - fn notify_all(&self) { - unsafe { - std::arch::wasm32::memory_atomic_notify(self as *const _ as *mut _, u32::MAX); - }; + fn notify_all(&self) -> usize { + unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX) as usize } } - fn notify_one(&self) { - unsafe { - std::arch::wasm32::memory_atomic_notify(self as *const _ as *mut _, 1); - }; + fn notify_many(&self, count: usize) -> usize { + let count = u32::try_from(count).unwrap_or(u32::MAX); + unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), count) as usize } } } diff --git a/src/windows.rs b/src/windows.rs index 352b54f..003d6a1 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -1,22 +1,26 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; -use windows_sys::Win32::System::Threading::{ - INFINITE, WaitOnAddress, WakeByAddressAll, WakeByAddressSingle, +use core::time::Duration; + +use ecmascript_atomics::Racy; +use windows_sys::Win32::{ + Foundation::ERROR_TIMEOUT, + System::Threading::{INFINITE, WaitOnAddress, WakeByAddressAll, WakeByAddressSingle}, }; -use crate::private::AtomicWaitImpl; +use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; -impl AtomicWaitImpl for AtomicU32 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { + let result = unsafe { WaitOnAddress( - self as *const _ as *const _, - &value as *const _ as *const _, - size_of::(), + self.addr() as *const core::ffi::c_void, + &value as *const u32 as *const core::ffi::c_void, + size_of::(), timeout .map(|x| { // Clamp to a finite u32 millisecond timeout. INFINITE (0xFFFFFFFF) @@ -26,40 +30,72 @@ impl AtomicWaitImpl for AtomicU32 { capped as u32 }) .unwrap_or(INFINITE), - ); + ) + }; + if result == 1 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == ERROR_TIMEOUT as i32 { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) + } } } - fn notify_all(&self) { - unsafe { WakeByAddressAll(self as *const _ as *const _) }; + fn notify_all(&self) -> usize { + unsafe { WakeByAddressAll(self.addr() as *const core::ffi::c_void) }; + usize::MAX } - fn notify_one(&self) { - unsafe { WakeByAddressSingle(self as *const _ as *const _) }; + fn notify_many(&self, count: usize) -> usize { + for _ in 0..count { + unsafe { WakeByAddressSingle(self.addr() as *const core::ffi::c_void) }; + } + count } } -impl AtomicWaitImpl for AtomicU64 { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { + let result = unsafe { WaitOnAddress( - self as *const _ as *const _, - &value as *const _ as *const _, - size_of::(), + self.addr() as *const core::ffi::c_void, + &value as *const u64 as *const core::ffi::c_void, + size_of::(), timeout .map(|x| x.as_millis().min(u32::MAX as u128 - 1) as u32) .unwrap_or(INFINITE), - ); + ) + }; + if result == 1 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == ERROR_TIMEOUT as i32 { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) + } } } - fn notify_all(&self) { - unsafe { WakeByAddressAll(self as *const _ as *const _) }; + fn notify_all(&self) -> usize { + unsafe { WakeByAddressAll(self.addr() as *const core::ffi::c_void) }; + usize::MAX } - fn notify_one(&self) { - unsafe { WakeByAddressSingle(self as *const _ as *const _) }; + fn notify_many(&self, count: usize) -> usize { + for _ in 0..count { + unsafe { WakeByAddressSingle(self.addr() as *const core::ffi::c_void) }; + } + count } } diff --git a/tests/test.rs b/tests/test.rs index a27a46d..36c97bc 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,107 +1,115 @@ +use ecmascript_atomics::{Ordering, RacyBox}; +use ecmascript_futex::{ECMAScriptAtomicWait, FutexError}; use std::{ - sync::atomic::{AtomicU32, Ordering::Relaxed}, thread::sleep, time::{Duration, Instant}, }; -use wait_on_address::AtomicWait; #[test] fn wake_nothing() { - let a = AtomicU32::new(0); - a.notify_one(); + let a = RacyBox::new(0u32).unwrap(); + let a = a.as_slice().get(0).unwrap(); + assert!(a.notify_many(1) <= 1); a.notify_all(); } #[test] fn wait_unexpected() { let t = Instant::now(); - let a = AtomicU32::new(0); - a.wait(1); + let a = RacyBox::new(0u32).unwrap(); + let a = a.as_slice().get(0).unwrap(); + // Note: Windows and iOS doesn't report early-exits. + #[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "watchos", windows)))] + assert_eq!(a.wait(1), Err(FutexError::NotEqual)); + #[cfg(any(target_os = "macos", target_os = "ios", target_os = "watchos", windows))] + assert_eq!(a.wait(1), Ok(())); assert!(t.elapsed().as_millis() < 100); } #[test] fn wait_wake() { let t = Instant::now(); - let a = AtomicU32::new(0); + let a = RacyBox::new(0u32).unwrap(); + let a = a.as_slice().get(0).unwrap(); std::thread::scope(|s| { s.spawn(|| { sleep(Duration::from_millis(100)); - a.store(1, Relaxed); - a.notify_one(); + a.store(1, Ordering::Unordered); + assert_eq!(a.notify_many(1), 1); }); - while a.load(Relaxed) == 0 { - a.wait(0); + while a.load(Ordering::Unordered) == 0 { + assert!(a.wait(0).is_ok()); } - assert_eq!(a.load(Relaxed), 1); + assert_eq!(a.load(Ordering::Unordered), 1); assert!((90..400).contains(&t.elapsed().as_millis())); }); } #[test] fn wait_timeout() { - let a = AtomicU32::new(0); - a.wait_timeout(0, Duration::from_millis(1)); + let a = RacyBox::new(0u32).unwrap(); + let a = a.as_slice().get(0).unwrap(); + // Note: it's possible for us to receive a spurious wake. + assert!(matches!( + a.wait_timeout(0, Duration::from_millis(1)), + Ok(()) | Err(FutexError::Timeout) + )); } #[test] fn stress_many_waiters_notify_all() { - use std::sync::Arc; - let a = Arc::new(AtomicU32::new(0)); - let woke = Arc::new(AtomicU32::new(0)); + let a = RacyBox::new(0u32).unwrap(); + let woke = RacyBox::new(0u32).unwrap(); let threads = 64; + let a = a.as_slice().get(0).unwrap(); + let woke = woke.as_slice().get(0).unwrap(); std::thread::scope(|s| { for _ in 0..threads { - let a = a.clone(); - let woke = woke.clone(); s.spawn(move || { - while a.load(Relaxed) == 0 { - a.wait(0); + while a.load(Ordering::Unordered) == 0 { + assert!(a.wait(0).is_ok()); } - woke.fetch_add(1, Relaxed); + woke.fetch_add(1); }); } // Give threads time to start waiting sleep(Duration::from_millis(50)); - a.store(1, Relaxed); - a.notify_all(); + a.store(1, Ordering::Unordered); + assert!(a.notify_all() >= threads as usize); }); - - assert_eq!(woke.load(Relaxed), threads); + assert_eq!(woke.load(Ordering::Unordered), threads); } #[test] fn stress_ping_pong_many_iters() { - use std::sync::Arc; - let state = Arc::new(AtomicU32::new(0)); + let state = RacyBox::new(0u32).unwrap(); let iters = 5_000u32; + let state = state.as_slice().get(0).unwrap(); std::thread::scope(|s| { - let state_c = state.clone(); s.spawn(move || { // Consumer: wait for 1, reset to 0, and notify producer. for _ in 0..iters { - while state_c.load(Relaxed) != 1 { + while state.load(Ordering::Unordered) != 1 { // Wait while the state is 0; use a short timeout to be resilient to spurious wakes. - state_c.wait_timeout(0, Duration::from_millis(10)); + let _ = state.wait_timeout(0, Duration::from_millis(10)).is_ok(); } - state_c.store(0, Relaxed); - state_c.notify_one(); + state.store(0, Ordering::Unordered); + assert!(state.notify_many(1) <= 1); } }); // Producer: set to 1, notify consumer, then wait until it resets to 0. for _ in 0..iters { - state.store(1, Relaxed); - state.notify_one(); - while state.load(Relaxed) != 0 { - state.wait_timeout(1, Duration::from_millis(10)); + state.store(1, Ordering::Unordered); + assert!(state.notify_many(1) <= 1); + while state.load(Ordering::Unordered) != 0 { + let _ = state.wait_timeout(1, Duration::from_millis(10)); } } }); - // Final state should be 0 after a complete ping-pong. - assert_eq!(state.load(Relaxed), 0); + assert_eq!(state.load(Ordering::Unordered), 0); }