Skip to content

Commit

Permalink
sync::broadcast: create Sender::new
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19 committed Jul 5, 2023
1 parent 910a1e2 commit 16cea8a
Showing 1 changed file with 77 additions and 36 deletions.
113 changes: 77 additions & 36 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,42 +444,9 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
/// This will panic if `capacity` is equal to `0` or larger
/// than `usize::MAX / 2`.
#[track_caller]
pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
assert!(capacity > 0, "capacity is empty");
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");

// Round to a power of two
capacity = capacity.next_power_of_two();

let mut buffer = Vec::with_capacity(capacity);

for i in 0..capacity {
buffer.push(RwLock::new(Slot {
rem: AtomicUsize::new(0),
pos: (i as u64).wrapping_sub(capacity as u64),
val: UnsafeCell::new(None),
}));
}

let shared = Arc::new(Shared {
buffer: buffer.into_boxed_slice(),
mask: capacity - 1,
tail: Mutex::new(Tail {
pos: 0,
rx_cnt: 1,
closed: false,
waiters: LinkedList::new(),
}),
num_tx: AtomicUsize::new(1),
});

let rx = Receiver {
shared: shared.clone(),
next: 0,
};

let tx = Sender { shared };

pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let tx = Sender::new(capacity);
let rx = tx.subscribe();
(tx, rx)
}

Expand All @@ -490,6 +457,45 @@ unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

impl<T> Sender<T> {
/// Creates the sending-half of the [`broadcast`] channel.
///
/// See documentation of [`broadcast::channel`] for errors when calling this function.
///
/// [`broadcast`]: crate::sync::broadcast
/// [`broadcast::channel`]: crate::sync::broadcast
#[track_caller]
pub fn new(mut capacity: usize) -> Self {
assert!(capacity > 0, "capacity is empty");
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");

// Round to a power of two
capacity = capacity.next_power_of_two();

let mut buffer = Vec::with_capacity(capacity);

for i in 0..capacity {
buffer.push(RwLock::new(Slot {
rem: AtomicUsize::new(0),
pos: (i as u64).wrapping_sub(capacity as u64),
val: UnsafeCell::new(None),
}));
}

let shared = Arc::new(Shared {
buffer: buffer.into_boxed_slice(),
mask: capacity - 1,
tail: Mutex::new(Tail {
pos: 0,
rx_cnt: 0,
closed: false,
waiters: LinkedList::new(),
}),
num_tx: AtomicUsize::new(1),
});

Sender { shared }
}

/// Attempts to send a value to all active [`Receiver`] handles, returning
/// it back if it could not be sent.
///
Expand Down Expand Up @@ -1369,3 +1375,38 @@ impl<'a, T> Drop for RecvGuard<'a, T> {
}

fn is_unpin<T: Unpin>() {}

#[test]
fn receiver_count_on_sender_constructor() {
let count_of = |sender: &Sender<i32>| sender.shared.tail.lock().rx_cnt;

let sender = Sender::<i32>::new(16);
assert_eq!(count_of(&sender), 0);

let rx_1 = sender.subscribe();
assert_eq!(count_of(&sender), 1);

let rx_2 = rx_1.resubscribe();
assert_eq!(count_of(&sender), 2);

let rx_3 = sender.subscribe();
assert_eq!(count_of(&sender), 3);

drop(rx_3);
drop(rx_1);
assert_eq!(count_of(&sender), 1);

drop(rx_2);
assert_eq!(count_of(&sender), 0);
}

#[test]
fn receiver_count_on_channel_constructor() {
let count_of = |sender: &Sender<i32>| sender.shared.tail.lock().rx_cnt;

let (sender, rx) = channel::<i32>(16);
assert_eq!(count_of(&sender), 1);

let _rx_2 = rx.resubscribe();
assert_eq!(count_of(&sender), 2);
}

0 comments on commit 16cea8a

Please sign in to comment.