Skip to content

Commit

Permalink
remove uneeded subscriber count
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed May 13, 2024
1 parent aa1d253 commit a55828b
Showing 1 changed file with 2 additions and 9 deletions.
11 changes: 2 additions & 9 deletions crates/tokio-util/src/event_listeners.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::broadcast::{self, Sender};
use tokio_stream::wrappers::BroadcastStream;
use tracing::{error, warn};
Expand All @@ -10,16 +9,11 @@ const DEFAULT_BROADCAST_CHANNEL_SIZE: usize = 1000;
pub struct EventListeners<T> {
/// The sender part of the broadcast channel
sender: Sender<T>,
/// The number of subscribers, needed because the broadcast sender doesn't track this
subscriber_count: AtomicUsize,
}

impl<T: Clone> Clone for EventListeners<T> {
fn clone(&self) -> Self {
EventListeners {
sender: self.sender.clone(),
subscriber_count: AtomicUsize::new(self.subscriber_count.load(Ordering::SeqCst)),
}
EventListeners { sender: self.sender.clone() }
}
}

Expand All @@ -33,7 +27,7 @@ impl<T: Clone + Send + Sync + 'static> EventListeners<T> {
/// Creates a new `EventListeners`.
pub fn new(broadcast_channel_size: usize) -> Self {
let (sender, _) = broadcast::channel(broadcast_channel_size);
Self { sender, subscriber_count: 0.into() }
Self { sender }
}

/// Broadcast sender setter.
Expand All @@ -60,7 +54,6 @@ impl<T: Clone + Send + Sync + 'static> EventListeners<T> {

/// Adds a new event listener and returns the associated receiver.
pub fn new_listener(&self) -> BroadcastStream<T> {
self.subscriber_count.fetch_add(1, Ordering::Relaxed);
BroadcastStream::new(self.sender.subscribe())
}
}

0 comments on commit a55828b

Please sign in to comment.