From c4874062967dd1606c8e38e4439c1aefa2c021e9 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Mon, 17 Nov 2025 13:35:23 +0200 Subject: [PATCH 01/32] fork --- Cargo.toml | 12 ++++++------ README.md | 4 ++-- rust_toolchain.toml | 3 +++ tests/test.rs | 2 +- 4 files changed, 12 insertions(+), 9 deletions(-) create mode 100644 rust_toolchain.toml diff --git a/Cargo.toml b/Cargo.toml index 3dcc544..6cd67e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [package] -name = "wait_on_address" -description = "Cross-platform atomic wait and wake (aka futex) functionality." -repository = "https://github.com/DouglasDwyer/wait_on_address" -keywords = ["atomic", "futex"] +name = "ecmascript_futex" +description = "Cross-platform atomic wait and wake (aka futex) functionality using the ECMAScript Atomics memory model." +repository = "https://github.com/trynova/ecmascript_futex" +keywords = ["atomic", "futex", "ecmascript"] version = "0.1.1" -edition = "2024" +edition = "2021" license = "BSD-2-Clause" categories = ["concurrency", "os", "no-std"] @@ -20,4 +20,4 @@ wasm-bindgen = { version = "0.2.90", default-features = false } web-sys = { version = "0.3.24", default-features = false, features = [ "Window" ] } [build-dependencies] -rustversion = { version = "1.0.14", default-features = false } \ No newline at end of file +rustversion = { version = "1.0.14", default-features = false } diff --git a/README.md b/README.md index 2cbb637..ef04547 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Natively-supported platforms: ```rust use std::{sync::atomic::AtomicU64, time::Duration}; -use wait_on_address::AtomicWait; +use ecmascript_futex::AtomicWait; let a = AtomicU64::new(0); @@ -47,4 +47,4 @@ On macOS (and iOS and watchOS), this uses the `os_sync_wait_on_address` and `os_ On wasm32 with `nightly`, this uses `memory_atomic_wait32`, `memory_atomic_wait64`, and `memory_atomic_notify` instructions. -All other platforms with `std` support fall back to a fixed-size hashmap of `Condvar`s, similar to `libstdc++`'s implementation for `std::atomic`. \ No newline at end of file +All other platforms with `std` support fall back to a fixed-size hashmap of `Condvar`s, similar to `libstdc++`'s implementation for `std::atomic`. diff --git a/rust_toolchain.toml b/rust_toolchain.toml new file mode 100644 index 0000000..73cb934 --- /dev/null +++ b/rust_toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "stable" +components = ["rustfmt", "clippy"] diff --git a/tests/test.rs b/tests/test.rs index a27a46d..66780e4 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,9 +1,9 @@ +use ecmascript_futex::AtomicWait; use std::{ sync::atomic::{AtomicU32, Ordering::Relaxed}, thread::sleep, time::{Duration, Instant}, }; -use wait_on_address::AtomicWait; #[test] fn wake_nothing() { From 1d088add5c89cdc38051f2f7fd709794b8858b5b Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Tue, 18 Nov 2025 09:13:21 +0200 Subject: [PATCH 02/32] feat: ecmascript_atomics Futex --- .github/workflows/rust.yml | 9 ++- Cargo.toml | 5 +- README.md | 37 ++++++--- src/fallback.rs | 27 +++---- src/freebsd.rs | 27 +++---- src/lib.rs | 103 ++++++------------------ src/linux.rs | 35 ++++---- src/macos.rs | 27 +++---- src/wasm32.rs | 40 +++++----- src/windows.rs | 23 +++--- tests/test.rs | 159 ++++++++++++++++++++++--------------- 11 files changed, 238 insertions(+), 254 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ffa32e6..c8e4800 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -6,7 +6,13 @@ jobs: build-and-test: strategy: matrix: - os: [ubuntu-latest, ubuntu-22.04-arm, windows-latest, windows-11-arm, macos-latest] + os: [ + ubuntu-latest, + ubuntu-22.04-arm, + windows-latest, + windows-11-arm, + macos-latest, + ] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 @@ -26,4 +32,3 @@ jobs: run: cargo build --verbose - name: Run tests run: cargo nextest run --verbose - diff --git a/Cargo.toml b/Cargo.toml index 6cd67e4..18dae9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,13 @@ description = "Cross-platform atomic wait and wake (aka futex) functionality usi repository = "https://github.com/trynova/ecmascript_futex" keywords = ["atomic", "futex", "ecmascript"] version = "0.1.1" -edition = "2021" +edition = "2024" license = "BSD-2-Clause" categories = ["concurrency", "os", "no-std"] +[dependencies] +ecmascript_atomics = { version = "0.2.2", path = "../ecmascript_atomics/ecmascript_atomics" } + [target.'cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd", target_os = "macos"))'.dependencies] libc = { version = "0.2", default-features = false } diff --git a/README.md b/README.md index ef04547..3b82a31 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,20 @@ -# wait_on_address +# ecmascript_futex -[![Crates.io](https://img.shields.io/crates/v/wait_on_address.svg)](https://crates.io/crates/wait_on_address) -[![Docs.rs](https://docs.rs/wait_on_address/badge.svg)](https://docs.rs/wait_on_address) +Cross platform library for implementing ECMAScript `Atomics.wait`, +`Atomics.wakeAsync`, and `Atomics.notify` (aka futex) functionality in Rust, +operating on ECMAScript memory as produced by the +[`ecmascript_atomics`](https://github.com/trynova/ecmascript_atomics) crate. +This crate is a fork of +[`wait_on_address`](https://github.com/DouglasDwyer/wait_on_address) which is +itself a fork of [`atomic-wait`](https://github.com/m-ou-se/atomic-wait). The +changes inherited and kept from `wait_on_address` are: -Cross platform atomic wait and wake (aka futex) functionality. This crate is a fork of [`atomic-wait`](https://github.com/m-ou-se/atomic-wait), and extends the original code with the following functionality: - -- Support for `AtomicI32`, `AtomicI64`, and `AtomicU64` - Support for waiting with a timeout - Support for `wasm32` on nightly using `std::arch` - Polyfill for all other platforms +The main + Natively-supported platforms: - Windows 8+, Windows Server 2012+ @@ -21,10 +26,12 @@ Natively-supported platforms: ## Usage ```rust -use std::{sync::atomic::AtomicU64, time::Duration}; -use ecmascript_futex::AtomicWait; +use core::time::Duration; +use ecmascript_atomics::{Racy, RacyMemory}; +use ecmascript_futex::ECMAScriptAtomicWait; -let a = AtomicU64::new(0); +let a_owned = RacyMemory::new(0u64); +let a = a_owned.as_slice().get(0).unwrap(); a.wait(1); // If the value is 1, wait. @@ -33,6 +40,9 @@ a.wait_timeout(2, Duration::from_millis(100)); // If the value is 2, wait at mo a.notify_one(); // Wake one waiting thread. a.notify_all(); // Wake all waiting threads. + +// SAFETY: a_owned is the only referrer +unsafe { a_owned.exit_and_drop() }; ``` ## Implementation @@ -43,8 +53,11 @@ On FreeBSD, this uses the `_umtx_op` syscall. On Windows, this uses the `WaitOnAddress` and `WakeByAddress` APIs. -On macOS (and iOS and watchOS), this uses the `os_sync_wait_on_address` and `os_sync_wake_by_address` APIs. +On macOS (and iOS and watchOS), this uses the `os_sync_wait_on_address` and +`os_sync_wake_by_address` APIs. -On wasm32 with `nightly`, this uses `memory_atomic_wait32`, `memory_atomic_wait64`, and `memory_atomic_notify` instructions. +On wasm32 with `nightly`, this uses `memory_atomic_wait32`, +`memory_atomic_wait64`, and `memory_atomic_notify` instructions. -All other platforms with `std` support fall back to a fixed-size hashmap of `Condvar`s, similar to `libstdc++`'s implementation for `std::atomic`. +All other platforms with `std` support fall back to a fixed-size hashmap of +`Condvar`s, similar to `libstdc++`'s implementation for `std::atomic`. diff --git a/src/fallback.rs b/src/fallback.rs index d1dbf3c..4136c50 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -1,46 +1,45 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64, Ordering}, - time::Duration, -}; +use core::time::Duration; + +use ecmascript_atomics::{Ordering, Racy}; use crate::{condvar_table, private::AtomicWaitImpl}; -impl AtomicWaitImpl for AtomicU32 { +impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { condvar_table::wait( - self as *const _ as *const _, - || self.load(Ordering::Acquire) == value, + self.addr(), + || self.load(Ordering::SeqCst) == value, timeout, ); } fn notify_all(&self) { - condvar_table::notify_all(self as *const _ as *const _); + condvar_table::notify_all(self.addr()); } fn notify_one(&self) { - condvar_table::notify_one(self as *const _ as *const _); + condvar_table::notify_one(self.addr()); } } -impl AtomicWaitImpl for AtomicU64 { +impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { condvar_table::wait( - self as *const _ as *const _, - || self.load(Ordering::Acquire) == value, + self.addr(), + || self.load(Ordering::SeqCst) == value, timeout, ); } fn notify_all(&self) { - condvar_table::notify_all(self as *const _ as *const _); + condvar_table::notify_all(self.addr()); } fn notify_one(&self) { - condvar_table::notify_one(self as *const _ as *const _); + condvar_table::notify_one(self.addr()); } } diff --git a/src/freebsd.rs b/src/freebsd.rs index afa5924..4d29584 100644 --- a/src/freebsd.rs +++ b/src/freebsd.rs @@ -1,11 +1,10 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; +use core::time::Duration; + +use ecmascript_atomics::{Ordering, Racy}; use crate::private::AtomicWaitImpl; -impl AtomicWaitImpl for AtomicU32 { +impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { @@ -21,7 +20,7 @@ impl AtomicWaitImpl for AtomicU32 { }; libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAIT_UINT_PRIVATE, value as u64, size_of::() as *mut _, @@ -29,7 +28,7 @@ impl AtomicWaitImpl for AtomicU32 { ); } else { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAIT_UINT_PRIVATE, value as u64, std::ptr::null_mut(), @@ -42,7 +41,7 @@ impl AtomicWaitImpl for AtomicU32 { fn notify_all(&self) { unsafe { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAKE_PRIVATE, i32::MAX as libc::c_ulong, std::ptr::null_mut(), @@ -54,7 +53,7 @@ impl AtomicWaitImpl for AtomicU32 { fn notify_one(&self) { unsafe { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAKE_PRIVATE, 1 as libc::c_ulong, std::ptr::null_mut(), @@ -64,7 +63,7 @@ impl AtomicWaitImpl for AtomicU32 { } } -impl AtomicWaitImpl for AtomicU64 { +impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { @@ -80,7 +79,7 @@ impl AtomicWaitImpl for AtomicU64 { }; libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAIT, value, size_of::() as *mut _, @@ -88,7 +87,7 @@ impl AtomicWaitImpl for AtomicU64 { ); } else { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAIT, value, std::ptr::null_mut(), @@ -101,7 +100,7 @@ impl AtomicWaitImpl for AtomicU64 { fn notify_all(&self) { unsafe { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAKE_PRIVATE, i32::MAX as libc::c_ulong, std::ptr::null_mut(), @@ -113,7 +112,7 @@ impl AtomicWaitImpl for AtomicU64 { fn notify_one(&self) { unsafe { libc::_umtx_op( - self as *const _ as *mut _, + self.addr(), libc::UMTX_OP_WAKE_PRIVATE, 1 as libc::c_ulong, std::ptr::null_mut(), diff --git a/src/lib.rs b/src/lib.rs index 119df60..6aacf35 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,10 +4,9 @@ feature(stdarch_wasm_atomic_wait) )] -use std::{ - sync::atomic::{AtomicI32, AtomicI64, AtomicU32, AtomicU64}, - time::Duration, -}; +use core::time::Duration; + +use ecmascript_atomics::Racy; #[cfg(any(target_os = "linux", target_os = "android"))] #[path = "linux.rs"] @@ -44,17 +43,23 @@ mod platform; /// A table of OS synchronization primitives for manually /// implementing futex functionality on unsupported platforms. -#[allow(unused)] +#[cfg(not(any( + target_os = "freebsd", + target_os = "macos", + target_os = "ios", + target_os = "watchos", + windows +)))] mod condvar_table; /// A type that supports atomic waits. -pub trait AtomicWait: private::AtomicWaitImpl { +pub trait ECMAScriptAtomicWait: private::ECMAScriptAtomicWaitImpl { /// If the value is `value`, wait until woken up. /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait(&self, value: Self::AtomicInner) { - private::AtomicWaitImpl::wait_timeout(self, value, None); + fn wait(&self, value: Self::ECMAScriptAtomicInner) { + private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, None); } /// If the value is `value`, wait until timeout elapses @@ -62,94 +67,32 @@ pub trait AtomicWait: private::AtomicWaitImpl { /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Duration) { - private::AtomicWaitImpl::wait_timeout(self, value, Some(timeout)); + fn wait_timeout(&self, value: Self::ECMAScriptAtomicInner, timeout: Duration) { + private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, Some(timeout)); } /// Wake one thread that is waiting on this atomic. fn notify_one(&self) { - private::AtomicWaitImpl::notify_one(self); + private::ECMAScriptAtomicWaitImpl::notify_one(self); } /// Wake all threads that are waiting on this atomic. fn notify_all(&self) { - private::AtomicWaitImpl::notify_all(self); - } -} - -impl AtomicWait for AtomicU32 {} -impl AtomicWait for AtomicU64 {} -impl AtomicWait for AtomicI32 {} -impl AtomicWait for AtomicI64 {} - -impl private::AtomicWaitImpl for AtomicI32 { - type AtomicInner = i32; - - fn notify_all(&self) { - unsafe { - private::AtomicWaitImpl::notify_all(std::mem::transmute::<&AtomicI32, &AtomicU32>( - self, - )); - } - } - - fn notify_one(&self) { - unsafe { - private::AtomicWaitImpl::notify_one(std::mem::transmute::<&AtomicI32, &AtomicU32>( - self, - )); - } - } - - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { - private::AtomicWaitImpl::wait_timeout( - std::mem::transmute::<&AtomicI32, &AtomicU32>(self), - value as u32, - timeout, - ); - } + private::ECMAScriptAtomicWaitImpl::notify_all(self); } } -impl private::AtomicWaitImpl for AtomicI64 { - type AtomicInner = i64; - - fn notify_all(&self) { - unsafe { - private::AtomicWaitImpl::notify_all(std::mem::transmute::<&AtomicI64, &AtomicU64>( - self, - )); - } - } - - fn notify_one(&self) { - unsafe { - private::AtomicWaitImpl::notify_one(std::mem::transmute::<&AtomicI64, &AtomicU64>( - self, - )); - } - } - - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { - unsafe { - private::AtomicWaitImpl::wait_timeout( - std::mem::transmute::<&AtomicI64, &AtomicU64>(self), - value as u64, - timeout, - ); - } - } -} +impl ECMAScriptAtomicWait for Racy<'_, u32> {} +impl ECMAScriptAtomicWait for Racy<'_, u64> {} /// Private implementation details. mod private { - use std::time::Duration; + use core::time::Duration; /// A trait that cannot be implemented by other crates. - pub trait AtomicWaitImpl { + pub trait ECMAScriptAtomicWaitImpl { /// The underlying integer type for the atomic. - type AtomicInner; + type ECMAScriptAtomicInner; /// Wake all threads that are waiting on this atomic. fn notify_all(&self); @@ -161,6 +104,6 @@ mod private { /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option); + fn wait_timeout(&self, value: Self::ECMAScriptAtomicInner, timeout: Option); } } diff --git a/src/linux.rs b/src/linux.rs index 86e4476..e0d80f7 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -1,14 +1,13 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64, Ordering}, - time::Duration, -}; +use core::time::Duration; -use crate::{condvar_table, private::AtomicWaitImpl}; +use ecmascript_atomics::{Ordering, Racy}; -impl AtomicWaitImpl for AtomicU32 { - type AtomicInner = u32; +use crate::{condvar_table, private::ECMAScriptAtomicWaitImpl}; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { + type ECMAScriptAtomicInner = u32; + + fn wait_timeout(&self, value: Self::ECMAScriptAtomicInner, timeout: Option) { unsafe { let wait_timespec = timeout.map(|x| libc::timespec { tv_sec: x.as_secs() as i64, @@ -17,7 +16,7 @@ impl AtomicWaitImpl for AtomicU32 { libc::syscall( libc::SYS_futex, - self as *const _, + self.addr(), libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG, value, wait_timespec @@ -32,7 +31,7 @@ impl AtomicWaitImpl for AtomicU32 { unsafe { libc::syscall( libc::SYS_futex, - self as *const _, + self.addr(), libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, i32::MAX, ); @@ -43,7 +42,7 @@ impl AtomicWaitImpl for AtomicU32 { unsafe { libc::syscall( libc::SYS_futex, - self as *const _, + self.addr(), libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, 1i32, ); @@ -51,22 +50,22 @@ impl AtomicWaitImpl for AtomicU32 { } } -impl AtomicWaitImpl for AtomicU64 { - type AtomicInner = u64; +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { + type ECMAScriptAtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout(&self, value: Self::ECMAScriptAtomicInner, timeout: Option) { condvar_table::wait( - self as *const _ as *const _, - || self.load(Ordering::Acquire) == value, + self.addr(), + || self.load(Ordering::SeqCst) == value, timeout, ); } fn notify_all(&self) { - condvar_table::notify_all(self as *const _ as *const _); + condvar_table::notify_all(self.addr()); } fn notify_one(&self) { - condvar_table::notify_one(self as *const _ as *const _); + condvar_table::notify_one(self.addr()); } } diff --git a/src/macos.rs b/src/macos.rs index b578649..71be7f3 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -1,18 +1,17 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; +use core::time::Duration; + +use ecmascript_atomics::{Ordering, Racy}; use crate::private::AtomicWaitImpl; -impl AtomicWaitImpl for AtomicU32 { +impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( - self as *const _ as *mut _, + self.addr(), value as u64, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -21,7 +20,7 @@ impl AtomicWaitImpl for AtomicU32 { ); } else { libc::os_sync_wait_on_address( - self as *const _ as *mut _, + self.addr(), value as u64, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -33,7 +32,7 @@ impl AtomicWaitImpl for AtomicU32 { fn notify_all(&self) { unsafe { libc::os_sync_wake_by_address_all( - self as *const _ as *mut _, + self.addr(), size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ); @@ -43,7 +42,7 @@ impl AtomicWaitImpl for AtomicU32 { fn notify_one(&self) { unsafe { libc::os_sync_wake_by_address_any( - self as *const _ as *mut _, + self.addr(), size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ); @@ -51,14 +50,14 @@ impl AtomicWaitImpl for AtomicU32 { } } -impl AtomicWaitImpl for AtomicU64 { +impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( - self as *const _ as *mut _, + self.addr(), value, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -67,7 +66,7 @@ impl AtomicWaitImpl for AtomicU64 { ); } else { libc::os_sync_wait_on_address( - self as *const _ as *mut _, + self.addr(), value, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -79,7 +78,7 @@ impl AtomicWaitImpl for AtomicU64 { fn notify_all(&self) { unsafe { libc::os_sync_wake_by_address_all( - self as *const _ as *mut _, + self.addr(), size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ); @@ -89,7 +88,7 @@ impl AtomicWaitImpl for AtomicU64 { fn notify_one(&self) { unsafe { libc::os_sync_wake_by_address_any( - self as *const _ as *mut _, + self.addr(), size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ); diff --git a/src/wasm32.rs b/src/wasm32.rs index c08325e..3096cbc 100644 --- a/src/wasm32.rs +++ b/src/wasm32.rs @@ -1,8 +1,6 @@ -use std::{ - hint::spin_loop, - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; +use std::{hint::spin_loop, time::Duration}; + +use ecmascript_atomics::{Ordering, Racy}; use crate::private::AtomicWaitImpl; @@ -17,13 +15,13 @@ fn can_block() -> bool { } #[cfg(not(nightly))] -impl AtomicWaitImpl for AtomicU32 { +impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { if can_block() { crate::condvar_table::wait( - self as *const _ as *const _, + self.addr(), || self.load(std::sync::atomic::Ordering::Acquire) == value, timeout, ); @@ -33,22 +31,22 @@ impl AtomicWaitImpl for AtomicU32 { } fn notify_all(&self) { - crate::condvar_table::notify_all(self as *const _ as *const _); + crate::condvar_table::notify_all(self.addr()); } fn notify_one(&self) { - crate::condvar_table::notify_one(self as *const _ as *const _); + crate::condvar_table::notify_one(self.addr()); } } #[cfg(not(nightly))] -impl AtomicWaitImpl for AtomicU64 { +impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { if can_block() { crate::condvar_table::wait( - self as *const _ as *const _, + self.addr(), || self.load(std::sync::atomic::Ordering::Acquire) == value, timeout, ); @@ -58,23 +56,23 @@ impl AtomicWaitImpl for AtomicU64 { } fn notify_all(&self) { - crate::condvar_table::notify_all(self as *const _ as *const _); + crate::condvar_table::notify_all(self.addr()); } fn notify_one(&self) { - crate::condvar_table::notify_one(self as *const _ as *const _); + crate::condvar_table::notify_one(self.addr()); } } #[cfg(nightly)] -impl AtomicWaitImpl for AtomicU32 { +impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { unsafe { if can_block() { std::arch::wasm32::memory_atomic_wait32( - self as *const _ as *mut _, + self.addr(), value as i32, timeout .map(|x| x.as_nanos().min(i64::MAX as u128) as i64) @@ -88,26 +86,26 @@ impl AtomicWaitImpl for AtomicU32 { fn notify_all(&self) { unsafe { - std::arch::wasm32::memory_atomic_notify(self as *const _ as *mut _, u32::MAX); + std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX); }; } fn notify_one(&self) { unsafe { - std::arch::wasm32::memory_atomic_notify(self as *const _ as *mut _, 1); + std::arch::wasm32::memory_atomic_notify(self.addr(), 1); }; } } #[cfg(nightly)] -impl AtomicWaitImpl for AtomicU64 { +impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { unsafe { if can_block() { std::arch::wasm32::memory_atomic_wait64( - self as *const _ as *mut _, + self.addr(), value as i64, timeout .map(|x| x.as_nanos().min(i64::MAX as u128) as i64) @@ -121,13 +119,13 @@ impl AtomicWaitImpl for AtomicU64 { fn notify_all(&self) { unsafe { - std::arch::wasm32::memory_atomic_notify(self as *const _ as *mut _, u32::MAX); + std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX); }; } fn notify_one(&self) { unsafe { - std::arch::wasm32::memory_atomic_notify(self as *const _ as *mut _, 1); + std::arch::wasm32::memory_atomic_notify(self.addr(), 1); }; } } diff --git a/src/windows.rs b/src/windows.rs index 352b54f..f355e70 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -1,20 +1,19 @@ -use std::{ - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; +use core::time::Duration; + +use ecmascript_atomics::{Ordering, Racy}; use windows_sys::Win32::System::Threading::{ INFINITE, WaitOnAddress, WakeByAddressAll, WakeByAddressSingle, }; use crate::private::AtomicWaitImpl; -impl AtomicWaitImpl for AtomicU32 { +impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { unsafe { WaitOnAddress( - self as *const _ as *const _, + self.addr(), &value as *const _ as *const _, size_of::(), timeout @@ -31,21 +30,21 @@ impl AtomicWaitImpl for AtomicU32 { } fn notify_all(&self) { - unsafe { WakeByAddressAll(self as *const _ as *const _) }; + unsafe { WakeByAddressAll(self.addr()) }; } fn notify_one(&self) { - unsafe { WakeByAddressSingle(self as *const _ as *const _) }; + unsafe { WakeByAddressSingle(self.addr()) }; } } -impl AtomicWaitImpl for AtomicU64 { +impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { unsafe { WaitOnAddress( - self as *const _ as *const _, + self.addr(), &value as *const _ as *const _, size_of::(), timeout @@ -56,10 +55,10 @@ impl AtomicWaitImpl for AtomicU64 { } fn notify_all(&self) { - unsafe { WakeByAddressAll(self as *const _ as *const _) }; + unsafe { WakeByAddressAll(self.addr()) }; } fn notify_one(&self) { - unsafe { WakeByAddressSingle(self as *const _ as *const _) }; + unsafe { WakeByAddressSingle(self.addr()) }; } } diff --git a/tests/test.rs b/tests/test.rs index 66780e4..ca633a8 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,107 +1,134 @@ -use ecmascript_futex::AtomicWait; +use ecmascript_atomics::{Ordering, RacyMemory}; +use ecmascript_futex::ECMAScriptAtomicWait; use std::{ - sync::atomic::{AtomicU32, Ordering::Relaxed}, thread::sleep, time::{Duration, Instant}, }; #[test] fn wake_nothing() { - let a = AtomicU32::new(0); - a.notify_one(); - a.notify_all(); + let a = RacyMemory::new(0u32); + { + let a = a.as_slice().get(0).unwrap(); + a.notify_one(); + a.notify_all(); + } + // SAFETY: we're the only referrer and block was created using `new`. + unsafe { a.exit_and_drop() }; } #[test] fn wait_unexpected() { let t = Instant::now(); - let a = AtomicU32::new(0); - a.wait(1); + let a = RacyMemory::new(0u32); + { + let a = a.as_slice().get(0).unwrap(); + a.wait(1); + } assert!(t.elapsed().as_millis() < 100); + // SAFETY: we're the only referrer and block was created using `new`. + unsafe { a.exit_and_drop() }; } #[test] fn wait_wake() { let t = Instant::now(); - let a = AtomicU32::new(0); - std::thread::scope(|s| { - s.spawn(|| { - sleep(Duration::from_millis(100)); - a.store(1, Relaxed); - a.notify_one(); + let a = RacyMemory::new(0u32); + { + let a = a.as_slice().get(0).unwrap(); + std::thread::scope(|s| { + s.spawn(|| { + sleep(Duration::from_millis(100)); + a.store(1, Ordering::Unordered); + a.notify_one(); + }); + while a.load(Ordering::Unordered) == 0 { + a.wait(0); + } + assert_eq!(a.load(Ordering::Unordered), 1); + assert!((90..400).contains(&t.elapsed().as_millis())); }); - while a.load(Relaxed) == 0 { - a.wait(0); - } - assert_eq!(a.load(Relaxed), 1); - assert!((90..400).contains(&t.elapsed().as_millis())); - }); + } + // SAFETY: we're the only referrer and block was created using `new`. + unsafe { a.exit_and_drop() }; } #[test] fn wait_timeout() { - let a = AtomicU32::new(0); - a.wait_timeout(0, Duration::from_millis(1)); + let a = RacyMemory::new(0u32); + { + let a = a.as_slice().get(0).unwrap(); + a.wait_timeout(0, Duration::from_millis(1)); + } + // SAFETY: we're the only referrer and block was created using `new`. + unsafe { a.exit_and_drop() }; } #[test] fn stress_many_waiters_notify_all() { - use std::sync::Arc; - let a = Arc::new(AtomicU32::new(0)); - let woke = Arc::new(AtomicU32::new(0)); + let a = RacyMemory::new(0u32); + let woke = RacyMemory::new(0u32); let threads = 64; - std::thread::scope(|s| { - for _ in 0..threads { - let a = a.clone(); - let woke = woke.clone(); - s.spawn(move || { - while a.load(Relaxed) == 0 { - a.wait(0); - } - woke.fetch_add(1, Relaxed); - }); - } - - // Give threads time to start waiting - sleep(Duration::from_millis(50)); - a.store(1, Relaxed); - a.notify_all(); - }); + { + let a = a.as_slice().get(0).unwrap(); + let woke = woke.as_slice().get(0).unwrap(); + std::thread::scope(|s| { + for _ in 0..threads { + s.spawn(move || { + while a.load(Ordering::Unordered) == 0 { + a.wait(0); + } + woke.fetch_add(1); + }); + } - assert_eq!(woke.load(Relaxed), threads); + // Give threads time to start waiting + sleep(Duration::from_millis(50)); + a.store(1, Ordering::Unordered); + a.notify_all(); + }); + assert_eq!(woke.load(Ordering::Unordered), threads); + } + // SAFETY: we're the only referrer and block was created using `new`. + unsafe { + a.exit_and_drop(); + woke.exit_and_drop(); + } } #[test] fn stress_ping_pong_many_iters() { - use std::sync::Arc; - let state = Arc::new(AtomicU32::new(0)); + let state = RacyMemory::new(0u32); let iters = 5_000u32; - std::thread::scope(|s| { - let state_c = state.clone(); - s.spawn(move || { - // Consumer: wait for 1, reset to 0, and notify producer. + { + let state = state.as_slice().get(0).unwrap(); + std::thread::scope(|s| { + s.spawn(move || { + // Consumer: wait for 1, reset to 0, and notify producer. + for _ in 0..iters { + while state.load(Ordering::Unordered) != 1 { + // Wait while the state is 0; use a short timeout to be resilient to spurious wakes. + state.wait_timeout(0, Duration::from_millis(10)); + } + state.store(0, Ordering::Unordered); + state.notify_one(); + } + }); + + // Producer: set to 1, notify consumer, then wait until it resets to 0. for _ in 0..iters { - while state_c.load(Relaxed) != 1 { - // Wait while the state is 0; use a short timeout to be resilient to spurious wakes. - state_c.wait_timeout(0, Duration::from_millis(10)); + state.store(1, Ordering::Unordered); + state.notify_one(); + while state.load(Ordering::Unordered) != 0 { + state.wait_timeout(1, Duration::from_millis(10)); } - state_c.store(0, Relaxed); - state_c.notify_one(); } }); - - // Producer: set to 1, notify consumer, then wait until it resets to 0. - for _ in 0..iters { - state.store(1, Relaxed); - state.notify_one(); - while state.load(Relaxed) != 0 { - state.wait_timeout(1, Duration::from_millis(10)); - } - } - }); - - // Final state should be 0 after a complete ping-pong. - assert_eq!(state.load(Relaxed), 0); + // Final state should be 0 after a complete ping-pong. + assert_eq!(state.load(Ordering::Unordered), 0); + } + // SAFETY: we're the only referrer and block was created using `new`. + unsafe { state.exit_and_drop() }; } From 2b21c511daaf97dfcd8e70bb75637b7452dbbe7f Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Tue, 18 Nov 2025 10:29:08 +0200 Subject: [PATCH 03/32] fix: use RacyBox --- README.md | 9 +-- tests/test.rs | 157 +++++++++++++++++++++----------------------------- 2 files changed, 68 insertions(+), 98 deletions(-) diff --git a/README.md b/README.md index 3b82a31..3640118 100644 --- a/README.md +++ b/README.md @@ -27,11 +27,11 @@ Natively-supported platforms: ```rust use core::time::Duration; -use ecmascript_atomics::{Racy, RacyMemory}; +use ecmascript_atomics::{Racy, RacyBox}; use ecmascript_futex::ECMAScriptAtomicWait; -let a_owned = RacyMemory::new(0u64); -let a = a_owned.as_slice().get(0).unwrap(); +let a = RacyBox::new(0u64).unwrap(); +let a = a.as_slice().get(0).unwrap(); a.wait(1); // If the value is 1, wait. @@ -40,9 +40,6 @@ a.wait_timeout(2, Duration::from_millis(100)); // If the value is 2, wait at mo a.notify_one(); // Wake one waiting thread. a.notify_all(); // Wake all waiting threads. - -// SAFETY: a_owned is the only referrer -unsafe { a_owned.exit_and_drop() }; ``` ## Implementation diff --git a/tests/test.rs b/tests/test.rs index ca633a8..96eaeb7 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,4 +1,4 @@ -use ecmascript_atomics::{Ordering, RacyMemory}; +use ecmascript_atomics::{Ordering, RacyBox}; use ecmascript_futex::ECMAScriptAtomicWait; use std::{ thread::sleep, @@ -7,128 +7,101 @@ use std::{ #[test] fn wake_nothing() { - let a = RacyMemory::new(0u32); - { - let a = a.as_slice().get(0).unwrap(); - a.notify_one(); - a.notify_all(); - } - // SAFETY: we're the only referrer and block was created using `new`. - unsafe { a.exit_and_drop() }; + let a = RacyBox::new(0u32).unwrap(); + let a = a.as_slice().get(0).unwrap(); + a.notify_one(); + a.notify_all(); } #[test] fn wait_unexpected() { let t = Instant::now(); - let a = RacyMemory::new(0u32); - { - let a = a.as_slice().get(0).unwrap(); - a.wait(1); - } + let a = RacyBox::new(0u32).unwrap(); + let a = a.as_slice().get(0).unwrap(); + a.wait(1); assert!(t.elapsed().as_millis() < 100); - // SAFETY: we're the only referrer and block was created using `new`. - unsafe { a.exit_and_drop() }; } #[test] fn wait_wake() { let t = Instant::now(); - let a = RacyMemory::new(0u32); - { - let a = a.as_slice().get(0).unwrap(); - std::thread::scope(|s| { - s.spawn(|| { - sleep(Duration::from_millis(100)); - a.store(1, Ordering::Unordered); - a.notify_one(); - }); - while a.load(Ordering::Unordered) == 0 { - a.wait(0); - } - assert_eq!(a.load(Ordering::Unordered), 1); - assert!((90..400).contains(&t.elapsed().as_millis())); + let a = RacyBox::new(0u32).unwrap(); + let a = a.as_slice().get(0).unwrap(); + std::thread::scope(|s| { + s.spawn(|| { + sleep(Duration::from_millis(100)); + a.store(1, Ordering::Unordered); + a.notify_one(); }); - } - // SAFETY: we're the only referrer and block was created using `new`. - unsafe { a.exit_and_drop() }; + while a.load(Ordering::Unordered) == 0 { + a.wait(0); + } + assert_eq!(a.load(Ordering::Unordered), 1); + assert!((90..400).contains(&t.elapsed().as_millis())); + }); } #[test] fn wait_timeout() { - let a = RacyMemory::new(0u32); - { - let a = a.as_slice().get(0).unwrap(); - a.wait_timeout(0, Duration::from_millis(1)); - } - // SAFETY: we're the only referrer and block was created using `new`. - unsafe { a.exit_and_drop() }; + let a = RacyBox::new(0u32).unwrap(); + let a = a.as_slice().get(0).unwrap(); + a.wait_timeout(0, Duration::from_millis(1)); } #[test] fn stress_many_waiters_notify_all() { - let a = RacyMemory::new(0u32); - let woke = RacyMemory::new(0u32); + let a = RacyBox::new(0u32).unwrap(); + let woke = RacyBox::new(0u32).unwrap(); let threads = 64; - { - let a = a.as_slice().get(0).unwrap(); - let woke = woke.as_slice().get(0).unwrap(); - std::thread::scope(|s| { - for _ in 0..threads { - s.spawn(move || { - while a.load(Ordering::Unordered) == 0 { - a.wait(0); - } - woke.fetch_add(1); - }); - } + let a = a.as_slice().get(0).unwrap(); + let woke = woke.as_slice().get(0).unwrap(); + std::thread::scope(|s| { + for _ in 0..threads { + s.spawn(move || { + while a.load(Ordering::Unordered) == 0 { + a.wait(0); + } + woke.fetch_add(1); + }); + } - // Give threads time to start waiting - sleep(Duration::from_millis(50)); - a.store(1, Ordering::Unordered); - a.notify_all(); - }); - assert_eq!(woke.load(Ordering::Unordered), threads); - } - // SAFETY: we're the only referrer and block was created using `new`. - unsafe { - a.exit_and_drop(); - woke.exit_and_drop(); - } + // Give threads time to start waiting + sleep(Duration::from_millis(50)); + a.store(1, Ordering::Unordered); + a.notify_all(); + }); + assert_eq!(woke.load(Ordering::Unordered), threads); } #[test] fn stress_ping_pong_many_iters() { - let state = RacyMemory::new(0u32); + let state = RacyBox::new(0u32).unwrap(); let iters = 5_000u32; - { - let state = state.as_slice().get(0).unwrap(); - std::thread::scope(|s| { - s.spawn(move || { - // Consumer: wait for 1, reset to 0, and notify producer. - for _ in 0..iters { - while state.load(Ordering::Unordered) != 1 { - // Wait while the state is 0; use a short timeout to be resilient to spurious wakes. - state.wait_timeout(0, Duration::from_millis(10)); - } - state.store(0, Ordering::Unordered); - state.notify_one(); - } - }); - - // Producer: set to 1, notify consumer, then wait until it resets to 0. + let state = state.as_slice().get(0).unwrap(); + std::thread::scope(|s| { + s.spawn(move || { + // Consumer: wait for 1, reset to 0, and notify producer. for _ in 0..iters { - state.store(1, Ordering::Unordered); - state.notify_one(); - while state.load(Ordering::Unordered) != 0 { - state.wait_timeout(1, Duration::from_millis(10)); + while state.load(Ordering::Unordered) != 1 { + // Wait while the state is 0; use a short timeout to be resilient to spurious wakes. + state.wait_timeout(0, Duration::from_millis(10)); } + state.store(0, Ordering::Unordered); + state.notify_one(); } }); - // Final state should be 0 after a complete ping-pong. - assert_eq!(state.load(Ordering::Unordered), 0); - } - // SAFETY: we're the only referrer and block was created using `new`. - unsafe { state.exit_and_drop() }; + + // Producer: set to 1, notify consumer, then wait until it resets to 0. + for _ in 0..iters { + state.store(1, Ordering::Unordered); + state.notify_one(); + while state.load(Ordering::Unordered) != 0 { + state.wait_timeout(1, Duration::from_millis(10)); + } + } + }); + // Final state should be 0 after a complete ping-pong. + assert_eq!(state.load(Ordering::Unordered), 0); } From d06dd2907bfebd224bd1895e976a1eebd85c5899 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Tue, 18 Nov 2025 14:03:53 +0200 Subject: [PATCH 04/32] wrk --- Cargo.toml | 3 +++ README.md | 2 +- src/condvar_table.rs | 61 +++++++++++++++++++++++++++++--------------- src/fallback.rs | 24 +++++++++++------ src/freebsd.rs | 16 +++++++++--- src/lib.rs | 36 ++++++++++++++++++++------ src/linux.rs | 41 ++++++++++++++++++++++------- src/macos.rs | 16 +++++++++--- src/wasm32.rs | 40 ++++++++++++++++++++--------- src/windows.rs | 16 +++++++++--- tests/test.rs | 10 ++++---- 11 files changed, 190 insertions(+), 75 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 18dae9e..6407256 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,3 +24,6 @@ web-sys = { version = "0.3.24", default-features = false, features = [ "Window" [build-dependencies] rustversion = { version = "1.0.14", default-features = false } + +[dev-dependencies] +ecmascript_atomics = { version = "0.2.2", path = "../ecmascript_atomics/ecmascript_atomics", features = ["alloc"] } diff --git a/README.md b/README.md index 3640118..a714d66 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ a.wait(1); // If the value is 1, wait. a.wait_timeout(2, Duration::from_millis(100)); // If the value is 2, wait at most 100 milliseconds -a.notify_one(); // Wake one waiting thread. +a.notify_many(1); // Wake one waiting thread. a.notify_all(); // Wake all waiting threads. ``` diff --git a/src/condvar_table.rs b/src/condvar_table.rs index 32e919c..c14b9d2 100644 --- a/src/condvar_table.rs +++ b/src/condvar_table.rs @@ -4,6 +4,8 @@ use std::{ time::Duration, }; +use crate::FutexError; + /// The number of OS synchronization primitives to use. const TABLE_SIZE: usize = 256; @@ -12,9 +14,14 @@ static TABLE: [TableEntry; TABLE_SIZE] = [TableEntry::DEFAULT; TABLE_SIZE]; /// Puts the current thread to sleep if `condition` evaluates to `true`. /// The thread will be woken after `timeout` if it is provided. -pub fn wait(ptr: *const (), condition: impl FnOnce() -> bool, timeout: Option) { +pub fn wait( + ptr: *const (), + condition: impl Fn() -> bool, + timeout: Option, +) -> Result<(), FutexError> { let entry = &TABLE[entry_for_ptr(ptr) as usize]; let mut guard = spin_lock(&entry.mutex); + let mut timedout = false; if condition() { if guard.waiting_count == 0 { guard.address = ptr; @@ -25,41 +32,55 @@ pub fn wait(ptr: *const (), condition: impl FnOnce() -> bool, timeout: Option { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { condvar_table::wait( self.addr(), || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } fn notify_all(&self) { condvar_table::notify_all(self.addr()); } - fn notify_one(&self) { - condvar_table::notify_one(self.addr()); + fn notify_many(&self, count: usize) { + condvar_table::notify_many(self.addr(), count); } } impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { condvar_table::wait( self.addr(), || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } fn notify_all(&self) { condvar_table::notify_all(self.addr()); } - fn notify_one(&self) { - condvar_table::notify_one(self.addr()); + fn notify_many(&self, count: usize) { + condvar_table::notify_many(self.addr(), count); } } diff --git a/src/freebsd.rs b/src/freebsd.rs index 4d29584..ff71856 100644 --- a/src/freebsd.rs +++ b/src/freebsd.rs @@ -7,7 +7,11 @@ use crate::private::AtomicWaitImpl; impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { if let Some(time) = timeout { let wait_timespec = libc::_umtx_time { @@ -50,7 +54,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { libc::_umtx_op( self.addr(), @@ -66,7 +70,11 @@ impl AtomicWaitImpl for Racy<'_, u32> { impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { if let Some(time) = timeout { let wait_timespec = libc::_umtx_time { @@ -109,7 +117,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { libc::_umtx_op( self.addr(), diff --git a/src/lib.rs b/src/lib.rs index 6aacf35..a6f9559 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,14 +52,24 @@ mod platform; )))] mod condvar_table; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FutexError { + /// Timeout fired. + Timeout, + /// The waiter was woken up spuriously. + Spurious, + /// An unknown error occurred. + Unknown, +} + /// A type that supports atomic waits. pub trait ECMAScriptAtomicWait: private::ECMAScriptAtomicWaitImpl { /// If the value is `value`, wait until woken up. /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait(&self, value: Self::ECMAScriptAtomicInner) { - private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, None); + fn wait(&self, value: Self::ECMAScriptAtomicInner) -> Result<(), FutexError> { + private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, None) } /// If the value is `value`, wait until timeout elapses @@ -67,13 +77,17 @@ pub trait ECMAScriptAtomicWait: private::ECMAScriptAtomicWaitImpl { /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait_timeout(&self, value: Self::ECMAScriptAtomicInner, timeout: Duration) { - private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, Some(timeout)); + fn wait_timeout( + &self, + value: Self::ECMAScriptAtomicInner, + timeout: Duration, + ) -> Result<(), FutexError> { + private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, Some(timeout)) } /// Wake one thread that is waiting on this atomic. - fn notify_one(&self) { - private::ECMAScriptAtomicWaitImpl::notify_one(self); + fn notify_many(&self, count: usize) { + private::ECMAScriptAtomicWaitImpl::notify_many(self, count); } /// Wake all threads that are waiting on this atomic. @@ -89,6 +103,8 @@ impl ECMAScriptAtomicWait for Racy<'_, u64> {} mod private { use core::time::Duration; + use crate::FutexError; + /// A trait that cannot be implemented by other crates. pub trait ECMAScriptAtomicWaitImpl { /// The underlying integer type for the atomic. @@ -98,12 +114,16 @@ mod private { fn notify_all(&self); /// Wake one thread that is waiting on this atomic. - fn notify_one(&self); + fn notify_many(&self, count: usize); /// If the value is `value`, wait until woken up. /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait_timeout(&self, value: Self::ECMAScriptAtomicInner, timeout: Option); + fn wait_timeout( + &self, + value: Self::ECMAScriptAtomicInner, + timeout: Option, + ) -> Result<(), FutexError>; } } diff --git a/src/linux.rs b/src/linux.rs index e0d80f7..c3084d7 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -2,19 +2,23 @@ use core::time::Duration; use ecmascript_atomics::{Ordering, Racy}; -use crate::{condvar_table, private::ECMAScriptAtomicWaitImpl}; +use crate::{FutexError, condvar_table, private::ECMAScriptAtomicWaitImpl}; impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type ECMAScriptAtomicInner = u32; - fn wait_timeout(&self, value: Self::ECMAScriptAtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::ECMAScriptAtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { let wait_timespec = timeout.map(|x| libc::timespec { tv_sec: x.as_secs() as i64, tv_nsec: x.subsec_nanos() as i64, }); - libc::syscall( + let result = libc::syscall( libc::SYS_futex, self.addr(), libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG, @@ -24,6 +28,21 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { .map(|x| x as *const _) .unwrap_or(std::ptr::null()), ); + if result == 0 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::ETIMEDOUT { + println!("Timeout"); + Err(FutexError::Timeout) + } else if errno == libc::EAGAIN { + println!("Spurious?"); + Err(FutexError::Spurious) + } else { + println!("Unknown"); + Err(FutexError::Unknown) + } + } } } @@ -38,13 +57,13 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { libc::syscall( libc::SYS_futex, self.addr(), libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, - 1i32, + count.min(i32::MAX as usize) as i32, ); }; } @@ -53,19 +72,23 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type ECMAScriptAtomicInner = u64; - fn wait_timeout(&self, value: Self::ECMAScriptAtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::ECMAScriptAtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { condvar_table::wait( self.addr(), || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } fn notify_all(&self) { condvar_table::notify_all(self.addr()); } - fn notify_one(&self) { - condvar_table::notify_one(self.addr()); + fn notify_many(&self, count: usize) { + condvar_table::notify_many(self.addr(), count); } } diff --git a/src/macos.rs b/src/macos.rs index 71be7f3..922898d 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -7,7 +7,11 @@ use crate::private::AtomicWaitImpl; impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( @@ -39,7 +43,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { libc::os_sync_wake_by_address_any( self.addr(), @@ -53,7 +57,11 @@ impl AtomicWaitImpl for Racy<'_, u32> { impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( @@ -85,7 +93,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { libc::os_sync_wake_by_address_any( self.addr(), diff --git a/src/wasm32.rs b/src/wasm32.rs index 3096cbc..cbf8257 100644 --- a/src/wasm32.rs +++ b/src/wasm32.rs @@ -18,11 +18,15 @@ fn can_block() -> bool { impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { if can_block() { crate::condvar_table::wait( self.addr(), - || self.load(std::sync::atomic::Ordering::Acquire) == value, + || self.load(Ordering::SeqCst) == value, timeout, ); } else { @@ -34,8 +38,8 @@ impl AtomicWaitImpl for Racy<'_, u32> { crate::condvar_table::notify_all(self.addr()); } - fn notify_one(&self) { - crate::condvar_table::notify_one(self.addr()); + fn notify_many(&self, count: usize) { + crate::condvar_table::notify_many(self.addr(), count); } } @@ -43,11 +47,15 @@ impl AtomicWaitImpl for Racy<'_, u32> { impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { if can_block() { crate::condvar_table::wait( self.addr(), - || self.load(std::sync::atomic::Ordering::Acquire) == value, + || self.load(Ordering::SeqCst) == value, timeout, ); } else { @@ -59,8 +67,8 @@ impl AtomicWaitImpl for Racy<'_, u64> { crate::condvar_table::notify_all(self.addr()); } - fn notify_one(&self) { - crate::condvar_table::notify_one(self.addr()); + fn notify_many(&self, count: usize) { + crate::condvar_table::notify_many(self.addr(), count); } } @@ -68,7 +76,11 @@ impl AtomicWaitImpl for Racy<'_, u64> { impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { if can_block() { std::arch::wasm32::memory_atomic_wait32( @@ -90,7 +102,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), 1); }; @@ -101,7 +113,11 @@ impl AtomicWaitImpl for Racy<'_, u32> { impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { if can_block() { std::arch::wasm32::memory_atomic_wait64( @@ -123,7 +139,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), 1); }; diff --git a/src/windows.rs b/src/windows.rs index f355e70..78be526 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -10,7 +10,11 @@ use crate::private::AtomicWaitImpl; impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { WaitOnAddress( self.addr(), @@ -33,7 +37,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { unsafe { WakeByAddressAll(self.addr()) }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { WakeByAddressSingle(self.addr()) }; } } @@ -41,7 +45,11 @@ impl AtomicWaitImpl for Racy<'_, u32> { impl AtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; - fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option) { + fn wait_timeout( + &self, + value: Self::AtomicInner, + timeout: Option, + ) -> Result<(), FutexError> { unsafe { WaitOnAddress( self.addr(), @@ -58,7 +66,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { unsafe { WakeByAddressAll(self.addr()) }; } - fn notify_one(&self) { + fn notify_many(&self, count: usize) { unsafe { WakeByAddressSingle(self.addr()) }; } } diff --git a/tests/test.rs b/tests/test.rs index 96eaeb7..6d6b1bc 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -9,7 +9,7 @@ use std::{ fn wake_nothing() { let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - a.notify_one(); + a.notify_many(1); a.notify_all(); } @@ -18,7 +18,7 @@ fn wait_unexpected() { let t = Instant::now(); let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - a.wait(1); + assert_eq!(a.wait(1), Ok(())); assert!(t.elapsed().as_millis() < 100); } @@ -31,7 +31,7 @@ fn wait_wake() { s.spawn(|| { sleep(Duration::from_millis(100)); a.store(1, Ordering::Unordered); - a.notify_one(); + a.notify_many(1); }); while a.load(Ordering::Unordered) == 0 { a.wait(0); @@ -89,14 +89,14 @@ fn stress_ping_pong_many_iters() { state.wait_timeout(0, Duration::from_millis(10)); } state.store(0, Ordering::Unordered); - state.notify_one(); + state.notify_many(1); } }); // Producer: set to 1, notify consumer, then wait until it resets to 0. for _ in 0..iters { state.store(1, Ordering::Unordered); - state.notify_one(); + state.notify_many(1); while state.load(Ordering::Unordered) != 0 { state.wait_timeout(1, Duration::from_millis(10)); } From aecc35f36d5a23629dd3a3b2173e5b01123380b9 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 10:21:02 +0200 Subject: [PATCH 05/32] wrkwrkw --- src/condvar_table.rs | 34 ++++++++++++++++++---------------- src/fallback.rs | 8 ++++---- src/freebsd.rs | 8 ++++---- src/lib.rs | 16 ++++++++-------- src/linux.rs | 35 +++++++++++++++++++---------------- src/macos.rs | 24 +++++++++++++++++------- src/wasm32.rs | 40 ++++++++++++++++++++++++++++++---------- src/windows.rs | 8 ++++---- tests/test.rs | 29 ++++++++++++++++------------- 9 files changed, 120 insertions(+), 82 deletions(-) diff --git a/src/condvar_table.rs b/src/condvar_table.rs index c14b9d2..61c9b7c 100644 --- a/src/condvar_table.rs +++ b/src/condvar_table.rs @@ -43,45 +43,47 @@ pub fn wait( }; guard.waiting_count -= 1; + + if timedout { + Err(FutexError::Timeout) + } else { + Ok(()) + } } else { - return Ok(()); - } - if !timedout && !condition() { - Ok(()) - } else if timedout { - Err(FutexError::Timeout) - } else { - Err(FutexError::Spurious) + Err(FutexError::NotEqual) } } /// Wakes all threads waiting on `ptr`. -pub fn notify_all(ptr: *const ()) { +pub fn notify_all(ptr: *const ()) -> usize { if ptr.is_null() { - return; + return 0; } let entry = &TABLE[entry_for_ptr(ptr) as usize]; let metadata = *spin_lock(&entry.mutex); if 0 < metadata.waiting_count { entry.condvar.notify_all(); } + metadata.waiting_count } /// Wakes at least one thread waiting on `ptr`. -pub fn notify_many(ptr: *const (), count: usize) { +pub fn notify_many(ptr: *const (), count: usize) -> usize { if ptr.is_null() { - return; + return 0; } let entry = &TABLE[entry_for_ptr(ptr) as usize]; let metadata = *spin_lock(&entry.mutex); if metadata.waiting_count == 0 { - return; + return 0; } else if metadata.waiting_count < count || metadata.address.is_null() { entry.condvar.notify_all(); + metadata.waiting_count } else { for _ in 0..count { entry.condvar.notify_one(); } + count } } @@ -100,9 +102,9 @@ fn spin_lock(mutex: &Mutex) -> MutexGuard<'_, T> { /// Gets the entry index to use for the given address. fn entry_for_ptr(ptr: *const ()) -> u8 { let x_64 = ptr as u64; - let x_32 = (x_64 >> 32) as u32 | x_64 as u32; - let x_16 = (x_32 >> 16) as u16 | x_32 as u16; - (x_16 >> 8) as u8 | x_16 as u8 + let x_32 = (x_64 >> 32) as u32 ^ x_64 as u32; + let x_16 = (x_32 >> 16) as u16 ^ x_32 as u16; + (x_16 >> 8) as u8 ^ (x_16 >> 2) as u8 } /// Holds metadata that gets written while locking. diff --git a/src/fallback.rs b/src/fallback.rs index 845eab8..7f43623 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -19,11 +19,11 @@ impl AtomicWaitImpl for Racy<'_, u32> { ) } - fn notify_all(&self) { + fn notify_all(&self) -> usize { condvar_table::notify_all(self.addr()); } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { condvar_table::notify_many(self.addr(), count); } } @@ -43,11 +43,11 @@ impl AtomicWaitImpl for Racy<'_, u64> { ) } - fn notify_all(&self) { + fn notify_all(&self) -> usize { condvar_table::notify_all(self.addr()); } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { condvar_table::notify_many(self.addr(), count); } } diff --git a/src/freebsd.rs b/src/freebsd.rs index ff71856..d9cb9fe 100644 --- a/src/freebsd.rs +++ b/src/freebsd.rs @@ -42,7 +42,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { }; } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { libc::_umtx_op( self.addr(), @@ -54,7 +54,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { }; } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { libc::_umtx_op( self.addr(), @@ -105,7 +105,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { }; } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { libc::_umtx_op( self.addr(), @@ -117,7 +117,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { }; } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { libc::_umtx_op( self.addr(), diff --git a/src/lib.rs b/src/lib.rs index a6f9559..d637c36 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,10 +54,10 @@ mod condvar_table; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FutexError { + /// The value was not equal and no sleep was performed. + NotEqual, /// Timeout fired. Timeout, - /// The waiter was woken up spuriously. - Spurious, /// An unknown error occurred. Unknown, } @@ -86,13 +86,13 @@ pub trait ECMAScriptAtomicWait: private::ECMAScriptAtomicWaitImpl { } /// Wake one thread that is waiting on this atomic. - fn notify_many(&self, count: usize) { - private::ECMAScriptAtomicWaitImpl::notify_many(self, count); + fn notify_many(&self, count: usize) -> usize { + private::ECMAScriptAtomicWaitImpl::notify_many(self, count) } /// Wake all threads that are waiting on this atomic. - fn notify_all(&self) { - private::ECMAScriptAtomicWaitImpl::notify_all(self); + fn notify_all(&self) -> usize { + private::ECMAScriptAtomicWaitImpl::notify_all(self) } } @@ -111,10 +111,10 @@ mod private { type ECMAScriptAtomicInner; /// Wake all threads that are waiting on this atomic. - fn notify_all(&self); + fn notify_all(&self) -> usize; /// Wake one thread that is waiting on this atomic. - fn notify_many(&self, count: usize); + fn notify_many(&self, count: usize) -> usize; /// If the value is `value`, wait until woken up. /// diff --git a/src/linux.rs b/src/linux.rs index c3084d7..cfcdd65 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -32,40 +32,43 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); - if errno == libc::ETIMEDOUT { - println!("Timeout"); + if errno == libc::EAGAIN { + Err(FutexError::NotEqual) + } else if errno == libc::ETIMEDOUT { Err(FutexError::Timeout) - } else if errno == libc::EAGAIN { - println!("Spurious?"); - Err(FutexError::Spurious) + } else if errno == libc::EINTR { + // We consider spurious interrupts to still be valid + // wakeups. + Ok(()) } else { - println!("Unknown"); Err(FutexError::Unknown) } } } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { libc::syscall( libc::SYS_futex, self.addr(), libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, i32::MAX, - ); - }; + ) + .unsigned_abs() as usize + } } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { libc::syscall( libc::SYS_futex, self.addr(), libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, count.min(i32::MAX as usize) as i32, - ); - }; + ) + .unsigned_abs() as usize + } } } @@ -84,11 +87,11 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { ) } - fn notify_all(&self) { - condvar_table::notify_all(self.addr()); + fn notify_all(&self) -> usize { + condvar_table::notify_all(self.addr()) } - fn notify_many(&self, count: usize) { - condvar_table::notify_many(self.addr(), count); + fn notify_many(&self, count: usize) -> usize { + condvar_table::notify_many(self.addr(), count) } } diff --git a/src/macos.rs b/src/macos.rs index 922898d..1e33179 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -12,7 +12,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError> { - unsafe { + let result = unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( self.addr(), @@ -21,19 +21,29 @@ impl AtomicWaitImpl for Racy<'_, u32> { libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, libc::CLOCK_MONOTONIC, time.as_nanos().min(u64::MAX as u128) as u64, - ); + ) } else { libc::os_sync_wait_on_address( self.addr(), value as u64, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, - ); + ) + } + }; + if result >= 0 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) } } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { libc::os_sync_wake_by_address_all( self.addr(), @@ -43,7 +53,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { }; } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { libc::os_sync_wake_by_address_any( self.addr(), @@ -83,7 +93,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { libc::os_sync_wake_by_address_all( self.addr(), @@ -93,7 +103,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { }; } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { libc::os_sync_wake_by_address_any( self.addr(), diff --git a/src/wasm32.rs b/src/wasm32.rs index cbf8257..4f65289 100644 --- a/src/wasm32.rs +++ b/src/wasm32.rs @@ -34,11 +34,11 @@ impl AtomicWaitImpl for Racy<'_, u32> { } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { crate::condvar_table::notify_all(self.addr()); } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { crate::condvar_table::notify_many(self.addr(), count); } } @@ -63,11 +63,11 @@ impl AtomicWaitImpl for Racy<'_, u64> { } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { crate::condvar_table::notify_all(self.addr()); } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { crate::condvar_table::notify_many(self.addr(), count); } } @@ -83,26 +83,36 @@ impl AtomicWaitImpl for Racy<'_, u32> { ) -> Result<(), FutexError> { unsafe { if can_block() { - std::arch::wasm32::memory_atomic_wait32( + let result = std::arch::wasm32::memory_atomic_wait32( self.addr(), value as i32, timeout .map(|x| x.as_nanos().min(i64::MAX as u128) as i64) .unwrap_or(i64::MAX), ); + if result == 0 { + Ok(()) + } else if result == 1 { + Err(FutexError::NotEqual) + } else if result == 2 { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) + } } else { spin_loop(); + Ok(()) } } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX); }; } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), 1); }; @@ -120,26 +130,36 @@ impl AtomicWaitImpl for Racy<'_, u64> { ) -> Result<(), FutexError> { unsafe { if can_block() { - std::arch::wasm32::memory_atomic_wait64( + let result = std::arch::wasm32::memory_atomic_wait64( self.addr(), value as i64, timeout .map(|x| x.as_nanos().min(i64::MAX as u128) as i64) .unwrap_or(i64::MAX), ); + if result == 0 { + Ok(()) + } else if result == 1 { + Err(FutexError::NotEqual) + } else if result == 2 { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) + } } else { spin_loop(); + Ok(()) } } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX); }; } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), 1); }; diff --git a/src/windows.rs b/src/windows.rs index 78be526..bd08e9a 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -33,11 +33,11 @@ impl AtomicWaitImpl for Racy<'_, u32> { } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { WakeByAddressAll(self.addr()) }; } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { WakeByAddressSingle(self.addr()) }; } } @@ -62,11 +62,11 @@ impl AtomicWaitImpl for Racy<'_, u64> { } } - fn notify_all(&self) { + fn notify_all(&self) -> usize { unsafe { WakeByAddressAll(self.addr()) }; } - fn notify_many(&self, count: usize) { + fn notify_many(&self, count: usize) -> usize { unsafe { WakeByAddressSingle(self.addr()) }; } } diff --git a/tests/test.rs b/tests/test.rs index 6d6b1bc..e27afd4 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,5 +1,5 @@ use ecmascript_atomics::{Ordering, RacyBox}; -use ecmascript_futex::ECMAScriptAtomicWait; +use ecmascript_futex::{ECMAScriptAtomicWait, FutexError}; use std::{ thread::sleep, time::{Duration, Instant}, @@ -9,8 +9,8 @@ use std::{ fn wake_nothing() { let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - a.notify_many(1); - a.notify_all(); + assert_eq!(a.notify_many(1), 0); + assert_eq!(a.notify_all(), 0); } #[test] @@ -18,7 +18,7 @@ fn wait_unexpected() { let t = Instant::now(); let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - assert_eq!(a.wait(1), Ok(())); + assert_eq!(a.wait(1), Err(FutexError::NotEqual)); assert!(t.elapsed().as_millis() < 100); } @@ -31,10 +31,10 @@ fn wait_wake() { s.spawn(|| { sleep(Duration::from_millis(100)); a.store(1, Ordering::Unordered); - a.notify_many(1); + assert_eq!(a.notify_many(1), 1); }); while a.load(Ordering::Unordered) == 0 { - a.wait(0); + assert!(a.wait(0).is_ok()); } assert_eq!(a.load(Ordering::Unordered), 1); assert!((90..400).contains(&t.elapsed().as_millis())); @@ -45,7 +45,10 @@ fn wait_wake() { fn wait_timeout() { let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - a.wait_timeout(0, Duration::from_millis(1)); + assert_eq!( + a.wait_timeout(0, Duration::from_millis(1)), + Err(FutexError::Timeout) + ); } #[test] @@ -60,7 +63,7 @@ fn stress_many_waiters_notify_all() { for _ in 0..threads { s.spawn(move || { while a.load(Ordering::Unordered) == 0 { - a.wait(0); + assert!(a.wait(0).is_ok()); } woke.fetch_add(1); }); @@ -69,7 +72,7 @@ fn stress_many_waiters_notify_all() { // Give threads time to start waiting sleep(Duration::from_millis(50)); a.store(1, Ordering::Unordered); - a.notify_all(); + assert_eq!(a.notify_all(), threads as usize); }); assert_eq!(woke.load(Ordering::Unordered), threads); } @@ -86,19 +89,19 @@ fn stress_ping_pong_many_iters() { for _ in 0..iters { while state.load(Ordering::Unordered) != 1 { // Wait while the state is 0; use a short timeout to be resilient to spurious wakes. - state.wait_timeout(0, Duration::from_millis(10)); + let _ = state.wait_timeout(0, Duration::from_millis(10)).is_ok(); } state.store(0, Ordering::Unordered); - state.notify_many(1); + assert!(state.notify_many(1) <= 1); } }); // Producer: set to 1, notify consumer, then wait until it resets to 0. for _ in 0..iters { state.store(1, Ordering::Unordered); - state.notify_many(1); + assert!(state.notify_many(1) <= 1); while state.load(Ordering::Unordered) != 0 { - state.wait_timeout(1, Duration::from_millis(10)); + let _ = state.wait_timeout(1, Duration::from_millis(10)); } } }); From ac9b52b7ad70d8e26110f3ada0b87ec1937d0bd3 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 10:35:55 +0200 Subject: [PATCH 06/32] ecmascript_atomics v0.2.3 --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6407256..a0a41ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ license = "BSD-2-Clause" categories = ["concurrency", "os", "no-std"] [dependencies] -ecmascript_atomics = { version = "0.2.2", path = "../ecmascript_atomics/ecmascript_atomics" } +ecmascript_atomics = { version = "0.2.3" } [target.'cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd", target_os = "macos"))'.dependencies] libc = { version = "0.2", default-features = false } @@ -26,4 +26,4 @@ web-sys = { version = "0.3.24", default-features = false, features = [ "Window" rustversion = { version = "1.0.14", default-features = false } [dev-dependencies] -ecmascript_atomics = { version = "0.2.2", path = "../ecmascript_atomics/ecmascript_atomics", features = ["alloc"] } +ecmascript_atomics = { version = "0.2.3", features = ["alloc"] } From 97ec97ba51cabe51693d4b1cc833aa3712d4a471 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 10:36:29 +0200 Subject: [PATCH 07/32] rust.yml fix --- .github/workflows/rust.yml | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index c8e4800..6bc372c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -1,18 +1,19 @@ name: Rust on: - push: { branches: "main" } - pull_request: { branches: "*" } + push: { branches: ["main"] } + pull_request: { branches: ["*"] } jobs: build-and-test: strategy: matrix: - os: [ - ubuntu-latest, - ubuntu-22.04-arm, - windows-latest, - windows-11-arm, - macos-latest, - ] + os: + [ + ubuntu-latest, + ubuntu-22.04-arm, + windows-latest, + windows-11-arm, + macos-latest, + ] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 From 95547452afece1c275b33de59e5f86b213e9af3b Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 10:37:27 +0200 Subject: [PATCH 08/32] clippy --- src/condvar_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/condvar_table.rs b/src/condvar_table.rs index 61c9b7c..42edc04 100644 --- a/src/condvar_table.rs +++ b/src/condvar_table.rs @@ -75,7 +75,7 @@ pub fn notify_many(ptr: *const (), count: usize) -> usize { let entry = &TABLE[entry_for_ptr(ptr) as usize]; let metadata = *spin_lock(&entry.mutex); if metadata.waiting_count == 0 { - return 0; + 0 } else if metadata.waiting_count < count || metadata.address.is_null() { entry.condvar.notify_all(); metadata.waiting_count From 722c7946d9adefe12253d4559f421e51579c29fd Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 10:39:40 +0200 Subject: [PATCH 09/32] use FutexError --- src/fallback.rs | 2 +- src/freebsd.rs | 2 +- src/macos.rs | 2 +- src/wasm32.rs | 2 +- src/windows.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/fallback.rs b/src/fallback.rs index 7f43623..fb8e48f 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -2,7 +2,7 @@ use core::time::Duration; use ecmascript_atomics::{Ordering, Racy}; -use crate::{condvar_table, private::AtomicWaitImpl}; +use crate::{FutexError, condvar_table, private::ECMAScriptAtomicWaitImpl}; impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; diff --git a/src/freebsd.rs b/src/freebsd.rs index d9cb9fe..fd22f80 100644 --- a/src/freebsd.rs +++ b/src/freebsd.rs @@ -2,7 +2,7 @@ use core::time::Duration; use ecmascript_atomics::{Ordering, Racy}; -use crate::private::AtomicWaitImpl; +use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; diff --git a/src/macos.rs b/src/macos.rs index 1e33179..50ac2c4 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -2,7 +2,7 @@ use core::time::Duration; use ecmascript_atomics::{Ordering, Racy}; -use crate::private::AtomicWaitImpl; +use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; diff --git a/src/wasm32.rs b/src/wasm32.rs index 4f65289..9fa5fc5 100644 --- a/src/wasm32.rs +++ b/src/wasm32.rs @@ -2,7 +2,7 @@ use std::{hint::spin_loop, time::Duration}; use ecmascript_atomics::{Ordering, Racy}; -use crate::private::AtomicWaitImpl; +use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; /// Whether this thread is allowed to block and use synchronization primitives. #[inline(always)] diff --git a/src/windows.rs b/src/windows.rs index bd08e9a..6659ecf 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -5,7 +5,7 @@ use windows_sys::Win32::System::Threading::{ INFINITE, WaitOnAddress, WakeByAddressAll, WakeByAddressSingle, }; -use crate::private::AtomicWaitImpl; +use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; impl AtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; From d782cc7112f6ea8db03bed22287d2d6e6b26db4b Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 10:41:42 +0200 Subject: [PATCH 10/32] ECMAScriptAtomicWaitImpl --- src/fallback.rs | 4 ++-- src/freebsd.rs | 4 ++-- src/macos.rs | 4 ++-- src/wasm32.rs | 8 ++++---- src/windows.rs | 4 ++-- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/fallback.rs b/src/fallback.rs index fb8e48f..74eefc6 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -4,7 +4,7 @@ use ecmascript_atomics::{Ordering, Racy}; use crate::{FutexError, condvar_table, private::ECMAScriptAtomicWaitImpl}; -impl AtomicWaitImpl for Racy<'_, u32> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout( @@ -28,7 +28,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { } } -impl AtomicWaitImpl for Racy<'_, u64> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout( diff --git a/src/freebsd.rs b/src/freebsd.rs index fd22f80..1d854f1 100644 --- a/src/freebsd.rs +++ b/src/freebsd.rs @@ -4,7 +4,7 @@ use ecmascript_atomics::{Ordering, Racy}; use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; -impl AtomicWaitImpl for Racy<'_, u32> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout( @@ -67,7 +67,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { } } -impl AtomicWaitImpl for Racy<'_, u64> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout( diff --git a/src/macos.rs b/src/macos.rs index 50ac2c4..0429ef1 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -4,7 +4,7 @@ use ecmascript_atomics::{Ordering, Racy}; use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; -impl AtomicWaitImpl for Racy<'_, u32> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout( @@ -64,7 +64,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { } } -impl AtomicWaitImpl for Racy<'_, u64> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout( diff --git a/src/wasm32.rs b/src/wasm32.rs index 9fa5fc5..3878cb5 100644 --- a/src/wasm32.rs +++ b/src/wasm32.rs @@ -15,7 +15,7 @@ fn can_block() -> bool { } #[cfg(not(nightly))] -impl AtomicWaitImpl for Racy<'_, u32> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout( @@ -44,7 +44,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { } #[cfg(not(nightly))] -impl AtomicWaitImpl for Racy<'_, u64> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout( @@ -73,7 +73,7 @@ impl AtomicWaitImpl for Racy<'_, u64> { } #[cfg(nightly)] -impl AtomicWaitImpl for Racy<'_, u32> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout( @@ -120,7 +120,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { } #[cfg(nightly)] -impl AtomicWaitImpl for Racy<'_, u64> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout( diff --git a/src/windows.rs b/src/windows.rs index 6659ecf..f1be083 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -7,7 +7,7 @@ use windows_sys::Win32::System::Threading::{ use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; -impl AtomicWaitImpl for Racy<'_, u32> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { type AtomicInner = u32; fn wait_timeout( @@ -42,7 +42,7 @@ impl AtomicWaitImpl for Racy<'_, u32> { } } -impl AtomicWaitImpl for Racy<'_, u64> { +impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { type AtomicInner = u64; fn wait_timeout( From ab008207e42ae92b49f26c971cedca449e8eb6ec Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 11:11:39 +0200 Subject: [PATCH 11/32] fixes --- src/fallback.rs | 8 ++--- src/freebsd.rs | 90 ++++++++++++++++++++++++++++++++++++++++--------- src/lib.rs | 12 +++---- src/linux.rs | 8 ++--- src/macos.rs | 64 ++++++++++++++++++++++++++++------- src/wasm32.rs | 32 ++++++++---------- src/windows.rs | 47 +++++++++++++++++++++----- 7 files changed, 191 insertions(+), 70 deletions(-) diff --git a/src/fallback.rs b/src/fallback.rs index 74eefc6..a71dee2 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -20,11 +20,11 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } fn notify_all(&self) -> usize { - condvar_table::notify_all(self.addr()); + condvar_table::notify_all(self.addr()) } fn notify_many(&self, count: usize) -> usize { - condvar_table::notify_many(self.addr(), count); + condvar_table::notify_many(self.addr(), count) } } @@ -44,10 +44,10 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { } fn notify_all(&self) -> usize { - condvar_table::notify_all(self.addr()); + condvar_table::notify_all(self.addr()) } fn notify_many(&self, count: usize) -> usize { - condvar_table::notify_many(self.addr(), count); + condvar_table::notify_many(self.addr(), count) } } diff --git a/src/freebsd.rs b/src/freebsd.rs index 1d854f1..84a5e8e 100644 --- a/src/freebsd.rs +++ b/src/freebsd.rs @@ -1,6 +1,6 @@ use core::time::Duration; -use ecmascript_atomics::{Ordering, Racy}; +use ecmascript_atomics::Racy; use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; @@ -12,7 +12,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError> { - unsafe { + let result = unsafe { if let Some(time) = timeout { let wait_timespec = libc::_umtx_time { _clockid: libc::CLOCK_MONOTONIC as u32, @@ -29,7 +29,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { value as u64, size_of::() as *mut _, &wait_timespec as *const _ as *mut _, - ); + ) } else { libc::_umtx_op( self.addr(), @@ -37,33 +37,63 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { value as u64, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) } }; + if result >= 0 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::EAGAIN { + Err(FutexError::NotEqual) + } else if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else if errno == libc::EINTR { + // We consider spurious interrupts to still be valid + // wakeups. + Ok(()) + } else { + Err(FutexError::Unknown) + } + } } fn notify_all(&self) -> usize { - unsafe { + let result = unsafe { libc::_umtx_op( self.addr(), libc::UMTX_OP_WAKE_PRIVATE, i32::MAX as libc::c_ulong, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) }; + if result > 0 { + result as usize + } else if result == 0 { + usize::MAX + } else { + 0 + } } fn notify_many(&self, count: usize) -> usize { - unsafe { + let result = unsafe { libc::_umtx_op( self.addr(), libc::UMTX_OP_WAKE_PRIVATE, 1 as libc::c_ulong, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) }; + if result > 0 { + (result as usize).min(count) + } else if result == 0 { + count + } else { + 0 + } } } @@ -75,7 +105,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError> { - unsafe { + let result = unsafe { if let Some(time) = timeout { let wait_timespec = libc::_umtx_time { _clockid: libc::CLOCK_MONOTONIC as u32, @@ -92,7 +122,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { value, size_of::() as *mut _, &wait_timespec as *const _ as *mut _, - ); + ) } else { libc::_umtx_op( self.addr(), @@ -100,32 +130,62 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { value, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) } }; + if result >= 0 { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::EAGAIN { + Err(FutexError::NotEqual) + } else if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else if errno == libc::EINTR { + // We consider spurious interrupts to still be valid + // wakeups. + Ok(()) + } else { + Err(FutexError::Unknown) + } + } } fn notify_all(&self) -> usize { - unsafe { + let result = unsafe { libc::_umtx_op( self.addr(), libc::UMTX_OP_WAKE_PRIVATE, i32::MAX as libc::c_ulong, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) }; + if result > 0 { + result as usize + } else if result == 0 { + usize::MAX + } else { + 0 + } } fn notify_many(&self, count: usize) -> usize { - unsafe { + let result = unsafe { libc::_umtx_op( self.addr(), libc::UMTX_OP_WAKE_PRIVATE, 1 as libc::c_ulong, std::ptr::null_mut(), std::ptr::null_mut(), - ); + ) }; + if result > 0 { + (result as usize).min(count) + } else if result == 0 { + count + } else { + 0 + } } } diff --git a/src/lib.rs b/src/lib.rs index d637c36..c900087 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,7 +68,7 @@ pub trait ECMAScriptAtomicWait: private::ECMAScriptAtomicWaitImpl { /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait(&self, value: Self::ECMAScriptAtomicInner) -> Result<(), FutexError> { + fn wait(&self, value: Self::AtomicInner) -> Result<(), FutexError> { private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, None) } @@ -77,11 +77,7 @@ pub trait ECMAScriptAtomicWait: private::ECMAScriptAtomicWaitImpl { /// /// This function might also return spuriously, /// without a corresponding wake operation. - fn wait_timeout( - &self, - value: Self::ECMAScriptAtomicInner, - timeout: Duration, - ) -> Result<(), FutexError> { + fn wait_timeout(&self, value: Self::AtomicInner, timeout: Duration) -> Result<(), FutexError> { private::ECMAScriptAtomicWaitImpl::wait_timeout(self, value, Some(timeout)) } @@ -108,7 +104,7 @@ mod private { /// A trait that cannot be implemented by other crates. pub trait ECMAScriptAtomicWaitImpl { /// The underlying integer type for the atomic. - type ECMAScriptAtomicInner; + type AtomicInner; /// Wake all threads that are waiting on this atomic. fn notify_all(&self) -> usize; @@ -122,7 +118,7 @@ mod private { /// without a corresponding wake operation. fn wait_timeout( &self, - value: Self::ECMAScriptAtomicInner, + value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError>; } diff --git a/src/linux.rs b/src/linux.rs index cfcdd65..cbc64a0 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -5,11 +5,11 @@ use ecmascript_atomics::{Ordering, Racy}; use crate::{FutexError, condvar_table, private::ECMAScriptAtomicWaitImpl}; impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { - type ECMAScriptAtomicInner = u32; + type AtomicInner = u32; fn wait_timeout( &self, - value: Self::ECMAScriptAtomicInner, + value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError> { unsafe { @@ -73,11 +73,11 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { - type ECMAScriptAtomicInner = u64; + type AtomicInner = u64; fn wait_timeout( &self, - value: Self::ECMAScriptAtomicInner, + value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError> { condvar_table::wait( diff --git a/src/macos.rs b/src/macos.rs index 0429ef1..8d61639 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -1,6 +1,7 @@ use core::time::Duration; +use std::usize; -use ecmascript_atomics::{Ordering, Racy}; +use ecmascript_atomics::Racy; use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; @@ -44,23 +45,37 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } fn notify_all(&self) -> usize { - unsafe { + let result = unsafe { libc::os_sync_wake_by_address_all( self.addr(), size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ); + ) }; + if result == 0 { + // At least one thread was woken up + usize::MAX + } else { + // No threads were woken up. + 0 + } } fn notify_many(&self, count: usize) -> usize { - unsafe { + let result = unsafe { libc::os_sync_wake_by_address_any( self.addr(), size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ); + ) }; + if result == 0 { + // At least one thread was woken up; assume count. + count + } else { + // No threads were woken up. + 0 + } } } @@ -72,7 +87,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError> { - unsafe { + let result = unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( self.addr(), @@ -81,35 +96,60 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, libc::CLOCK_MONOTONIC, time.as_nanos().min(u64::MAX as u128) as u64, - ); + ) } else { libc::os_sync_wait_on_address( self.addr(), value, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, - ); + ) + } + }; + if result >= 0 { + // Result indicates how many waiters remain. + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == libc::ETIMEDOUT { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) } } } fn notify_all(&self) -> usize { - unsafe { + let result = unsafe { libc::os_sync_wake_by_address_all( self.addr(), size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ); + ) }; + if result == 0 { + // At least one thread was woken up + usize::MAX + } else { + // No threads were woken up. + 0 + } } fn notify_many(&self, count: usize) -> usize { - unsafe { + let result = unsafe { libc::os_sync_wake_by_address_any( self.addr(), size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ); + ) }; + if result == 0 { + // At least one thread was woken up; assume count. + count + } else { + // No threads were woken up. + 0 + } } } diff --git a/src/wasm32.rs b/src/wasm32.rs index 3878cb5..0a09c6d 100644 --- a/src/wasm32.rs +++ b/src/wasm32.rs @@ -28,18 +28,19 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { self.addr(), || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } else { spin_loop(); + Ok(()) } } fn notify_all(&self) -> usize { - crate::condvar_table::notify_all(self.addr()); + crate::condvar_table::notify_all(self.addr()) } fn notify_many(&self, count: usize) -> usize { - crate::condvar_table::notify_many(self.addr(), count); + crate::condvar_table::notify_many(self.addr(), count) } } @@ -57,18 +58,19 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { self.addr(), || self.load(Ordering::SeqCst) == value, timeout, - ); + ) } else { spin_loop(); + Ok(()) } } fn notify_all(&self) -> usize { - crate::condvar_table::notify_all(self.addr()); + crate::condvar_table::notify_all(self.addr()) } fn notify_many(&self, count: usize) -> usize { - crate::condvar_table::notify_many(self.addr(), count); + crate::condvar_table::notify_many(self.addr(), count) } } @@ -107,15 +109,12 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } fn notify_all(&self) -> usize { - unsafe { - std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX); - }; + unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX) as usize } } fn notify_many(&self, count: usize) -> usize { - unsafe { - std::arch::wasm32::memory_atomic_notify(self.addr(), 1); - }; + let count = u32::try_from(count).unwrap_or(u32::MAX); + unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), count) as usize } } } @@ -154,14 +153,11 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { } fn notify_all(&self) -> usize { - unsafe { - std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX); - }; + unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), u32::MAX) as usize } } fn notify_many(&self, count: usize) -> usize { - unsafe { - std::arch::wasm32::memory_atomic_notify(self.addr(), 1); - }; + let count = u32::try_from(count).unwrap_or(u32::MAX); + unsafe { std::arch::wasm32::memory_atomic_notify(self.addr(), count) as usize } } } diff --git a/src/windows.rs b/src/windows.rs index f1be083..00a6072 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -1,8 +1,9 @@ use core::time::Duration; -use ecmascript_atomics::{Ordering, Racy}; -use windows_sys::Win32::System::Threading::{ - INFINITE, WaitOnAddress, WakeByAddressAll, WakeByAddressSingle, +use ecmascript_atomics::Racy; +use windows_sys::Win32::{ + Foundation::ERROR_TIMEOUT, + System::Threading::{INFINITE, WaitOnAddress, WakeByAddressAll, WakeByAddressSingle}, }; use crate::{FutexError, private::ECMAScriptAtomicWaitImpl}; @@ -15,7 +16,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError> { - unsafe { + let result = unsafe { WaitOnAddress( self.addr(), &value as *const _ as *const _, @@ -29,16 +30,30 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { capped as u32 }) .unwrap_or(INFINITE), - ); + ) + }; + if result { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == ERROR_TIMEOUT { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) + } } } fn notify_all(&self) -> usize { unsafe { WakeByAddressAll(self.addr()) }; + usize::MAX } fn notify_many(&self, count: usize) -> usize { - unsafe { WakeByAddressSingle(self.addr()) }; + for _ in 0..count { + unsafe { WakeByAddressSingle(self.addr()) }; + } + count } } @@ -50,7 +65,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { value: Self::AtomicInner, timeout: Option, ) -> Result<(), FutexError> { - unsafe { + let result = unsafe { WaitOnAddress( self.addr(), &value as *const _ as *const _, @@ -58,15 +73,29 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { timeout .map(|x| x.as_millis().min(u32::MAX as u128 - 1) as u32) .unwrap_or(INFINITE), - ); + ) + }; + if result { + Ok(()) + } else { + let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + if errno == ERROR_TIMEOUT { + Err(FutexError::Timeout) + } else { + Err(FutexError::Unknown) + } } } fn notify_all(&self) -> usize { unsafe { WakeByAddressAll(self.addr()) }; + usize::MAX } fn notify_many(&self, count: usize) -> usize { - unsafe { WakeByAddressSingle(self.addr()) }; + for _ in 0..count { + unsafe { WakeByAddressSingle(self.addr()) }; + } + count } } From 683c6d934797ede3dc68e9befce5da7a1533edb9 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 11:14:45 +0200 Subject: [PATCH 12/32] fix: macos addr type --- src/macos.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/macos.rs b/src/macos.rs index 8d61639..e746813 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -16,7 +16,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { let result = unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( - self.addr(), + self.addr() as *mut _ as *mut _, value as u64, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -25,7 +25,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { ) } else { libc::os_sync_wait_on_address( - self.addr(), + self.addr() as *mut _ as *mut _, value as u64, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -47,7 +47,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { fn notify_all(&self) -> usize { let result = unsafe { libc::os_sync_wake_by_address_all( - self.addr(), + self.addr() as *mut _ as *mut _, size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) @@ -64,7 +64,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { fn notify_many(&self, count: usize) -> usize { let result = unsafe { libc::os_sync_wake_by_address_any( - self.addr(), + self.addr() as *mut _ as *mut _, size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) @@ -90,7 +90,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { let result = unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( - self.addr(), + self.addr() as *mut _ as *mut _, value, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -99,7 +99,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { ) } else { libc::os_sync_wait_on_address( - self.addr(), + self.addr() as *mut _ as *mut _, value, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -122,7 +122,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { fn notify_all(&self) -> usize { let result = unsafe { libc::os_sync_wake_by_address_all( - self.addr(), + self.addr() as *mut _ as *mut _, size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) @@ -139,7 +139,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { fn notify_many(&self, count: usize) -> usize { let result = unsafe { libc::os_sync_wake_by_address_any( - self.addr(), + self.addr() as *mut _ as *mut _, size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) From 83ce48ea81c0799df64cd7f095ae757e31296678 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 11:37:48 +0200 Subject: [PATCH 13/32] fix --- src/macos.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/macos.rs b/src/macos.rs index e746813..860e6c7 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -16,7 +16,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { let result = unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( - self.addr() as *mut _ as *mut _, + self.addr() as *mut libc::c_void, value as u64, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -25,7 +25,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { ) } else { libc::os_sync_wait_on_address( - self.addr() as *mut _ as *mut _, + self.addr() as *mut libc::c_void, value as u64, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -47,7 +47,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { fn notify_all(&self) -> usize { let result = unsafe { libc::os_sync_wake_by_address_all( - self.addr() as *mut _ as *mut _, + self.addr() as *mut libc::c_void, size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) @@ -64,7 +64,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { fn notify_many(&self, count: usize) -> usize { let result = unsafe { libc::os_sync_wake_by_address_any( - self.addr() as *mut _ as *mut _, + self.addr() as *mut libc::c_void, size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) @@ -90,7 +90,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { let result = unsafe { if let Some(time) = timeout { libc::os_sync_wait_on_address_with_timeout( - self.addr() as *mut _ as *mut _, + self.addr() as *mut libc::c_void, value, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -99,7 +99,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { ) } else { libc::os_sync_wait_on_address( - self.addr() as *mut _ as *mut _, + self.addr() as *mut libc::c_void, value, size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, @@ -122,7 +122,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { fn notify_all(&self) -> usize { let result = unsafe { libc::os_sync_wake_by_address_all( - self.addr() as *mut _ as *mut _, + self.addr() as *mut libc::c_void, size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) @@ -139,7 +139,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { fn notify_many(&self, count: usize) -> usize { let result = unsafe { libc::os_sync_wake_by_address_any( - self.addr() as *mut _ as *mut _, + self.addr() as *mut libc::c_void, size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) From b04e7d2476a50efe5fa206e905db149f4fa12fd8 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 11:59:32 +0200 Subject: [PATCH 14/32] std::usize --- src/macos.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/macos.rs b/src/macos.rs index 860e6c7..289c271 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -1,5 +1,4 @@ use core::time::Duration; -use std::usize; use ecmascript_atomics::Racy; From e3ac02d1fa03501ba82660af76548eb24740560f Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 12:38:52 +0200 Subject: [PATCH 15/32] fix --- src/windows.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index 00a6072..4f9a7d8 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -18,8 +18,8 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { ) -> Result<(), FutexError> { let result = unsafe { WaitOnAddress( - self.addr(), - &value as *const _ as *const _, + self.addr() as *const core::ffi::c_void, + &value as *const core::ffi::c_void, size_of::(), timeout .map(|x| { @@ -36,7 +36,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); - if errno == ERROR_TIMEOUT { + if errno == ERROR_TIMEOUT as i32 { Err(FutexError::Timeout) } else { Err(FutexError::Unknown) @@ -67,19 +67,19 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { ) -> Result<(), FutexError> { let result = unsafe { WaitOnAddress( - self.addr(), - &value as *const _ as *const _, + self.addr() as *const core::ffi::c_void, + &value as *const core::ffi::c_void, size_of::(), timeout .map(|x| x.as_millis().min(u32::MAX as u128 - 1) as u32) .unwrap_or(INFINITE), ) }; - if result { + if result == 1 { Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); - if errno == ERROR_TIMEOUT { + if errno == ERROR_TIMEOUT as i32 { Err(FutexError::Timeout) } else { Err(FutexError::Unknown) From e3f575325e3086608a2214f86f09108fce82055a Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 12:39:26 +0200 Subject: [PATCH 16/32] fix --- src/windows.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index 4f9a7d8..2372b75 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -45,13 +45,13 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } fn notify_all(&self) -> usize { - unsafe { WakeByAddressAll(self.addr()) }; + unsafe { WakeByAddressAll(self.addr() as *const core::ffi::c_void) }; usize::MAX } fn notify_many(&self, count: usize) -> usize { for _ in 0..count { - unsafe { WakeByAddressSingle(self.addr()) }; + unsafe { WakeByAddressSingle(self.addr() as *const core::ffi::c_void) }; } count } @@ -88,13 +88,13 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { } fn notify_all(&self) -> usize { - unsafe { WakeByAddressAll(self.addr()) }; + unsafe { WakeByAddressAll(self.addr() as *const core::ffi::c_void) }; usize::MAX } fn notify_many(&self, count: usize) -> usize { for _ in 0..count { - unsafe { WakeByAddressSingle(self.addr()) }; + unsafe { WakeByAddressSingle(self.addr() as *const core::ffi::c_void) }; } count } From d0fbff7102e5176fdd63b004973f4d9711b9dfe8 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 12:43:08 +0200 Subject: [PATCH 17/32] fix --- tests/test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test.rs b/tests/test.rs index e27afd4..3844df6 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -72,7 +72,7 @@ fn stress_many_waiters_notify_all() { // Give threads time to start waiting sleep(Duration::from_millis(50)); a.store(1, Ordering::Unordered); - assert_eq!(a.notify_all(), threads as usize); + assert!(a.notify_all() >= threads as usize); }); assert_eq!(woke.load(Ordering::Unordered), threads); } From 16de49fb11d9295d99d01b55d74dd6224191c56e Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 12:47:43 +0200 Subject: [PATCH 18/32] fix --- src/macos.rs | 1 + src/windows.rs | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/macos.rs b/src/macos.rs index 289c271..a824317 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -32,6 +32,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } }; if result >= 0 { + eprintln!("{result:?}"); Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); diff --git a/src/windows.rs b/src/windows.rs index 2372b75..56706c6 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -19,7 +19,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { let result = unsafe { WaitOnAddress( self.addr() as *const core::ffi::c_void, - &value as *const core::ffi::c_void, + &value as *const u32 as *const core::ffi::c_void, size_of::(), timeout .map(|x| { @@ -32,7 +32,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { .unwrap_or(INFINITE), ) }; - if result { + if result == 1 { Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); @@ -68,7 +68,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { let result = unsafe { WaitOnAddress( self.addr() as *const core::ffi::c_void, - &value as *const core::ffi::c_void, + &value as *const u32 as *const core::ffi::c_void, size_of::(), timeout .map(|x| x.as_millis().min(u32::MAX as u128 - 1) as u32) From df1e548b57e08406fdb520cac14413229d9c82e2 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 12:49:43 +0200 Subject: [PATCH 19/32] fix --- src/windows.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/windows.rs b/src/windows.rs index 56706c6..ff3a4e1 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -68,7 +68,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { let result = unsafe { WaitOnAddress( self.addr() as *const core::ffi::c_void, - &value as *const u32 as *const core::ffi::c_void, + &value as *const u64 as *const core::ffi::c_void, size_of::(), timeout .map(|x| x.as_millis().min(u32::MAX as u128 - 1) as u32) From 19e3ea884c1a7d77dac69a0182ba473b246bf248 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 12:54:29 +0200 Subject: [PATCH 20/32] fix --- src/macos.rs | 2 +- src/windows.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/macos.rs b/src/macos.rs index a824317..69abed4 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -32,7 +32,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } }; if result >= 0 { - eprintln!("{result:?}"); + eprintln!("Result: {result:?}"); Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); diff --git a/src/windows.rs b/src/windows.rs index ff3a4e1..7695d38 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -33,6 +33,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { ) }; if result == 1 { + eprintln!("Result: {result}"); Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); From a6b2fd21fd0939325ba9449e8a4c3a2f747e55f9 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 12:58:49 +0200 Subject: [PATCH 21/32] fix --- src/windows.rs | 1 - tests/test.rs | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/windows.rs b/src/windows.rs index 7695d38..ff3a4e1 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -33,7 +33,6 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { ) }; if result == 1 { - eprintln!("Result: {result}"); Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); diff --git a/tests/test.rs b/tests/test.rs index 3844df6..02c9f30 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -18,7 +18,11 @@ fn wait_unexpected() { let t = Instant::now(); let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); + // Note: Windows doesn't report early-exits. + #[cfg(not(windows))] assert_eq!(a.wait(1), Err(FutexError::NotEqual)); + #[cfg(windows)] + assert_eq!(a.wait(1), Ok(())); assert!(t.elapsed().as_millis() < 100); } From 2c14f065374ac4be644156edfb93b359f904415d Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:00:12 +0200 Subject: [PATCH 22/32] macos test --- src/macos.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/macos.rs b/src/macos.rs index 69abed4..4c78568 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -36,6 +36,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + eprintln!("Errno: {errno:?}"); if errno == libc::ETIMEDOUT { Err(FutexError::Timeout) } else { From 89254797db39844e909fe297bb5adb7b91308a86 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:03:45 +0200 Subject: [PATCH 23/32] windows error log --- src/windows.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/windows.rs b/src/windows.rs index ff3a4e1..29aa466 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -36,6 +36,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); + eprintln!("Errno: {errno:?}"); if errno == ERROR_TIMEOUT as i32 { Err(FutexError::Timeout) } else { From 6b23e11b68b88a94ee2a4185766ab3039cffd725 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:08:04 +0200 Subject: [PATCH 24/32] Ensure no zero timeouts on macos --- src/macos.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/macos.rs b/src/macos.rs index 4c78568..400fff8 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -20,7 +20,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, libc::CLOCK_MONOTONIC, - time.as_nanos().min(u64::MAX as u128) as u64, + (time.as_nanos().min(u64::MAX as u128) as u64).max(1), ) } else { libc::os_sync_wait_on_address( @@ -96,7 +96,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, libc::CLOCK_MONOTONIC, - time.as_nanos().min(u64::MAX as u128) as u64, + (time.as_nanos().min(u64::MAX as u128) as u64).max(1), ) } else { libc::os_sync_wait_on_address( From 59aa8b6db96aff6a596131407e2ef5c604675175 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:17:52 +0200 Subject: [PATCH 25/32] wait_timeout can spuriously wake --- tests/test.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test.rs b/tests/test.rs index 02c9f30..c00bc6c 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -49,10 +49,11 @@ fn wait_wake() { fn wait_timeout() { let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - assert_eq!( + // Note: it's possible for us to receive a spurious wake. + assert!(matches!( a.wait_timeout(0, Duration::from_millis(1)), - Err(FutexError::Timeout) - ); + Ok(()) | Err(FutexError::Timeout) + )); } #[test] From 48f335494073a25a66db9f7f87630b9227ddcaeb Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:29:33 +0200 Subject: [PATCH 26/32] size fix --- src/macos.rs | 20 ++++++++++---------- src/windows.rs | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/macos.rs b/src/macos.rs index 400fff8..57754f1 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -17,16 +17,16 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { libc::os_sync_wait_on_address_with_timeout( self.addr() as *mut libc::c_void, value as u64, - size_of::(), + size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, - libc::CLOCK_MONOTONIC, + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, (time.as_nanos().min(u64::MAX as u128) as u64).max(1), ) } else { libc::os_sync_wait_on_address( self.addr() as *mut libc::c_void, value as u64, - size_of::(), + size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, ) } @@ -49,7 +49,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { let result = unsafe { libc::os_sync_wake_by_address_all( self.addr() as *mut libc::c_void, - size_of::(), + size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) }; @@ -66,7 +66,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { let result = unsafe { libc::os_sync_wake_by_address_any( self.addr() as *mut libc::c_void, - size_of::(), + size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) }; @@ -93,16 +93,16 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { libc::os_sync_wait_on_address_with_timeout( self.addr() as *mut libc::c_void, value, - size_of::(), + size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, - libc::CLOCK_MONOTONIC, + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, (time.as_nanos().min(u64::MAX as u128) as u64).max(1), ) } else { libc::os_sync_wait_on_address( self.addr() as *mut libc::c_void, value, - size_of::(), + size_of::(), libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, ) } @@ -124,7 +124,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { let result = unsafe { libc::os_sync_wake_by_address_all( self.addr() as *mut libc::c_void, - size_of::(), + size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) }; @@ -141,7 +141,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { let result = unsafe { libc::os_sync_wake_by_address_any( self.addr() as *mut libc::c_void, - size_of::(), + size_of::(), libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, ) }; diff --git a/src/windows.rs b/src/windows.rs index 29aa466..c58ce1b 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -20,7 +20,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { WaitOnAddress( self.addr() as *const core::ffi::c_void, &value as *const u32 as *const core::ffi::c_void, - size_of::(), + size_of::(), timeout .map(|x| { // Clamp to a finite u32 millisecond timeout. INFINITE (0xFFFFFFFF) @@ -70,7 +70,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { WaitOnAddress( self.addr() as *const core::ffi::c_void, &value as *const u64 as *const core::ffi::c_void, - size_of::(), + size_of::(), timeout .map(|x| x.as_millis().min(u32::MAX as u128 - 1) as u32) .unwrap_or(INFINITE), From 2a5d8989c16158fbcd8ca9218d940f5cfb7ea9c3 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:34:36 +0200 Subject: [PATCH 27/32] fix --- tests/test.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test.rs b/tests/test.rs index c00bc6c..c4b595a 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -18,10 +18,10 @@ fn wait_unexpected() { let t = Instant::now(); let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - // Note: Windows doesn't report early-exits. - #[cfg(not(windows))] + // Note: Windows and iOS doesn't report early-exits. + #[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "watchos", windows)))] assert_eq!(a.wait(1), Err(FutexError::NotEqual)); - #[cfg(windows)] + #[cfg(any(target_os = "macos", target_os = "ios", target_os = "watchos", windows))] assert_eq!(a.wait(1), Ok(())); assert!(t.elapsed().as_millis() < 100); } From 27153d6f202f8941c10a3138740b5c7cd79b7a46 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:38:04 +0200 Subject: [PATCH 28/32] fix --- tests/test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test.rs b/tests/test.rs index c4b595a..72f87c5 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -9,8 +9,8 @@ use std::{ fn wake_nothing() { let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - assert_eq!(a.notify_many(1), 0); - assert_eq!(a.notify_all(), 0); + assert!(a.notify_many(1) >= 1); + a.notify_all(); } #[test] From 5e549df2be09c0f49e1a77469f2e63c48ac8669e Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:39:23 +0200 Subject: [PATCH 29/32] fix --- tests/test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test.rs b/tests/test.rs index 72f87c5..36c97bc 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -9,7 +9,7 @@ use std::{ fn wake_nothing() { let a = RacyBox::new(0u32).unwrap(); let a = a.as_slice().get(0).unwrap(); - assert!(a.notify_many(1) >= 1); + assert!(a.notify_many(1) <= 1); a.notify_all(); } From a034e604b444605335ca0311f75892c4b5eae72c Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:43:11 +0200 Subject: [PATCH 30/32] fix --- src/macos.rs | 50 ++++++++++++++++++++++++-------------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/src/macos.rs b/src/macos.rs index 57754f1..8423bc2 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -63,19 +63,18 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } fn notify_many(&self, count: usize) -> usize { - let result = unsafe { - libc::os_sync_wake_by_address_any( - self.addr() as *mut libc::c_void, - size_of::(), - libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ) - }; - if result == 0 { - // At least one thread was woken up; assume count. - count - } else { - // No threads were woken up. - 0 + for i in 0..count { + let result = unsafe { + libc::os_sync_wake_by_address_any( + self.addr() as *mut libc::c_void, + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ) + }; + if result != 0 { + // No threads were woken up. + return i; + } } } } @@ -138,19 +137,18 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { } fn notify_many(&self, count: usize) -> usize { - let result = unsafe { - libc::os_sync_wake_by_address_any( - self.addr() as *mut libc::c_void, - size_of::(), - libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, - ) - }; - if result == 0 { - // At least one thread was woken up; assume count. - count - } else { - // No threads were woken up. - 0 + for i in 0..count { + let result = unsafe { + libc::os_sync_wake_by_address_any( + self.addr() as *mut libc::c_void, + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ) + }; + if result != 0 { + // No threads were woken up. + return i; + } } } } From 317a2dd4291ad6a008940f3e1ad95adba6a11f71 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:44:28 +0200 Subject: [PATCH 31/32] fix --- src/macos.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/macos.rs b/src/macos.rs index 8423bc2..1e3c118 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -76,6 +76,7 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { return i; } } + count } } @@ -150,5 +151,6 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> { return i; } } + count } } From 94308c4921751257fab969ceaeb5d6704a8535d2 Mon Sep 17 00:00:00 2001 From: Aapo Alasuutari Date: Thu, 20 Nov 2025 13:51:18 +0200 Subject: [PATCH 32/32] fix --- src/macos.rs | 2 -- src/windows.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/src/macos.rs b/src/macos.rs index 1e3c118..d3ab007 100644 --- a/src/macos.rs +++ b/src/macos.rs @@ -32,11 +32,9 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { } }; if result >= 0 { - eprintln!("Result: {result:?}"); Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); - eprintln!("Errno: {errno:?}"); if errno == libc::ETIMEDOUT { Err(FutexError::Timeout) } else { diff --git a/src/windows.rs b/src/windows.rs index c58ce1b..003d6a1 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -36,7 +36,6 @@ impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> { Ok(()) } else { let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0); - eprintln!("Errno: {errno:?}"); if errno == ERROR_TIMEOUT as i32 { Err(FutexError::Timeout) } else {