Skip to content

Commit

Permalink
sync: fix incorrect is_empty on mpsc block boundaries (#6603)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed May 30, 2024
1 parent 873cb8a commit dbf93c7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
15 changes: 9 additions & 6 deletions tokio/src/sync/mpsc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,17 @@ impl<T> Block<T> {
Some(Read::Value(value.assume_init()))
}

/// Returns true if there is a value in the slot to be consumed
/// Returns true if *this* block has a value in the given slot.
///
/// # Safety
///
/// To maintain safety, the caller must ensure:
///
/// * No concurrent access to the slot.
/// Always returns false when given an index from a different block.
pub(crate) fn has_value(&self, slot_index: usize) -> bool {
if slot_index < self.header.start_index {
return false;
}
if slot_index >= self.header.start_index + super::BLOCK_CAP {
return false;
}

let offset = offset(slot_index);
let ready_bits = self.header.ready_slots.load(Acquire);
is_ready(ready_bits, offset)
Expand Down
12 changes: 12 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,4 +1421,16 @@ async fn test_rx_unbounded_len_when_close_is_called_after_dropping_sender() {
assert_eq!(rx.len(), 1);
}

// Regression test for https://github.com/tokio-rs/tokio/issues/6602
#[tokio::test]
async fn test_is_empty_32_msgs() {
let (sender, mut receiver) = mpsc::channel(33);

for value in 1..257 {
sender.send(value).await.unwrap();
receiver.recv().await.unwrap();
assert!(receiver.is_empty(), "{value}. len: {}", receiver.len());
}
}

fn is_debug<T: fmt::Debug>(_: &T) {}

0 comments on commit dbf93c7

Please sign in to comment.