Skip to content

Commit

Permalink
sync: add mpsc::Sender::closed future
Browse files Browse the repository at this point in the history
Fixes: tokio-rs#2800

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev committed Sep 17, 2020
1 parent 4c4699b commit 054030b
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 1 deletion.
35 changes: 35 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -297,6 +297,41 @@ impl<T> Sender<T> {
}
}

/// Completes when the receiver has dropped.
///
/// This allows the producers to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx) = mpsc::channel::<()>(1);
/// let mut tx2 = tx1.clone();
/// let mut tx3 = tx1.clone();
/// let mut tx4 = tx1.clone();
/// let mut tx5 = tx1.clone();
/// tokio::spawn(async move {
/// drop(rx);
/// });
///
/// futures::join!(
/// tx1.closed(),
/// tx2.closed(),
/// tx3.closed(),
/// tx4.closed(),
/// tx5.closed()
/// );
//// println!("Receiver dropped");
/// }
/// ```
pub async fn closed(&mut self) {
self.chan.closed().await
}

/// Attempts to immediately send a message on this `Sender`
///
/// This method differs from [`send`] by returning immediately if the channel's
Expand Down
46 changes: 45 additions & 1 deletion tokio/src/sync/mpsc/chan.rs
@@ -1,13 +1,15 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::{ClosedError, TryRecvError};
use crate::sync::mpsc::{error, list};
use crate::sync::Notify;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::sync::atomic::Ordering::{AcqRel, Relaxed, SeqCst};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

Expand Down Expand Up @@ -99,6 +101,12 @@ pub(crate) trait Semaphore {
}

struct Chan<T, S> {
/// Notifies all tasks listening for the receiver being dropped
notify_rx_closed: Notify,

/// Indicates whether the receiver has been dropped
rx_closed: AtomicBool,

/// Handle to the push half of the lock-free list.
tx: list::Tx<T>,

Expand Down Expand Up @@ -160,6 +168,8 @@ where
let (tx, rx) = list::channel();

let chan = Arc::new(Chan {
rx_closed: AtomicBool::new(false),
notify_rx_closed: Notify::new(),
tx,
semaphore,
rx_waker: AtomicWaker::new(),
Expand Down Expand Up @@ -199,6 +209,38 @@ where
pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
self.inner.try_send(value, &mut self.permit)
}

pub(crate) async fn closed(&mut self) {
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped. Requesting the notification
// requires polling the future once.
let notified = self.inner.notify_rx_closed.notified();
pin!(notified);

// Polling this for first time will register the waiter and
// return `Pending` or return `Ready` right away. If `Ready`
// is returned we are done
let aquired_lost_notification =
crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) {
Poll::Ready(()) => Poll::Ready(true),
Poll::Pending => Poll::Ready(false),
})
.await;

if aquired_lost_notification {
return;
}

if self.inner.rx_closed.load(SeqCst) {
return;
}
notified.await;
}
}

impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> {
Expand Down Expand Up @@ -270,6 +312,8 @@ where
});

self.inner.semaphore.close();
self.inner.rx_closed.store(true, SeqCst);
self.inner.notify_rx_closed.notify_waiters();
}

/// Receive the next value
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Expand Up @@ -177,4 +177,39 @@ impl<T> UnboundedSender<T> {
self.chan.send_unbounded(message)?;
Ok(())
}

/// Completes when the receiver has dropped.
///
/// This allows the producers to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
/// let mut tx2 = tx1.clone();
/// let mut tx3 = tx1.clone();
/// let mut tx4 = tx1.clone();
/// let mut tx5 = tx1.clone();
/// tokio::spawn(async move {
/// drop(rx);
/// });
///
/// futures::join!(
/// tx1.closed(),
/// tx2.closed(),
/// tx3.closed(),
/// tx4.closed(),
/// tx5.closed()
/// );
//// println!("Receiver dropped");
/// }
/// ```
pub async fn closed(&mut self) {
self.chan.closed().await
}
}
28 changes: 28 additions & 0 deletions tokio/src/sync/tests/loom_mpsc.rs
Expand Up @@ -40,6 +40,34 @@ fn closing_unbounded_tx() {
});
}

#[test]
fn closing_bounded_rx() {
loom::model(|| {
let (mut tx1, rx) = mpsc::channel::<()>(16);
let mut tx2 = tx1.clone();
thread::spawn(move || {
drop(rx);
});

block_on(tx1.closed());
block_on(tx2.closed());
});
}

#[test]
fn closing_unbounded_rx() {
loom::model(|| {
let (mut tx1, rx) = mpsc::unbounded_channel::<()>();
let mut tx2 = tx1.clone();
thread::spawn(move || {
drop(rx);
});

block_on(tx1.closed());
block_on(tx2.closed());
});
}

#[test]
fn dropping_tx() {
loom::model(|| {
Expand Down

0 comments on commit 054030b

Please sign in to comment.