Skip to content

Commit

Permalink
Add WeakShared.
Browse files Browse the repository at this point in the history
This makes implementing a cache with futures much easier. With the current architecture a cache that uses Shareds cannot honor the "drop as cancellation" feature of futures. And the cache is forced to either poll futures itself to ensure they're polled to completion or to leave half-polled futures dangling inside it (potentially consuming resources like sockets or database connections). WeakShared is to Shared as Weak is to Arc. If any copy of the underlying Shared exists that has not be dropped or polled to completion the WeakShared can be upgraded to it. This makes it possible to construct a cache that does not entrain futures, thus honoring the "drop as cancellation" feature.
  • Loading branch information
khuey authored and taiki-e committed Oct 26, 2020
1 parent c4f7349 commit 8bb7fd9
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 4 deletions.
2 changes: 1 addition & 1 deletion futures-util/src/future/future/mod.rs
Expand Up @@ -114,7 +114,7 @@ pub use self::remote_handle::{Remote, RemoteHandle};
mod shared;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::shared::Shared;
pub use self::shared::{Shared, WeakShared};

impl<T: ?Sized> FutureExt for T where T: Future {}

Expand Down
35 changes: 34 additions & 1 deletion futures-util/src/future/future/shared.rs
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};

/// Future for the [`shared`](super::FutureExt::shared) method.
#[must_use = "futures do nothing unless you `.await` or poll them"]
Expand All @@ -26,6 +26,9 @@ struct Notifier {
wakers: Mutex<Option<Slab<Option<Waker>>>>,
}

/// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`.
pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>);

// The future itself is polled behind the `Arc`, so it won't be moved
// when `Shared` is moved.
impl<Fut: Future> Unpin for Shared<Fut> {}
Expand All @@ -45,6 +48,12 @@ impl<Fut: Future> fmt::Debug for Inner<Fut> {
}
}

impl<Fut: Future> fmt::Debug for WeakShared<Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WeakShared").finish()
}
}

enum FutureOrOutput<Fut: Future> {
Future(Fut),
Output(Fut::Output),
Expand Down Expand Up @@ -107,6 +116,16 @@ where
}
None
}

/// Creates a new [`WeakShared`] for this [`Shared`].
///
/// Returns [`None`] if it has already been polled to completion.
pub fn downgrade(&self) -> Option<WeakShared<Fut>> {
if let Some(inner) = self.inner.as_ref() {
return Some(WeakShared(Arc::downgrade(inner)));
}
None
}
}

impl<Fut> Inner<Fut>
Expand Down Expand Up @@ -314,3 +333,17 @@ impl ArcWake for Notifier {
}
}
}

impl<Fut: Future> WeakShared<Fut>
{
/// Attempts to upgrade this [`WeakShared`] into a [`Shared`].
///
/// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled
/// to completion.
pub fn upgrade(&self) -> Option<Shared<Fut>> {
Some(Shared {
inner: Some(self.0.upgrade()?),
waker_key: NULL_WAKER_KEY,
})
}
}
2 changes: 1 addition & 1 deletion futures-util/src/future/mod.rs
Expand Up @@ -28,7 +28,7 @@ pub use self::future::CatchUnwind;
pub use self::future::{Remote, RemoteHandle};

#[cfg(feature = "std")]
pub use self::future::Shared;
pub use self::future::{Shared, WeakShared};

mod try_future;
pub use self::try_future::{
Expand Down
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Expand Up @@ -301,7 +301,7 @@ pub mod future {
#[cfg(feature = "std")]
pub use futures_util::future::{
Remote, RemoteHandle,
CatchUnwind, Shared,
CatchUnwind, Shared, WeakShared,
};
}

Expand Down
28 changes: 28 additions & 0 deletions futures/tests/shared.rs
Expand Up @@ -143,6 +143,34 @@ fn peek() {
}
}

#[test]
fn downgrade() {
use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::FutureExt;

let (tx, rx) = oneshot::channel::<i32>();
let shared = rx.shared();
// Since there are outstanding `Shared`s, we can get a `WeakShared`.
let weak = shared.downgrade().unwrap();
// It should upgrade fine right now.
let mut shared2 = weak.upgrade().unwrap();

tx.send(42).unwrap();
assert_eq!(block_on(shared).unwrap(), 42);

// We should still be able to get a new `WeakShared` and upgrade it
// because `shared2` is outstanding.
assert!(shared2.downgrade().is_some());
assert!(weak.upgrade().is_some());

assert_eq!(block_on(&mut shared2).unwrap(), 42);
// Now that all `Shared`s have been exhausted, we should not be able
// to get a new `WeakShared` or upgrade an existing one.
assert!(weak.upgrade().is_none());
assert!(shared2.downgrade().is_none());
}

#[test]
fn dont_clone_in_single_owner_shared_future() {
use futures::channel::oneshot;
Expand Down

0 comments on commit 8bb7fd9

Please sign in to comment.