diff --git a/tokio-util/tests/mpsc.rs b/tokio-util/tests/mpsc.rs index 66b874975b8..a0e21066bd7 100644 --- a/tokio-util/tests/mpsc.rs +++ b/tokio-util/tests/mpsc.rs @@ -246,7 +246,7 @@ async fn weak_sender() { let tx_weak = tokio::spawn(async move { for i in 0..10 { - if let Err(_) = tx.send(i).await { + if tx.send(i).await.is_err() { return None; } } @@ -339,19 +339,14 @@ async fn actor_weak_sender() { async fn run(&mut self) { let mut i = 0; - loop { - match self.receiver.recv().await { - Some(msg) => { - self.handle_message(msg); - } - None => { - break; - } - } + while let Some(msg) = self.receiver.recv().await { + self.handle_message(msg); + if i == 0 { self.send_message_to_self().await; } - i += 1; + + i += 1 } assert!(self.received_self_msg); diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 1091eac430c..72f625f10ea 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -1,7 +1,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::{Arc, Weak}; +use crate::loom::sync::Arc; use crate::park::thread::CachedParkThread; use crate::park::Park; use crate::sync::mpsc::error::TryRecvError; @@ -11,9 +11,11 @@ use crate::sync::notify::Notify; use std::fmt; use std::mem; use std::process; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, SeqCst}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; +use std::sync::Weak; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; +use std::usize; /// Channel sender. pub(crate) struct Tx { @@ -181,28 +183,31 @@ impl TxWeak { // even though the channel might have been closed in the meantime. // Need to check here whether the channel was actually closed. - let mut tx_count = inner.tx_count.load(Relaxed); + let mut tx_count = inner.tx_count.load(Acquire); + + if tx_count == 0 { + // channel is closed + mem::drop(inner); + return None; + } + loop { - // FIXME Haven't thought the orderings on the CAS through yet match inner .tx_count - .compare_exchange(tx_count, tx_count + 1, SeqCst, SeqCst) + .compare_exchange(tx_count, tx_count + 1, AcqRel, Acquire) { Ok(prev_count) => { - if prev_count == 0 { - mem::drop(inner); - return None; - } + assert!(prev_count != 0); return Some(Tx::new(inner)); } - Err(count) => { - if count == 0 { + Err(prev_count) => { + if prev_count == 0 { mem::drop(inner); return None; } - tx_count = count; + tx_count = prev_count; } } } @@ -441,9 +446,6 @@ impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) { // ===== impl Semaphore for AtomicUsize ===== -use std::sync::atomic::Ordering::Release; -use std::usize; - impl Semaphore for AtomicUsize { fn add_permit(&self) { let prev = self.fetch_sub(2, Release);