From 3b486b81d267c9f3f55d4f89e7abd000da46e311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20M=2E=20Bezerra?= Date: Mon, 26 Jun 2023 20:25:39 -0300 Subject: [PATCH] sync::broadcast: create Sender::new --- tokio/src/sync/broadcast.rs | 115 +++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 36 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 4b36452cec3..3f5500bc179 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -444,42 +444,9 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2; /// This will panic if `capacity` is equal to `0` or larger /// than `usize::MAX / 2`. #[track_caller] -pub fn channel(mut capacity: usize) -> (Sender, Receiver) { - assert!(capacity > 0, "capacity is empty"); - assert!(capacity <= usize::MAX >> 1, "requested capacity too large"); - - // Round to a power of two - capacity = capacity.next_power_of_two(); - - let mut buffer = Vec::with_capacity(capacity); - - for i in 0..capacity { - buffer.push(RwLock::new(Slot { - rem: AtomicUsize::new(0), - pos: (i as u64).wrapping_sub(capacity as u64), - val: UnsafeCell::new(None), - })); - } - - let shared = Arc::new(Shared { - buffer: buffer.into_boxed_slice(), - mask: capacity - 1, - tail: Mutex::new(Tail { - pos: 0, - rx_cnt: 1, - closed: false, - waiters: LinkedList::new(), - }), - num_tx: AtomicUsize::new(1), - }); - - let rx = Receiver { - shared: shared.clone(), - next: 0, - }; - - let tx = Sender { shared }; - +pub fn channel(capacity: usize) -> (Sender, Receiver) { + let tx = Sender::new(capacity); + let rx = tx.subscribe(); (tx, rx) } @@ -490,6 +457,45 @@ unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} impl Sender { + /// Creates the sending-half of the [`broadcast`] channel. + /// + /// See documentation of [`broadcast::channel`] for errors when calling this function. + /// + /// [`broadcast`]: crate::sync::broadcast + /// [`broadcast::channel`]: crate::sync::broadcast + #[track_caller] + pub fn new(mut capacity: usize) -> Self { + assert!(capacity > 0, "capacity is empty"); + assert!(capacity <= usize::MAX >> 1, "requested capacity too large"); + + // Round to a power of two + capacity = capacity.next_power_of_two(); + + let mut buffer = Vec::with_capacity(capacity); + + for i in 0..capacity { + buffer.push(RwLock::new(Slot { + rem: AtomicUsize::new(0), + pos: (i as u64).wrapping_sub(capacity as u64), + val: UnsafeCell::new(None), + })); + } + + let shared = Arc::new(Shared { + buffer: buffer.into_boxed_slice(), + mask: capacity - 1, + tail: Mutex::new(Tail { + pos: 0, + rx_cnt: 0, + closed: false, + waiters: LinkedList::new(), + }), + num_tx: AtomicUsize::new(1), + }); + + Sender { shared } + } + /// Attempts to send a value to all active [`Receiver`] handles, returning /// it back if it could not be sent. /// @@ -1369,3 +1375,40 @@ impl<'a, T> Drop for RecvGuard<'a, T> { } fn is_unpin() {} + +#[cfg(not(loom))] +#[test] +fn receiver_count_on_sender_constructor() { + let count_of = |sender: &Sender| sender.shared.tail.lock().rx_cnt; + + let sender = Sender::::new(16); + assert_eq!(count_of(&sender), 0); + + let rx_1 = sender.subscribe(); + assert_eq!(count_of(&sender), 1); + + let rx_2 = rx_1.resubscribe(); + assert_eq!(count_of(&sender), 2); + + let rx_3 = sender.subscribe(); + assert_eq!(count_of(&sender), 3); + + drop(rx_3); + drop(rx_1); + assert_eq!(count_of(&sender), 1); + + drop(rx_2); + assert_eq!(count_of(&sender), 0); +} + +#[cfg(not(loom))] +#[test] +fn receiver_count_on_channel_constructor() { + let count_of = |sender: &Sender| sender.shared.tail.lock().rx_cnt; + + let (sender, rx) = channel::(16); + assert_eq!(count_of(&sender), 1); + + let _rx_2 = rx.resubscribe(); + assert_eq!(count_of(&sender), 2); +}