diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 4b36452cec3..da45a0fb45c 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -444,42 +444,14 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2; /// This will panic if `capacity` is equal to `0` or larger /// than `usize::MAX / 2`. #[track_caller] -pub fn channel(mut capacity: usize) -> (Sender, Receiver) { - assert!(capacity > 0, "capacity is empty"); - assert!(capacity <= usize::MAX >> 1, "requested capacity too large"); - - // Round to a power of two - capacity = capacity.next_power_of_two(); - - let mut buffer = Vec::with_capacity(capacity); - - for i in 0..capacity { - buffer.push(RwLock::new(Slot { - rem: AtomicUsize::new(0), - pos: (i as u64).wrapping_sub(capacity as u64), - val: UnsafeCell::new(None), - })); - } - - let shared = Arc::new(Shared { - buffer: buffer.into_boxed_slice(), - mask: capacity - 1, - tail: Mutex::new(Tail { - pos: 0, - rx_cnt: 1, - closed: false, - waiters: LinkedList::new(), - }), - num_tx: AtomicUsize::new(1), - }); +pub fn channel(capacity: usize) -> (Sender, Receiver) { + let tx = Sender::new(capacity); let rx = Receiver { - shared: shared.clone(), + shared: tx.shared.clone(), next: 0, }; - let tx = Sender { shared }; - (tx, rx) } @@ -490,6 +462,44 @@ unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} impl Sender { + /// Creates the sending-half of the [`broadcast`] channel. + /// + /// See documentation of [`broadcast::channel`] for errors when calling this function. + /// + /// [`broadcast`]: crate::sync::broadcast + /// [`broadcast::channel`]: crate::sync::broadcast + pub fn new(mut capacity: usize) -> Self { + assert!(capacity > 0, "capacity is empty"); + assert!(capacity <= usize::MAX >> 1, "requested capacity too large"); + + // Round to a power of two + capacity = capacity.next_power_of_two(); + + let mut buffer = Vec::with_capacity(capacity); + + for i in 0..capacity { + buffer.push(RwLock::new(Slot { + rem: AtomicUsize::new(0), + pos: (i as u64).wrapping_sub(capacity as u64), + val: UnsafeCell::new(None), + })); + } + + let shared = Arc::new(Shared { + buffer: buffer.into_boxed_slice(), + mask: capacity - 1, + tail: Mutex::new(Tail { + pos: 0, + rx_cnt: 1, + closed: false, + waiters: LinkedList::new(), + }), + num_tx: AtomicUsize::new(1), + }); + + Sender { shared } + } + /// Attempts to send a value to all active [`Receiver`] handles, returning /// it back if it could not be sent. ///