Skip to content

Commit

Permalink
improve semaphore tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Giersch committed Apr 22, 2023
1 parent 08e9eb9 commit 3f4f95b
Showing 1 changed file with 74 additions and 73 deletions.
147 changes: 74 additions & 73 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ impl Shared {
// the waiters request can be completed, assign all
// missing permits, wake the waiter, continue the loop
waiter.permits.set(waiter.wants);
self.outstanding_permits =
self.outstanding_permits.saturating_add(waiter.wants);
self.outstanding_permits = self.outstanding_permits.saturating_add(diff);
n -= diff;

// SAFETY: All wakers are initialized when the `Waiter`s
Expand Down Expand Up @@ -719,16 +718,20 @@ mod tests {
let sem = super::Semaphore::new(0);
let mut fut = core::pin::pin!(sem.acquire());

let permit = core::future::poll_fn(|cx| {
// poll future once to enqueue waiter
core::future::poll_fn(|cx| {
assert!(fut.as_mut().poll(cx).is_pending());
assert_eq!(sem.waiters(), 1);
// add 2 permits, one goes directly to the enqueued waiter and
// wakes it, one goes into the semaphore
sem.add_permits(2);
assert_eq!(sem.outstanding_permits(), 1);
fut.as_mut().poll(cx)
Poll::Ready(())
})
.await
.unwrap();
.await;

// future must resolve now, since it has been woken
let permit = fut.await.unwrap();
assert_eq!(sem.available_permits(), 1);
assert_eq!(sem.outstanding_permits(), 1);
drop(permit);
Expand Down Expand Up @@ -763,44 +766,50 @@ mod tests {
}

#[test]
fn acquire_two() {
fn acquire_many() {
future::block_on(async {
let sem = super::Semaphore::new(0);
let mut fut1 = Box::pin(sem.acquire());
let mut fut2 = Box::pin(sem.acquire());
let mut f1 = Box::pin(sem.acquire_many(2));
let mut f2 = Box::pin(sem.acquire_many(1));

let permit = core::future::poll_fn(|cx| {
core::future::poll_fn(|cx| {
// poll both futures once to establish order
assert!(fut1.as_mut().poll(cx).is_pending());
assert!(fut2.as_mut().poll(cx).is_pending());
assert!(f1.as_mut().poll(cx).is_pending());
assert!(f2.as_mut().poll(cx).is_pending());

assert_eq!(sem.waiters(), 2);
sem.add_permits(1);
assert_eq!(sem.outstanding_permits(), 1);

// due to established order, fut2 must not resolve before fut1
assert!(fut2.as_mut().poll(cx).is_pending());
// fut1 should resolve and the permit dropped right away,
// allowing fut2 to resolve as well
let res = fut1.as_mut().poll(cx);
assert!(res.is_ready());
// due to established order, f2 must not resolve before f1
assert!(f2.as_mut().poll(cx).is_pending());

// adding another permit must wake f1
sem.add_permits(1);
assert_eq!(sem.outstanding_permits(), 2);
assert_eq!(sem.waiters(), 1);
assert_eq!(sem.outstanding_permits(), 1);
res
Poll::Ready(())
})
.await
.unwrap();
.await;

// f1 should resolve now
let permit = f1.await.unwrap();
assert_eq!(sem.waiters(), 1);
assert_eq!(sem.outstanding_permits(), 2);

// dropping the permit must pass one permit to the next waiter,
// wake it and return the other permit back to the semaphore
drop(permit);
assert_eq!(sem.waiters(), 0);
assert_eq!(sem.available_permits(), 1);
assert_eq!(sem.outstanding_permits(), 1);

let permit = fut2.await.unwrap();
assert_eq!(sem.available_permits(), 0);
let permit = f2.await.unwrap();
assert_eq!(sem.available_permits(), 1);
assert_eq!(sem.outstanding_permits(), 1);
drop(permit);

assert_eq!(sem.available_permits(), 1);
assert_eq!(sem.available_permits(), 2);
assert_eq!(sem.outstanding_permits(), 0);
});
}
Expand All @@ -809,19 +818,37 @@ mod tests {
fn cleanup() {
future::block_on(async {
let sem = super::Semaphore::new(0);
let mut fut = Box::pin(sem.acquire());

let mut fut = Box::pin(sem.acquire());
// poll once to enqueue the future as waiting
core::future::poll_fn(|cx| {
assert!(fut.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;

// dropping the future should clear up its queue entry
// dropping the future should clear up its queue entry immediately
drop(fut);
assert_eq!(sem.waiters(), 0);
assert_eq!(sem.outstanding_permits(), 0);

let mut fut = Box::pin(sem.acquire());
// poll once to enqueue the future as waiting
core::future::poll_fn(|cx| {
assert!(fut.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;

// add 1 permit to wake future
sem.add_permits(1);
// ..and close semaphore
assert_eq!(sem.close(), 0);

assert!(fut.await.is_err());
assert_eq!(sem.waiters(), 0);
assert_eq!(sem.available_permits(), 1);
assert_eq!(sem.outstanding_permits(), 0);
});
}

Expand All @@ -838,12 +865,12 @@ mod tests {
})
.await;

// adding a permit will wake the Acquire future instead of increasing the amount of
// available permits
// adding a permit will wake the Acquire future instead of
// increasing the amount of available permits
sem.add_permits(1);
assert_eq!(sem.outstanding_permits(), 1);
// dropping the future should return the added permit instead of removing the waker from
// the queue
// dropping the future should return the added permit instead of
// removing the waker from the queue
drop(fut);

assert_eq!(sem.waiters(), 0);
Expand All @@ -859,74 +886,48 @@ mod tests {
let permit = sem.acquire().await.unwrap();
assert_eq!(sem.outstanding_permits(), 1);

let mut fut = Box::pin(sem.acquire());
let mut f1 = Box::pin(sem.acquire());
let mut f2 = Box::pin(sem.acquire());
core::future::poll_fn(|cx| {
// poll once to enque the future as waiting
assert!(fut.as_mut().poll(cx).is_pending());
// poll once to enqueue the futures as waiting
assert!(f1.as_mut().poll(cx).is_pending());
assert!(f2.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;

assert_eq!(sem.waiters(), 1);
sem.close();
assert_eq!(sem.waiters(), 2);
assert_eq!(sem.close(), 2);
assert_eq!(sem.waiters(), 0);

core::future::poll_fn(|cx| {
// closing the semaphore should have woken the future
match fut.as_mut().poll(cx) {
match f1.as_mut().poll(cx) {
Poll::Ready(Err(_)) => Poll::Ready(()),
_ => panic!("acquire future should have resolved"),
}
})
.await;

// dropping the future should have no effect
drop(fut);
// dropping the resolved future should have no effect
drop(f1);
assert_eq!(sem.available_permits(), 0);
assert_eq!(sem.outstanding_permits(), 1);
// awaiting f2 must not deadlock, even if not polled manually
assert!(f2.await.is_err());

// dropping the permit must return even though the semaphore has
// been closed
drop(permit);
assert_eq!(sem.available_permits(), 1);
assert_eq!(sem.outstanding_permits(), 0);

// no further permits can be acquired
// no further permits must be acquirable
assert!(sem.try_acquire().is_err());
assert!(sem.acquire().await.is_err());
});
}

#[test]
fn closing_wakes() {
future::block_on(async {
let sem = super::Semaphore::new(0);
let mut fut = Box::pin(sem.build_acquire(1));

core::future::poll_fn(|cx| {
// poll once to enque the future as waiting
assert!(fut.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;

assert_eq!(sem.waiters(), 1);
sem.close();
assert_eq!(sem.waiters(), 0);
assert_eq!(fut.waiter.waiting.get(), false);

core::future::poll_fn(|cx| {
// closing the semaphore should have woken the future
match fut.as_mut().poll(cx) {
Poll::Ready(Err(_)) => Poll::Ready(()),
_ => panic!("acquire future should have resolved"),
}
})
.await;

// dropping the future should have no effect
drop(fut);
assert_eq!(sem.available_permits(), 0);
});
}

#[test]
fn return_outstanding_permit_on_close() {
future::block_on(async {
Expand Down

0 comments on commit 3f4f95b

Please sign in to comment.