Semaphore: remove outstanding permit tracking
bounded: remove unbounded send functionality
speed up semaphore
bump version to 0.3.0
upgrade deps
Oliver Giersch committed Mar 12, 2024
1 parent 63a96b3 commit af1b996
Showing 6 changed files with 66 additions and 249 deletions.
12 changes: 12 additions & 0 deletions BENCHMARK.md
@@ -32,4 +32,16 @@ test uncontented_unbounded_sync ... bench: 204,268 ns/iter (+/- 17,505)
test uncontented_unbounded_unsync ... bench: 17,078 ns/iter (+/- 874)
test result: ok. 0 passed; 0 failed; 0 ignored; 4 measured; 0 filtered out; finished in 16.17s
```

Version `0.3.0`:

```
running 4 tests
test uncontented_bounded_sync ... bench: 340,491 ns/iter (+/- 16,329)
test uncontented_bounded_unsync ... bench: 51,252 ns/iter (+/- 1,413)
test uncontented_unbounded_sync ... bench: 215,017 ns/iter (+/- 11,349)
test uncontented_unbounded_unsync ... bench: 17,753 ns/iter (+/- 847)
test result: ok. 0 passed; 0 failed; 0 ignored; 4 measured; 0 filtered out; finished in 5.71s
```
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "async-unsync"
description = "asynchronous, unsynchronized (thread-local) channels and semaphores"
version = "0.2.3"
version = "0.3.0"
authors = ["Oliver Giersch"]
license = "MIT/Apache-2.0"
repository = "https://github.com/oliver-giersch/async-unsync.git"
@@ -19,9 +19,9 @@ alloc = []
bench = ["tokio"]

[dev-dependencies]
futures-lite = "1.12.0"
futures-lite = "2.2.0"

[dependencies.tokio]
version = "1.27.0"
version = "1.36.0"
features = ["rt", "rt-multi-thread", "sync"]
optional = true
10 changes: 10 additions & 0 deletions RELEASES.md
@@ -38,3 +38,13 @@

- Fixes a bug where a `Waker` registered by a `Semaphore` would not be dropped,
causing memory leaks.

## Release `0.3.0`

- Performance improvements to `Semaphore`.

### Breaking Changes

- Removes `bounded::[Channel|Sender|SenderRef]::unbounded_send`.
- Removes `Semaphore::outstanding_permits` and `Semaphore::return_permits`.
- Alters the behaviour of `bounded::Channel::from_iter` to use the maximum of the iterator's length and the given capacity as the channel's capacity.
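
For illustration, a minimal sketch of the `0.3.0` `from_iter` capacity rule, mirroring the updated test in `src/bounded.rs`; the standalone `main`, the `async_unsync`/`futures_lite` import paths, and relying on the crate's default features are assumptions of this sketch:

```rust
use async_unsync::bounded;
use futures_lite::future;

fn main() {
    future::block_on(async {
        // Four pre-queued elements with a requested capacity of 5: as of 0.3.0
        // the channel's capacity is the maximum of `capacity` and the iterator's
        // length, so it stays at 5 here (a longer iterator would raise it instead).
        let chan = bounded::Channel::from_iter(5, [0, 1, 2, 3]);
        assert_eq!(chan.recv().await, Some(0));
        assert_eq!(chan.recv().await, Some(1));
        assert_eq!(chan.recv().await, Some(2));
        assert_eq!(chan.recv().await, Some(3));
        // once the pre-queued elements are drained, the full capacity is available
        assert_eq!(chan.capacity(), 5);
    });
}
```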
157 changes: 12 additions & 145 deletions src/bounded.rs
@@ -57,12 +57,13 @@ impl<T> Channel<T> {
Self { queue: BoundedQueue::with_capacity(capacity, initial) }
}

/// Returns a new bounded channel with pre-queued elements.
/// Returns a new bounded channel with a given capacity and pre-queued
/// elements.
///
/// The initial capacity will be the difference between `capacity` and the
/// number of elements returned by the [`Iterator`].
/// The iterator may return more than `capacity` elements, but the channel's
/// capacity will never exceed the given `capacity`.
/// The total channel capacity will be the maximum of `capacity` and the
/// iterator's length.
///
/// # Panics
///
@@ -213,29 +214,6 @@ impl<T> Channel<T> {
self.queue.try_send::<UNCOUNTED>(elem)
}

/// Sends a value through the channel, ignoring any capacity constraints.
///
/// This will immediately enqueue `elem`, even if there are currently
/// senders waiting due to a lack of available capacity.
/// Care must be taken with unbounded sends, as they may undermine
/// assumptions about message ordering and the ability to apply
/// backpressure.
/// Alternatively, this can be thought of as a one-time capacity increase.
///
/// # Errors
///
/// Fails, if the queue is closed.
pub fn unbounded_send(&self, elem: T) -> Result<(), SendError<T>> {
const CAPACITY_REDUCING: bool = true;

if self.is_closed() {
return Err(SendError(elem));
}

self.queue.unbounded_send::<CAPACITY_REDUCING>(elem);
Ok(())
}

/// Sends a value, potentially waiting until there is capacity.
///
/// # Errors
@@ -379,29 +357,6 @@ impl<T> Sender<T> {
self.queue.try_send::<COUNTED>(elem)
}

/// Sends a value through the channel, ignoring any capacity constraints.
///
/// This will immediately enqueue `elem`, even if there are currently
/// senders waiting due to a lack of available capacity.
/// Care must be taken with unbounded sends, as they may undermine
/// assumptions about message ordering and the ability to apply
/// backpressure.
/// Alternatively, this can be thought of as a one-time capacity increase.
///
/// # Errors
///
/// Fails, if the queue is closed.
pub fn unbounded_send(&self, elem: T) -> Result<(), SendError<T>> {
const CAPACITY_REDUCING: bool = true;

if self.is_closed() {
return Err(SendError(elem));
}

self.queue.unbounded_send::<CAPACITY_REDUCING>(elem);
Ok(())
}

/// Sends a value through the channel, potentially waiting until there is
/// sufficient capacity.
///
@@ -644,29 +599,6 @@ impl<T> SenderRef<'_, T> {
self.queue.try_send::<COUNTED>(elem)
}

/// Sends a value through the channel, ignoring any capacity constraints.
///
/// This will immediately enqueue `elem`, even if there are currently
/// senders waiting due to a lack of available capacity.
/// Care must be taken with unbounded sends, as they may undermine
/// assumptions about message ordering and the ability to apply
/// backpressure.
/// Alternatively, this can be thought of as a one-time capacity increase.
///
/// # Errors
///
/// Fails, if the queue is closed.
pub fn unbounded_send(&self, elem: T) -> Result<(), SendError<T>> {
const CAPACITY_REDUCING: bool = true;

if self.is_closed() {
return Err(SendError(elem));
}

self.queue.unbounded_send::<CAPACITY_REDUCING>(elem);
Ok(())
}

/// Sends a value through the channel, potentially blocking until there is
/// sufficient capacity.
///
@@ -961,17 +893,15 @@ impl<T> Permit<'_, T> {
/// immediately and the permit is consumed.
/// This will succeed even if the channel has been closed.
pub fn send(self, elem: T) {
// must not reduce capacity again (done during reservation)
const CAPACITY_REDUCING: bool = false;

self.queue.unbounded_send::<CAPACITY_REDUCING>(elem);
self.queue.unbounded_send(elem);
self.queue.unreserve(true);
mem::forget(self);
}
}

impl<T> Drop for Permit<'_, T> {
fn drop(&mut self) {
self.queue.unreserve();
self.queue.unreserve(false);
}
}

@@ -996,19 +926,17 @@ impl<T> OwnedPermit<T> {
/// Unlike [`Permit::send`], this method returns the [`Sender`] from which
/// the [`OwnedPermit`] was reserved.
pub fn send(mut self, elem: T) -> Sender<T> {
// must not reduce capacity again (done during reservation)
const CAPACITY_REDUCING: bool = false;

let sender = self.sender.take().unwrap_or_else(|| unreachable!());
sender.queue.unbounded_send::<CAPACITY_REDUCING>(elem);
sender.queue.unbounded_send(elem);
sender.queue.unreserve(true);
sender
}
}

impl<T> Drop for OwnedPermit<T> {
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
sender.queue.unreserve();
sender.queue.unreserve(false);
}
}
}
@@ -1305,7 +1233,6 @@ mod tests {
let permit = tx.reserve().await.unwrap();
assert_eq!(tx.capacity(), 0);
assert_eq!(tx.max_capacity(), 1);
assert_eq!(tx.queue.outstanding_permits(), 0, "reservation forgets permit");

rx.close();
core::future::poll_fn(|cx| {
@@ -1358,12 +1285,12 @@
#[test]
fn from_iter() {
future::block_on(async {
let chan = super::Channel::from_iter(3, [0, 1, 2, 3]);
let chan = super::Channel::from_iter(5, [0, 1, 2, 3]);
assert_eq!(chan.recv().await, Some(0));
assert_eq!(chan.recv().await, Some(1));
assert_eq!(chan.recv().await, Some(2));
assert_eq!(chan.recv().await, Some(3));
assert_eq!(chan.capacity(), 3);
assert_eq!(chan.capacity(), 5);
});
}

@@ -1406,64 +1333,4 @@
assert_eq!(tx.capacity(), 1);
});
}

#[test]
fn unbounded_send() {
future::block_on(async {
let mut chan = super::channel::<i32>(1);
let (tx, mut rx) = chan.split();

assert!(tx.send(-1).await.is_ok());
assert_eq!(tx.capacity(), 0);

let mut f1 = Box::pin(tx.send(-2));
let mut f2 = Box::pin(tx.send(-3));
let mut f3 = Box::pin(tx.reserve());

core::future::poll_fn(|cx| {
assert!(f1.as_mut().poll(cx).is_pending());
assert!(f2.as_mut().poll(cx).is_pending());
assert!(f3.as_mut().poll(cx).is_pending());
assert_eq!(tx.capacity(), 0); // logically -3

Poll::Ready(())
})
.await;

assert!(tx.unbounded_send(-99).is_ok());
assert!(tx.unbounded_send(-99).is_ok());
assert!(tx.unbounded_send(-99).is_ok());
assert_eq!(tx.capacity(), 0);

assert_eq!(rx.recv().await, Some(-1));
assert_eq!(tx.capacity(), 0, "capacity goes to f1");
assert_eq!(tx.queue.outstanding_permits(), 1);

assert_eq!(rx.recv().await, Some(-99));
assert_eq!(rx.recv().await, Some(-99));
assert_eq!(tx.capacity(), 0, "capacity goes to f1");
assert_eq!(tx.queue.outstanding_permits(), 1);

assert!(f1.await.is_ok());
assert_eq!(tx.capacity(), 0, "capacity goes to f2");

// capacity saturates at 1, is owned "in limbo" by f2
assert_eq!(rx.recv().await, Some(-99));
assert_eq!(rx.recv().await, Some(-2));

assert_eq!(tx.queue.outstanding_permits(), 1);
assert_eq!(tx.capacity(), 0, "capacity goes to f3");

drop(f2);
assert_eq!(tx.capacity(), 0, "capacity goes to f3");
assert_eq!(tx.queue.outstanding_permits(), 1);

f3.await.unwrap().send(-4);
assert_eq!(tx.capacity(), 0);

assert_eq!(rx.recv().await, Some(-4));
assert_eq!(tx.capacity(), 1);
assert_eq!(tx.queue.outstanding_permits(), 0);
});
}
}
