Skip to content

Commit

Permalink
rtp_sender: Use a watch channel for send_called (#588)
Browse files Browse the repository at this point in the history
This removes the need for some mutexes
  • Loading branch information
haaspors committed Jun 29, 2024
1 parent 0e5c862 commit a74b96d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
33 changes: 18 additions & 15 deletions webrtc/src/rtp_transceiver/rtp_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ice::rand::generate_crypto_random_string;
use interceptor::stream_info::StreamInfo;
use interceptor::{Attributes, Interceptor, RTCPReader, RTPWriter};
use portable_atomic::AtomicBool;
use tokio::sync::{mpsc, Mutex, Notify};
use tokio::sync::{watch, Mutex, Notify};
use util::sync::Mutex as SyncMutex;

use super::srtp_writer_future::SequenceTransformer;
Expand All @@ -27,7 +27,6 @@ use crate::track::track_local::{
};

pub(crate) struct RTPSenderInternal {
pub(crate) send_called_rx: Mutex<mpsc::Receiver<()>>,
pub(crate) stop_called_rx: Arc<Notify>,
pub(crate) stop_called_signal: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -71,7 +70,7 @@ pub struct RTCRtpSender {

rtp_transceiver: SyncMutex<Option<Weak<RTCRtpTransceiver>>>,

send_called_tx: SyncMutex<Option<mpsc::Sender<()>>>,
send_called: watch::Sender<bool>,
stop_called_tx: Arc<Notify>,
stop_called_signal: Arc<AtomicBool>,

Expand Down Expand Up @@ -102,13 +101,12 @@ impl RTCRtpSender {
32,
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ",
);
let (send_called_tx, send_called_rx) = mpsc::channel(1);
let (send_called, _) = watch::channel(false);
let stop_called_tx = Arc::new(Notify::new());
let stop_called_rx = stop_called_tx.clone();
let stop_called_signal = Arc::new(AtomicBool::new(false));

let internal = Arc::new(RTPSenderInternal {
send_called_rx: Mutex::new(send_called_rx),
stop_called_rx,
stop_called_signal: Arc::clone(&stop_called_signal),
});
Expand Down Expand Up @@ -141,7 +139,7 @@ impl RTCRtpSender {

rtp_transceiver: SyncMutex::new(None),

send_called_tx: SyncMutex::new(Some(send_called_tx)),
send_called,
stop_called_tx,
stop_called_signal,

Expand Down Expand Up @@ -435,7 +433,7 @@ impl RTCRtpSender {
*write_stream.interceptor_rtp_writer.lock().await = Some(rtp_writer);
}

self.send_called_tx.lock().take();
self.send_called.send_replace(true);
Ok(())
}

Expand Down Expand Up @@ -469,10 +467,8 @@ impl RTCRtpSender {
&self,
b: &mut [u8],
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
let mut send_called_rx = self.internal.send_called_rx.lock().await;

tokio::select! {
_ = send_called_rx.recv() => {
_ = self.wait_for_send() => {
let rtcp_interceptor = {
let track_encodings = self.track_encodings.lock().await;
track_encodings.first().map(|e|e.rtcp_interceptor.clone())
Expand Down Expand Up @@ -503,10 +499,8 @@ impl RTCRtpSender {
b: &mut [u8],
rid: &str,
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
let mut send_called_rx = self.internal.send_called_rx.lock().await;

tokio::select! {
_ = send_called_rx.recv() => {
_ = self.wait_for_send() => {
let rtcp_interceptor = {
let track_encodings = self.track_encodings.lock().await;
track_encodings.iter().find(|e| e.track.rid() == Some(rid)).map(|e| e.rtcp_interceptor.clone())
Expand Down Expand Up @@ -544,10 +538,19 @@ impl RTCRtpSender {
self.seq_trans.enable()
}

/// Will asynchronously block/wait until send() has been called
///
/// Note that it could return if underlying channel is closed,
/// however this shouldn't happen as we have a reference to self
/// which again owns the underlying channel.
pub async fn wait_for_send(&self) {
let mut watch = self.send_called.subscribe();
let _ = watch.wait_for(|r| *r).await;
}

/// has_sent tells if data has been ever sent for this instance
pub(crate) fn has_sent(&self) -> bool {
let send_called_tx = self.send_called_tx.lock();
send_called_tx.is_none()
*self.send_called.borrow()
}

/// has_stopped tells if stop has been called
Expand Down
3 changes: 3 additions & 0 deletions webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use bytes::Bytes;
use portable_atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Duration;
use waitgroup::WaitGroup;

Expand Down

0 comments on commit a74b96d

Please sign in to comment.