Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
name: Rust
on:
push: { branches: "main" }
pull_request: { branches: "*" }
push: { branches: ["main"] }
pull_request: { branches: ["*"] }
jobs:
build-and-test:
strategy:
matrix:
os: [ubuntu-latest, ubuntu-22.04-arm, windows-latest, windows-11-arm, macos-latest]
os:
[
ubuntu-latest,
ubuntu-22.04-arm,
windows-latest,
windows-11-arm,
macos-latest,
]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
Expand All @@ -26,4 +33,3 @@ jobs:
run: cargo build --verbose
- name: Run tests
run: cargo nextest run --verbose

16 changes: 11 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
[package]
name = "wait_on_address"
description = "Cross-platform atomic wait and wake (aka futex) functionality."
repository = "https://github.com/DouglasDwyer/wait_on_address"
keywords = ["atomic", "futex"]
name = "ecmascript_futex"
description = "Cross-platform atomic wait and wake (aka futex) functionality using the ECMAScript Atomics memory model."
repository = "https://github.com/trynova/ecmascript_futex"
keywords = ["atomic", "futex", "ecmascript"]
version = "0.1.1"
edition = "2024"
license = "BSD-2-Clause"
categories = ["concurrency", "os", "no-std"]

[dependencies]
ecmascript_atomics = { version = "0.2.3" }

[target.'cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd", target_os = "macos"))'.dependencies]
libc = { version = "0.2", default-features = false }

Expand All @@ -20,4 +23,7 @@ wasm-bindgen = { version = "0.2.90", default-features = false }
web-sys = { version = "0.3.24", default-features = false, features = [ "Window" ] }

[build-dependencies]
rustversion = { version = "1.0.14", default-features = false }
rustversion = { version = "1.0.14", default-features = false }

[dev-dependencies]
ecmascript_atomics = { version = "0.2.3", features = ["alloc"] }
36 changes: 23 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# wait_on_address
# ecmascript_futex

