diff --git a/futures-util/benches/bilock.rs b/futures-util/benches/bilock.rs new file mode 100644 index 0000000000..52e269db97 --- /dev/null +++ b/futures-util/benches/bilock.rs @@ -0,0 +1,70 @@ +#![feature(test)] + +extern crate test; + +#[cfg(feature = "bilock")] +mod bench { + use futures::task::Poll; + use futures_test::task::noop_context; + use futures_util::lock::BiLock; + + use crate::test::Bencher; + + #[bench] + fn contended(b: &mut Bencher) { + let mut context = noop_context(); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_lock(&mut context) { + Poll::Pending => (), + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }); + } + + #[bench] + fn lock_unlock(b: &mut Bencher) { + let mut context = noop_context(); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }) + } +} diff --git a/futures-util/benches_disabled/bilock.rs b/futures-util/benches_disabled/bilock.rs deleted file mode 100644 index 417f75d31e..0000000000 --- a/futures-util/benches_disabled/bilock.rs +++ /dev/null @@ -1,122 +0,0 @@ -#![feature(test)] - -#[cfg(feature = "bilock")] -mod bench { - use futures::executor::LocalPool; - use futures::task::{Context, Waker}; - use futures_util::lock::BiLock; - use futures_util::lock::BiLockAcquire; - use futures_util::lock::BiLockAcquired; - use futures_util::task::ArcWake; - - use std::sync::Arc; - use test::Bencher; - - fn notify_noop() -> Waker { - struct Noop; - - impl ArcWake for Noop { - fn wake(_: &Arc) {} - } - - ArcWake::into_waker(Arc::new(Noop)) - } - - /// Pseudo-stream which simply calls `lock.poll()` on `poll` - struct LockStream { - lock: BiLockAcquire, - } - - impl LockStream { - fn new(lock: BiLock) -> Self { - Self { lock: lock.lock() } - } - - /// Release a lock after it was acquired in `poll`, - /// so `poll` could be called again. - fn release_lock(&mut self, guard: BiLockAcquired) { - self.lock = guard.unlock().lock() - } - } - - impl Stream for LockStream { - type Item = BiLockAcquired; - type Error = (); - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error> { - self.lock.poll(cx).map(|a| a.map(Some)) - } - } - - #[bench] - fn contended(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - // Try poll second lock while first lock still holds the lock - match y.poll_next(&mut waker) { - Ok(Poll::Pending) => (), - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }); - } - - #[bench] - fn lock_unlock(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }) - } -} diff --git a/futures/tests/bilock.rs b/futures/tests/bilock.rs new file mode 100644 index 0000000000..b80e4e12ee --- /dev/null +++ b/futures/tests/bilock.rs @@ -0,0 +1,106 @@ +#[cfg(feature = "bilock")] +mod tests { + use futures::executor::block_on; + use futures::future; + use futures::stream; + use futures::task::{Context, Poll}; + use futures::Future; + use futures::StreamExt; + use futures_test::task::noop_context; + use futures_util::lock::BiLock; + use std::pin::Pin; + use std::thread; + + #[test] + fn smoke() { + let future = future::lazy(|cx| { + + let (a, b) = BiLock::new(1); + + { + let mut lock = match a.poll_lock(cx) { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 1); + *lock = 2; + + assert!(b.poll_lock(cx).is_pending()); + assert!(a.poll_lock(cx).is_pending()); + } + + assert!(b.poll_lock(cx).is_ready()); + assert!(a.poll_lock(cx).is_ready()); + + { + let lock = match b.poll_lock(cx) { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 2); + } + + assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); + + Ok::<(), ()>(()) + }); + + assert_eq!(block_on(future), Ok(())); + } + + #[test] + fn concurrent() { + const N: usize = 10000; + let mut cx = noop_context(); + let (a, b) = BiLock::new(0); + + let a = Increment { a: Some(a), remaining: N }; + let b = stream::iter(0..N).fold(b, |b, _n| async { + let mut g = b.lock().await; + *g += 1; + drop(g); + b + }); + + let t1 = thread::spawn(move || block_on(a)); + let b = block_on(b); + let a = t1.join().unwrap(); + + match a.poll_lock(&mut cx) { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + match b.poll_lock(&mut cx) { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + + assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); + + struct Increment { + remaining: usize, + a: Option>, + } + + impl Future for Increment { + type Output = BiLock; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if self.remaining == 0 { + return self.a.take().unwrap().into(); + } + + let a = self.a.as_mut().unwrap(); + let mut a = match a.poll_lock(cx) { + Poll::Ready(l) => l, + Poll::Pending => return Poll::Pending, + }; + *a += 1; + drop(a); + self.remaining -= 1; + } + } + } + } +} diff --git a/futures/tests_disabled/bilock.rs b/futures/tests_disabled/bilock.rs deleted file mode 100644 index 0166ca48ba..0000000000 --- a/futures/tests_disabled/bilock.rs +++ /dev/null @@ -1,102 +0,0 @@ -use futures::future; -use futures::stream; -use futures::task; -use futures_util::lock::BiLock; -use std::thread; - -// mod support; -// use support::*; - -#[test] -fn smoke() { - let future = future::lazy(|_| { - let (a, b) = BiLock::new(1); - - { - let mut lock = match a.poll_lock() { - Poll::Ready(l) => l, - Poll::Pending => panic!("poll not ready"), - }; - assert_eq!(*lock, 1); - *lock = 2; - - assert!(b.poll_lock().is_pending()); - assert!(a.poll_lock().is_pending()); - } - - assert!(b.poll_lock().is_ready()); - assert!(a.poll_lock().is_ready()); - - { - let lock = match b.poll_lock() { - Poll::Ready(l) => l, - Poll::Pending => panic!("poll not ready"), - }; - assert_eq!(*lock, 2); - } - - assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); - - Ok::<(), ()>(()) - }); - - assert!(task::spawn(future) - .poll_future_notify(¬ify_noop(), 0) - .expect("failure in poll") - .is_ready()); -} - -#[test] -fn concurrent() { - const N: usize = 10000; - let (a, b) = BiLock::new(0); - - let a = Increment { a: Some(a), remaining: N }; - let b = stream::iter_ok(0..N).fold(b, |b, _n| { - b.lock().map(|mut b| { - *b += 1; - b.unlock() - }) - }); - - let t1 = thread::spawn(move || a.wait()); - let b = b.wait().expect("b error"); - let a = t1.join().unwrap().expect("a error"); - - match a.poll_lock() { - Poll::Ready(l) => assert_eq!(*l, 2 * N), - Poll::Pending => panic!("poll not ready"), - } - match b.poll_lock() { - Poll::Ready(l) => assert_eq!(*l, 2 * N), - Poll::Pending => panic!("poll not ready"), - } - - assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); - - struct Increment { - remaining: usize, - a: Option>, - } - - impl Future for Increment { - type Item = BiLock; - type Error = (); - - fn poll(&mut self) -> Poll, ()> { - loop { - if self.remaining == 0 { - return Ok(self.a.take().unwrap().into()); - } - - let a = self.a.as_ref().unwrap(); - let mut a = match a.poll_lock() { - Poll::Ready(l) => l, - Poll::Pending => return Ok(Poll::Pending), - }; - self.remaining -= 1; - *a += 1; - } - } - } -}