From a74b96d866e3bf661eedda99d1fd7729c3d320f4 Mon Sep 17 00:00:00 2001 From: Haakon Sporsheim Date: Sat, 29 Jun 2024 18:02:58 +0200 Subject: [PATCH] rtp_sender: Use a watch channel for send_called (#588) This removes the need for some mutexes --- webrtc/src/rtp_transceiver/rtp_sender/mod.rs | 33 ++++++++++--------- .../rtp_sender/rtp_sender_test.rs | 3 ++ 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs index 9ae813e06..8d86b6b5b 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs @@ -8,7 +8,7 @@ use ice::rand::generate_crypto_random_string; use interceptor::stream_info::StreamInfo; use interceptor::{Attributes, Interceptor, RTCPReader, RTPWriter}; use portable_atomic::AtomicBool; -use tokio::sync::{mpsc, Mutex, Notify}; +use tokio::sync::{watch, Mutex, Notify}; use util::sync::Mutex as SyncMutex; use super::srtp_writer_future::SequenceTransformer; @@ -27,7 +27,6 @@ use crate::track::track_local::{ }; pub(crate) struct RTPSenderInternal { - pub(crate) send_called_rx: Mutex>, pub(crate) stop_called_rx: Arc, pub(crate) stop_called_signal: Arc, } @@ -71,7 +70,7 @@ pub struct RTCRtpSender { rtp_transceiver: SyncMutex>>, - send_called_tx: SyncMutex>>, + send_called: watch::Sender, stop_called_tx: Arc, stop_called_signal: Arc, @@ -102,13 +101,12 @@ impl RTCRtpSender { 32, b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", ); - let (send_called_tx, send_called_rx) = mpsc::channel(1); + let (send_called, _) = watch::channel(false); let stop_called_tx = Arc::new(Notify::new()); let stop_called_rx = stop_called_tx.clone(); let stop_called_signal = Arc::new(AtomicBool::new(false)); let internal = Arc::new(RTPSenderInternal { - send_called_rx: Mutex::new(send_called_rx), stop_called_rx, stop_called_signal: Arc::clone(&stop_called_signal), }); @@ -141,7 +139,7 @@ impl RTCRtpSender { rtp_transceiver: SyncMutex::new(None), - send_called_tx: SyncMutex::new(Some(send_called_tx)), + send_called, stop_called_tx, stop_called_signal, @@ -435,7 +433,7 @@ impl RTCRtpSender { *write_stream.interceptor_rtp_writer.lock().await = Some(rtp_writer); } - self.send_called_tx.lock().take(); + self.send_called.send_replace(true); Ok(()) } @@ -469,10 +467,8 @@ impl RTCRtpSender { &self, b: &mut [u8], ) -> Result<(Vec>, Attributes)> { - let mut send_called_rx = self.internal.send_called_rx.lock().await; - tokio::select! { - _ = send_called_rx.recv() => { + _ = self.wait_for_send() => { let rtcp_interceptor = { let track_encodings = self.track_encodings.lock().await; track_encodings.first().map(|e|e.rtcp_interceptor.clone()) @@ -503,10 +499,8 @@ impl RTCRtpSender { b: &mut [u8], rid: &str, ) -> Result<(Vec>, Attributes)> { - let mut send_called_rx = self.internal.send_called_rx.lock().await; - tokio::select! { - _ = send_called_rx.recv() => { + _ = self.wait_for_send() => { let rtcp_interceptor = { let track_encodings = self.track_encodings.lock().await; track_encodings.iter().find(|e| e.track.rid() == Some(rid)).map(|e| e.rtcp_interceptor.clone()) @@ -544,10 +538,19 @@ impl RTCRtpSender { self.seq_trans.enable() } + /// Will asynchronously block/wait until send() has been called + /// + /// Note that it could return if underlying channel is closed, + /// however this shouldn't happen as we have a reference to self + /// which again owns the underlying channel. + pub async fn wait_for_send(&self) { + let mut watch = self.send_called.subscribe(); + let _ = watch.wait_for(|r| *r).await; + } + /// has_sent tells if data has been ever sent for this instance pub(crate) fn has_sent(&self) -> bool { - let send_called_tx = self.send_called_tx.lock(); - send_called_tx.is_none() + *self.send_called.borrow() } /// has_stopped tells if stop has been called diff --git a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs index f65cdd24d..cdae29c4f 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs @@ -1,5 +1,8 @@ use bytes::Bytes; use portable_atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use tokio::sync::mpsc; use tokio::time::Duration; use waitgroup::WaitGroup;