[![Crates.io](https://img.shields.io/crates/v/wait_on_address.svg)](https://crates.io/crates/wait_on_address)
[![Docs.rs](https://docs.rs/wait_on_address/badge.svg)](https://docs.rs/wait_on_address)
Cross platform library for implementing ECMAScript `Atomics.wait`,
`Atomics.wakeAsync`, and `Atomics.notify` (aka futex) functionality in Rust,
operating on ECMAScript memory as produced by the
[`ecmascript_atomics`](https://github.com/trynova/ecmascript_atomics) crate.
This crate is a fork of
[`wait_on_address`](https://github.com/DouglasDwyer/wait_on_address) which is
itself a fork of [`atomic-wait`](https://github.com/m-ou-se/atomic-wait). The
changes inherited and kept from `wait_on_address` are:

Cross platform atomic wait and wake (aka futex) functionality. This crate is a fork of [`atomic-wait`](https://github.com/m-ou-se/atomic-wait), and extends the original code with the following functionality:

- Support for `AtomicI32`, `AtomicI64`, and `AtomicU64`
- Support for waiting with a timeout
- Support for `wasm32` on nightly using `std::arch`
- Polyfill for all other platforms

The main

Natively-supported platforms:

- Windows 8+, Windows Server 2012+
Expand All @@ -21,16 +26,18 @@ Natively-supported platforms:
## Usage

```rust
use std::{sync::atomic::AtomicU64, time::Duration};
use wait_on_address::AtomicWait;
use core::time::Duration;
use ecmascript_atomics::{Racy, RacyBox};
use ecmascript_futex::ECMAScriptAtomicWait;

let a = AtomicU64::new(0);
let a = RacyBox::new(0u64).unwrap();
let a = a.as_slice().get(0).unwrap();

a.wait(1); // If the value is 1, wait.

a.wait_timeout(2, Duration::from_millis(100)); // If the value is 2, wait at most 100 milliseconds

a.notify_one(); // Wake one waiting thread.
a.notify_many(1); // Wake one waiting thread.

a.notify_all(); // Wake all waiting threads.
```
Expand All @@ -43,8 +50,11 @@ On FreeBSD, this uses the `_umtx_op` syscall.

On Windows, this uses the `WaitOnAddress` and `WakeByAddress` APIs.

On macOS (and iOS and watchOS), this uses the `os_sync_wait_on_address` and `os_sync_wake_by_address` APIs.
On macOS (and iOS and watchOS), this uses the `os_sync_wait_on_address` and
`os_sync_wake_by_address` APIs.

On wasm32 with `nightly`, this uses `memory_atomic_wait32`, `memory_atomic_wait64`, and `memory_atomic_notify` instructions.
On wasm32 with `nightly`, this uses `memory_atomic_wait32`,
`memory_atomic_wait64`, and `memory_atomic_notify` instructions.

All other platforms with `std` support fall back to a fixed-size hashmap of `Condvar`s, similar to `libstdc++`'s implementation for `std::atomic<T>`.
All other platforms with `std` support fall back to a fixed-size hashmap of
`Condvar`s, similar to `libstdc++`'s implementation for `std::atomic<T>`.
3 changes: 3 additions & 0 deletions rust_toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "stable"
components = ["rustfmt", "clippy"]
71 changes: 47 additions & 24 deletions src/condvar_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
time::Duration,
};

use crate::FutexError;

/// The number of OS synchronization primitives to use.
const TABLE_SIZE: usize = 256;

Expand All @@ -12,9 +14,14 @@ static TABLE: [TableEntry; TABLE_SIZE] = [TableEntry::DEFAULT; TABLE_SIZE];

/// Puts the current thread to sleep if `condition` evaluates to `true`.
/// The thread will be woken after `timeout` if it is provided.
pub fn wait(ptr: *const (), condition: impl FnOnce() -> bool, timeout: Option<Duration>) {
pub fn wait(
ptr: *const (),
condition: impl Fn() -> bool,
timeout: Option<Duration>,
) -> Result<(), FutexError> {
let entry = &TABLE[entry_for_ptr(ptr) as usize];
let mut guard = spin_lock(&entry.mutex);
let mut timedout = false;
if condition() {
if guard.waiting_count == 0 {
guard.address = ptr;
Expand All @@ -25,42 +32,58 @@ pub fn wait(ptr: *const (), condition: impl FnOnce() -> bool, timeout: Option<Du
guard.waiting_count += 1;

guard = if let Some(time) = timeout {
entry
let (guard, result) = entry
.condvar
.wait_timeout(guard, time)
.expect("Failed to lock mutex")
.0
.expect("Failed to lock mutex");
timedout = result.timed_out();
guard
} else {
entry.condvar.wait(guard).expect("Failed to lock mutex")
};

guard.waiting_count -= 1;

if timedout {
Err(FutexError::Timeout)
} else {
Ok(())
}
} else {
Err(FutexError::NotEqual)
}
}

/// Wakes all threads waiting on `ptr`.
pub fn notify_all(ptr: *const ()) {
if !ptr.is_null() {
let entry = &TABLE[entry_for_ptr(ptr) as usize];
let metadata = *spin_lock(&entry.mutex);
if 0 < metadata.waiting_count {
entry.condvar.notify_all();
}
pub fn notify_all(ptr: *const ()) -> usize {
if ptr.is_null() {
return 0;
}
let entry = &TABLE[entry_for_ptr(ptr) as usize];
let metadata = *spin_lock(&entry.mutex);
if 0 < metadata.waiting_count {
entry.condvar.notify_all();
}
metadata.waiting_count
}

/// Wakes at least one thread waiting on `ptr`.
pub fn notify_one(ptr: *const ()) {
if !ptr.is_null() {
let entry = &TABLE[entry_for_ptr(ptr) as usize];
let metadata = *spin_lock(&entry.mutex);
if 0 < metadata.waiting_count {
if metadata.address.is_null() {
entry.condvar.notify_all();
} else if metadata.address == ptr {
entry.condvar.notify_one();
}
pub fn notify_many(ptr: *const (), count: usize) -> usize {
if ptr.is_null() {
return 0;
}
let entry = &TABLE[entry_for_ptr(ptr) as usize];
let metadata = *spin_lock(&entry.mutex);
if metadata.waiting_count == 0 {
0
} else if metadata.waiting_count < count || metadata.address.is_null() {
entry.condvar.notify_all();
metadata.waiting_count
} else {
for _ in 0..count {
entry.condvar.notify_one();
}
count
}
}

Expand All @@ -79,9 +102,9 @@ fn spin_lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
/// Gets the entry index to use for the given address.
fn entry_for_ptr(ptr: *const ()) -> u8 {
let x_64 = ptr as u64;
let x_32 = (x_64 >> 32) as u32 | x_64 as u32;
let x_16 = (x_32 >> 16) as u16 | x_32 as u16;
(x_16 >> 8) as u8 | x_16 as u8
let x_32 = (x_64 >> 32) as u32 ^ x_64 as u32;
let x_16 = (x_32 >> 16) as u16 ^ x_32 as u16;
(x_16 >> 8) as u8 ^ (x_16 >> 2) as u8
}

/// Holds metadata that gets written while locking.
Expand Down
53 changes: 30 additions & 23 deletions src/fallback.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,53 @@
use std::{
sync::atomic::{AtomicU32, AtomicU64, Ordering},
time::Duration,
};
use core::time::Duration;

use crate::{condvar_table, private::AtomicWaitImpl};
use ecmascript_atomics::{Ordering, Racy};

impl AtomicWaitImpl for AtomicU32 {
use crate::{FutexError, condvar_table, private::ECMAScriptAtomicWaitImpl};

impl ECMAScriptAtomicWaitImpl for Racy<'_, u32> {
type AtomicInner = u32;

fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option<Duration>) {
fn wait_timeout(
&self,
value: Self::AtomicInner,
timeout: Option<Duration>,
) -> Result<(), FutexError> {
condvar_table::wait(
self as *const _ as *const _,
|| self.load(Ordering::Acquire) == value,
self.addr(),
|| self.load(Ordering::SeqCst) == value,
timeout,
);
)
}

fn notify_all(&self) {
condvar_table::notify_all(self as *const _ as *const _);
fn notify_all(&self) -> usize {
condvar_table::notify_all(self.addr())
}

fn notify_one(&self) {
condvar_table::notify_one(self as *const _ as *const _);
fn notify_many(&self, count: usize) -> usize {
condvar_table::notify_many(self.addr(), count)
}
}

impl AtomicWaitImpl for AtomicU64 {
impl ECMAScriptAtomicWaitImpl for Racy<'_, u64> {
type AtomicInner = u64;

fn wait_timeout(&self, value: Self::AtomicInner, timeout: Option<Duration>) {
fn wait_timeout(
&self,
value: Self::AtomicInner,
timeout: Option<Duration>,
) -> Result<(), FutexError> {
condvar_table::wait(
self as *const _ as *const _,
|| self.load(Ordering::Acquire) == value,
self.addr(),
|| self.load(Ordering::SeqCst) == value,
timeout,
);
)
}

fn notify_all(&self) {
condvar_table::notify_all(self as *const _ as *const _);
fn notify_all(&self) -> usize {
condvar_table::notify_all(self.addr())
}

fn notify_one(&self) {
condvar_table::notify_one(self as *const _ as *const _);
fn notify_many(&self, count: usize) -> usize {
condvar_table::notify_many(self.addr(), count)
}
}
Loading