diff --git a/webrtc/src/api/mod.rs b/webrtc/src/api/mod.rs index 123c7d12f..2252dfbb2 100644 --- a/webrtc/src/api/mod.rs +++ b/webrtc/src/api/mod.rs @@ -157,9 +157,11 @@ impl API { transport: Arc, interceptor: Arc, ) -> RTCRtpSender { + let kind = track.as_ref().map(|t| t.kind()).unwrap_or_default(); RTCRtpSender::new( self.setting_engine.get_receive_mtu(), track, + kind, transport, Arc::clone(&self.media_engine), interceptor, diff --git a/webrtc/src/error.rs b/webrtc/src/error.rs index 0fcabbf38..9a9ff4403 100644 --- a/webrtc/src/error.rs +++ b/webrtc/src/error.rs @@ -199,6 +199,10 @@ pub enum Error { #[error("new track must be of the same kind as previous")] ErrRTPSenderNewTrackHasIncorrectKind, + /// ErrRTPSenderNewTrackHasIncorrectEnvelope indicates that the new track has a different envelope than the previous/original + #[error("new track must have the same envelope as previous")] + ErrRTPSenderNewTrackHasIncorrectEnvelope, + /// ErrRTPSenderDataSent indicates that the sequence number transformer tries to be enabled after the data sending began #[error("Sequence number transformer must be enabled before sending data")] ErrRTPSenderDataSent, @@ -328,6 +332,20 @@ pub enum Error { }, #[error("Track must not be nil")] ErrRTPSenderTrackNil, + #[error("Sender has already been stopped")] + ErrRTPSenderStopped, + #[error("Sender Track has been removed or replaced to nil")] + ErrRTPSenderTrackRemoved, + #[error("Sender cannot add encoding as rid is empty")] + ErrRTPSenderRidNil, + #[error("Sender cannot add encoding as there is no base track")] + ErrRTPSenderNoBaseEncoding, + #[error("Sender cannot add encoding as provided track does not match base track")] + ErrRTPSenderBaseEncodingMismatch, + #[error("Sender cannot encoding due to RID collision")] + ErrRTPSenderRIDCollision, + #[error("Sender does not have track for RID")] + ErrRTPSenderNoTrackForRID, #[error("RTPSender must not be nil")] ErrRTPSenderNil, #[error("RTPReceiver must not be nil")] diff --git a/webrtc/src/peer_connection/mod.rs b/webrtc/src/peer_connection/mod.rs index 0bba84c9d..e5cec0894 100644 --- a/webrtc/src/peer_connection/mod.rs +++ b/webrtc/src/peer_connection/mod.rs @@ -1410,6 +1410,7 @@ impl RTCPeerConnection { RTCRtpSender::new( receive_mtu, None, + kind, Arc::clone(&self.internal.dtls_transport), Arc::clone(&self.internal.media_engine), Arc::clone(&self.interceptor), @@ -1608,7 +1609,10 @@ impl RTCPeerConnection { let current_transceivers = self.internal.rtp_transceivers.lock().await; for transceiver in &*current_transceivers { let sender = transceiver.sender().await; - if sender.is_negotiated() && !sender.has_sent() { + if !sender.track_encodings.lock().await.is_empty() + && sender.is_negotiated() + && !sender.has_sent() + { sender.send(&sender.get_parameters().await).await?; } } diff --git a/webrtc/src/peer_connection/peer_connection_internal.rs b/webrtc/src/peer_connection/peer_connection_internal.rs index 6cc2d68cb..50c2d8d89 100644 --- a/webrtc/src/peer_connection/peer_connection_internal.rs +++ b/webrtc/src/peer_connection/peer_connection_internal.rs @@ -14,6 +14,7 @@ use crate::stats::{ InboundRTPStats, OutboundRTPStats, RTCStatsType, RemoteInboundRTPStats, RemoteOutboundRTPStats, StatsReportType, }; +use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample; use crate::track::TrackStream; use crate::SDP_ATTRIBUTE_RID; @@ -442,44 +443,60 @@ impl PeerConnectionInternal { .map(|value| value.direction) .unwrap_or(RTCRtpTransceiverDirection::Sendrecv); - if direction == RTCRtpTransceiverDirection::Unspecified { - return Err(Error::ErrPeerConnAddTransceiverFromKindSupport); - } - - let interceptor = self - .interceptor - .upgrade() - .ok_or(Error::ErrInterceptorNotBind)?; - let receiver = Arc::new(RTCRtpReceiver::new( - self.setting_engine.get_receive_mtu(), - kind, - Arc::clone(&self.dtls_transport), - Arc::clone(&self.media_engine), - Arc::clone(&interceptor), - )); + let t = match direction { + RTCRtpTransceiverDirection::Sendonly | RTCRtpTransceiverDirection::Sendrecv => { + let codec = self + .media_engine + .get_codecs_by_kind(kind) + .first() + .map(|c| c.capability.clone()) + .ok_or(Error::ErrNoCodecsAvailable)?; + let track = Arc::new(TrackLocalStaticSample::new( + codec, + math_rand_alpha(16), + math_rand_alpha(16), + )); + self.new_transceiver_from_track(direction, track).await? + } + RTCRtpTransceiverDirection::Recvonly => { + let interceptor = self + .interceptor + .upgrade() + .ok_or(Error::ErrInterceptorNotBind)?; + let receiver = Arc::new(RTCRtpReceiver::new( + self.setting_engine.get_receive_mtu(), + kind, + Arc::clone(&self.dtls_transport), + Arc::clone(&self.media_engine), + Arc::clone(&interceptor), + )); - let sender = Arc::new( - RTCRtpSender::new( - self.setting_engine.get_receive_mtu(), - None, - Arc::clone(&self.dtls_transport), - Arc::clone(&self.media_engine), - interceptor, - false, - ) - .await, - ); + let sender = Arc::new( + RTCRtpSender::new( + self.setting_engine.get_receive_mtu(), + None, + kind, + Arc::clone(&self.dtls_transport), + Arc::clone(&self.media_engine), + interceptor, + false, + ) + .await, + ); - let t = RTCRtpTransceiver::new( - receiver, - sender, - direction, - kind, - vec![], - Arc::clone(&self.media_engine), - Some(Box::new(self.make_negotiation_needed_trigger())), - ) - .await; + RTCRtpTransceiver::new( + receiver, + sender, + direction, + kind, + vec![], + Arc::clone(&self.media_engine), + Some(Box::new(self.make_negotiation_needed_trigger())), + ) + .await + } + _ => return Err(Error::ErrPeerConnAddTransceiverFromKindSupport), + }; self.add_rtp_transceiver(Arc::clone(&t)).await; @@ -512,6 +529,7 @@ impl PeerConnectionInternal { RTCRtpSender::new( self.setting_engine.get_receive_mtu(), Some(Arc::clone(&track)), + track.kind(), Arc::clone(&self.dtls_transport), Arc::clone(&self.media_engine), Arc::clone(&interceptor), @@ -858,6 +876,8 @@ impl PeerConnectionInternal { let only_media_section = &remote_description.media_descriptions[0]; let mut stream_id = ""; let mut id = ""; + let mut has_rid = false; + let mut has_ssrc = false; for a in &only_media_section.attributes { match a.key.as_str() { @@ -870,12 +890,18 @@ impl PeerConnectionInternal { } } } - ATTR_KEY_SSRC => return Err(Error::ErrPeerConnSingleMediaSectionHasExplicitSSRC), - SDP_ATTRIBUTE_RID => return Ok(false), + ATTR_KEY_SSRC => has_ssrc = true, + SDP_ATTRIBUTE_RID => has_rid = true, _ => {} }; } + if has_rid { + return Ok(false); + } else if has_ssrc { + return Err(Error::ErrPeerConnSingleMediaSectionHasExplicitSSRC); + } + let mut incoming = TrackDetails { ssrcs: vec![ssrc], kind: RTPCodecType::Video, @@ -1343,32 +1369,29 @@ impl PeerConnectionInternal { } let mut track_infos = vec![]; for transceiver in transceivers { - let sender = transceiver.sender().await; - let mid = match transceiver.mid() { Some(mid) => mid, None => continue, }; - let track = match sender.track().await { - Some(track) => track, - None => continue, - }; - - let track_id = track.id().to_string(); - let kind = match track.kind() { - RTPCodecType::Unspecified => continue, - RTPCodecType::Audio => "audio", - RTPCodecType::Video => "video", - }; + let sender = transceiver.sender().await; + let track_encodings = sender.track_encodings.lock().await; + for encoding in track_encodings.iter() { + let track_id = encoding.track.id().to_string(); + let kind = match encoding.track.kind() { + RTPCodecType::Unspecified => continue, + RTPCodecType::Audio => "audio", + RTPCodecType::Video => "video", + }; - track_infos.push(TrackInfo { - track_id, - ssrc: sender.ssrc, - mid, - rid: None, - kind, - }); + track_infos.push(TrackInfo { + track_id, + ssrc: encoding.ssrc, + mid: mid.to_owned(), + rid: encoding.track.rid().map(Into::into), + kind, + }); + } } let stream_stats = self diff --git a/webrtc/src/peer_connection/peer_connection_test.rs b/webrtc/src/peer_connection/peer_connection_test.rs index 97a08c30c..2c9676037 100644 --- a/webrtc/src/peer_connection/peer_connection_test.rs +++ b/webrtc/src/peer_connection/peer_connection_test.rs @@ -19,6 +19,7 @@ use crate::ice_transport::ice_server::RTCIceServer; use crate::peer_connection::configuration::RTCConfiguration; use crate::rtp_transceiver::rtp_codec::RTCRtpCodecCapability; use crate::stats::StatsReportType; +use crate::track::track_local::track_local_static_rtp::TrackLocalStaticRTP; use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample; use crate::Error; @@ -513,3 +514,127 @@ async fn peer() -> Result<()> { Ok(()) } + +pub(crate) fn on_connected() -> (OnPeerConnectionStateChangeHdlrFn, mpsc::Receiver<()>) { + let (done_tx, done_rx) = mpsc::channel::<()>(1); + let done_tx = Arc::new(Mutex::new(Some(done_tx))); + let hdlr_fn: OnPeerConnectionStateChangeHdlrFn = + Box::new(move |state: RTCPeerConnectionState| { + let done_tx_clone = Arc::clone(&done_tx); + Box::pin(async move { + if state == RTCPeerConnectionState::Connected { + let mut tx = done_tx_clone.lock().await; + tx.take(); + } + }) + }); + (hdlr_fn, done_rx) +} + +// Everytime we receive a new SSRC we probe it and try to determine the proper way to handle it. +// In most cases a Track explicitly declares a SSRC and a OnTrack is fired. In two cases we don't +// know the SSRC ahead of time +// * Undeclared SSRC in a single media section +// * Simulcast +// +// The Undeclared SSRC processing code would run before Simulcast. If a Simulcast Offer/Answer only +// contained one Media Section we would never fire the OnTrack. We would assume it was a failed +// Undeclared SSRC processing. This test asserts that we properly handled this. +#[tokio::test] +async fn test_peer_connection_simulcast_no_data_channel() -> Result<()> { + let mut m = MediaEngine::default(); + for ext in [ + ::sdp::extmap::SDES_MID_URI, + ::sdp::extmap::SDES_RTP_STREAM_ID_URI, + ] { + m.register_header_extension( + RTCRtpHeaderExtensionCapability { + uri: ext.to_owned(), + }, + RTPCodecType::Video, + None, + )?; + } + m.register_default_codecs()?; + let api = APIBuilder::new().with_media_engine(m).build(); + + let (mut pc_send, mut pc_recv) = new_pair(&api).await?; + let (send_notifier, mut send_connected) = on_connected(); + let (recv_notifier, mut recv_connected) = on_connected(); + pc_send.on_peer_connection_state_change(send_notifier); + pc_recv.on_peer_connection_state_change(recv_notifier); + let (track_tx, mut track_rx) = mpsc::unbounded_channel(); + pc_recv.on_track(Box::new(move |t, _, _| { + let rid = t.rid().to_owned(); + let _ = track_tx.send(rid); + Box::pin(async move {}) + })); + + let id = "video"; + let stream_id = "webrtc-rs"; + let track = Arc::new(TrackLocalStaticRTP::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + id.to_owned(), + "a".to_owned(), + stream_id.to_owned(), + )); + let track_a = Arc::clone(&track); + let transceiver = pc_send.add_transceiver_from_track(track, None).await?; + let sender = transceiver.sender().await; + + let track = Arc::new(TrackLocalStaticRTP::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + id.to_owned(), + "b".to_owned(), + stream_id.to_owned(), + )); + let track_b = Arc::clone(&track); + sender.add_encoding(track).await?; + + let track = Arc::new(TrackLocalStaticRTP::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + id.to_owned(), + "c".to_owned(), + stream_id.to_owned(), + )); + let track_c = Arc::clone(&track); + sender.add_encoding(track).await?; + + // signaling + signal_pair(&mut pc_send, &mut pc_recv).await?; + let _ = send_connected.recv().await; + let _ = recv_connected.recv().await; + + for sequence_number in [0; 100] { + let pkt = rtp::packet::Packet { + header: rtp::header::Header { + version: 2, + sequence_number, + payload_type: 96, + ..Default::default() + }, + payload: Bytes::from_static(&[0; 2]), + }; + + track_a.write_rtp_with_extensions(&pkt, &[]).await?; + track_b.write_rtp_with_extensions(&pkt, &[]).await?; + track_c.write_rtp_with_extensions(&pkt, &[]).await?; + } + + assert_eq!(track_rx.recv().await.unwrap(), "a".to_owned()); + assert_eq!(track_rx.recv().await.unwrap(), "b".to_owned()); + assert_eq!(track_rx.recv().await.unwrap(), "c".to_owned()); + + close_pair_now(&pc_send, &pc_recv).await; + + Ok(()) +} diff --git a/webrtc/src/peer_connection/sdp/mod.rs b/webrtc/src/peer_connection/sdp/mod.rs index 867135c46..3dff6d151 100644 --- a/webrtc/src/peer_connection/sdp/mod.rs +++ b/webrtc/src/peer_connection/sdp/mod.rs @@ -583,12 +583,32 @@ pub(crate) async fn add_transceiver_sdp( for mt in transceivers { let sender = mt.sender().await; if let Some(track) = sender.track().await { - media = media.with_media_source( - sender.ssrc, - track.stream_id().to_owned(), /* cname */ - track.stream_id().to_owned(), /* streamLabel */ - track.id().to_owned(), - ); + let send_parameters = sender.get_parameters().await; + for encoding in &send_parameters.encodings { + media = media.with_media_source( + encoding.ssrc, + track.stream_id().to_owned(), /* cname */ + track.stream_id().to_owned(), /* streamLabel */ + track.id().to_owned(), + ); + } + + if send_parameters.encodings.len() > 1 { + let mut send_rids = Vec::with_capacity(send_parameters.encodings.len()); + + for encoding in &send_parameters.encodings { + media = media.with_value_attribute( + SDP_ATTRIBUTE_RID.to_owned(), + format!("{} send", encoding.rid), + ); + send_rids.push(encoding.rid.to_string()); + } + + media = media.with_value_attribute( + SDP_ATTRIBUTE_SIMULCAST.to_owned(), + format!("send {}", send_rids.join(";")), + ); + } // Send msid based on the configured track if we haven't already // sent on this sender. If we have sent we must keep the msid line consistent, this diff --git a/webrtc/src/peer_connection/sdp/sdp_test.rs b/webrtc/src/peer_connection/sdp/sdp_test.rs index ea7ec90e6..ca8cf5b24 100644 --- a/webrtc/src/peer_connection/sdp/sdp_test.rs +++ b/webrtc/src/peer_connection/sdp/sdp_test.rs @@ -650,6 +650,7 @@ async fn test_media_description_fingerprints() -> Result<()> { RTCRtpSender::new( api.setting_engine.get_receive_mtu(), Some(track), + RTPCodecType::Video, Arc::new(RTCDtlsTransport::default()), Arc::clone(&api.media_engine), Arc::clone(&interceptor), diff --git a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs index d0db5b33e..9ae813e06 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs @@ -15,7 +15,7 @@ use super::srtp_writer_future::SequenceTransformer; use crate::api::media_engine::MediaEngine; use crate::dtls_transport::RTCDtlsTransport; use crate::error::{Error, Result}; -use crate::rtp_transceiver::rtp_codec::{RTCRtpCodecParameters, RTPCodecType}; +use crate::rtp_transceiver::rtp_codec::RTPCodecType; use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; use crate::rtp_transceiver::srtp_writer_future::SrtpWriterFuture; use crate::rtp_transceiver::{ @@ -30,69 +30,28 @@ pub(crate) struct RTPSenderInternal { pub(crate) send_called_rx: Mutex>, pub(crate) stop_called_rx: Arc, pub(crate) stop_called_signal: Arc, - pub(crate) rtcp_interceptor: Mutex>>, } -impl RTPSenderInternal { - /// read reads incoming RTCP for this RTPReceiver - async fn read( - &self, - b: &mut [u8], - ) -> Result<(Vec>, Attributes)> { - let mut send_called_rx = self.send_called_rx.lock().await; - - tokio::select! { - _ = send_called_rx.recv() =>{ - let rtcp_interceptor = { - let rtcp_interceptor = self.rtcp_interceptor.lock().await; - rtcp_interceptor.clone() - }; - if let Some(rtcp_interceptor) = rtcp_interceptor{ - let a = Attributes::new(); - tokio::select! { - _ = self.stop_called_rx.notified() => { - Err(Error::ErrClosedPipe) - } - result = rtcp_interceptor.read(b, &a) => { - Ok(result?) - } - } - }else{ - Err(Error::ErrInterceptorNotBind) - } - } - _ = self.stop_called_rx.notified() =>{ - Err(Error::ErrClosedPipe) - } - } - } - - /// read_rtcp is a convenience method that wraps Read and unmarshals for you. - async fn read_rtcp( - &self, - receive_mtu: usize, - ) -> Result<(Vec>, Attributes)> { - let mut b = vec![0u8; receive_mtu]; - let (pkts, attributes) = self.read(&mut b).await?; +pub(crate) struct TrackEncoding { + pub(crate) track: Arc, + pub(crate) srtp_stream: Arc, + pub(crate) rtcp_interceptor: Arc, + pub(crate) stream_info: Mutex, + pub(crate) context: Mutex, - Ok((pkts, attributes)) - } + pub(crate) ssrc: SSRC, } /// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer pub struct RTCRtpSender { - pub(crate) track: Mutex>>, + pub(crate) track_encodings: Mutex>, - pub(crate) srtp_stream: Arc, - pub(crate) stream_info: Mutex, seq_trans: Arc, - pub(crate) context: Mutex, - pub(crate) transport: Arc, + pub(crate) kind: RTPCodecType, pub(crate) payload_type: PayloadType, - pub(crate) ssrc: SSRC, receive_mtu: usize, /// a transceiver sender since we can just check the @@ -133,6 +92,7 @@ impl RTCRtpSender { pub async fn new( receive_mtu: usize, track: Option>, + kind: RTPCodecType, transport: Arc, media_engine: Arc, interceptor: Arc, @@ -145,50 +105,29 @@ impl RTCRtpSender { let (send_called_tx, send_called_rx) = mpsc::channel(1); let stop_called_tx = Arc::new(Notify::new()); let stop_called_rx = stop_called_tx.clone(); - let ssrc = rand::random::(); let stop_called_signal = Arc::new(AtomicBool::new(false)); let internal = Arc::new(RTPSenderInternal { send_called_rx: Mutex::new(send_called_rx), stop_called_rx, stop_called_signal: Arc::clone(&stop_called_signal), - rtcp_interceptor: Mutex::new(None), }); let seq_trans = Arc::new(SequenceTransformer::new()); - let srtp_stream = Arc::new(SrtpWriterFuture { - closed: AtomicBool::new(false), - ssrc, - rtp_sender: Arc::downgrade(&internal), - rtp_transport: Arc::clone(&transport), - rtcp_read_stream: Mutex::new(None), - rtp_write_session: Mutex::new(None), - seq_trans: Arc::clone(&seq_trans), - }); - - let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc; - let rtcp_interceptor = interceptor.bind_rtcp_reader(srtp_rtcp_reader).await; - { - let mut internal_rtcp_interceptor = internal.rtcp_interceptor.lock().await; - *internal_rtcp_interceptor = Some(rtcp_interceptor); - } let stream_ids = track .as_ref() .map(|track| vec![track.stream_id().to_string()]) .unwrap_or_default(); - Self { - track: Mutex::new(track), + let ret = Self { + track_encodings: Mutex::new(vec![]), - srtp_stream, - stream_info: Mutex::new(StreamInfo::default()), seq_trans, - context: Mutex::new(TrackLocalContext::default()), transport, + kind, payload_type: 0, - ssrc, receive_mtu, negotiated: AtomicBool::new(false), @@ -209,7 +148,86 @@ impl RTCRtpSender { paused: Arc::new(AtomicBool::new(start_paused)), internal, + }; + + if let Some(track) = track { + let mut track_encodings = ret.track_encodings.lock().await; + let _ = ret.add_encoding_internal(&mut track_encodings, track).await; + } + + ret + } + + /// AddEncoding adds an encoding to RTPSender. Used by simulcast senders. + pub async fn add_encoding(&self, track: Arc) -> Result<()> { + let mut track_encodings = self.track_encodings.lock().await; + + if track.rid().is_none() { + return Err(Error::ErrRTPSenderRidNil); + } + + if self.has_stopped().await { + return Err(Error::ErrRTPSenderStopped); + } + + if self.has_sent() { + return Err(Error::ErrRTPSenderSendAlreadyCalled); + } + + let base_track = track_encodings + .first() + .map(|e| &e.track) + .ok_or(Error::ErrRTPSenderNoBaseEncoding)?; + if base_track.rid().is_none() { + return Err(Error::ErrRTPSenderNoBaseEncoding); + } + + if base_track.id() != track.id() + || base_track.stream_id() != track.stream_id() + || base_track.kind() != track.kind() + { + return Err(Error::ErrRTPSenderBaseEncodingMismatch); + } + + if track_encodings.iter().any(|e| e.track.rid() == track.rid()) { + return Err(Error::ErrRTPSenderRIDCollision); } + + self.add_encoding_internal(&mut track_encodings, track) + .await + } + + async fn add_encoding_internal( + &self, + track_encodings: &mut Vec, + track: Arc, + ) -> Result<()> { + let ssrc = rand::random::(); + let srtp_stream = Arc::new(SrtpWriterFuture { + closed: AtomicBool::new(false), + ssrc, + rtp_sender: Arc::downgrade(&self.internal), + rtp_transport: Arc::clone(&self.transport), + rtcp_read_stream: Mutex::new(None), + rtp_write_session: Mutex::new(None), + seq_trans: Arc::clone(&self.seq_trans), + }); + + let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc; + let rtcp_interceptor = self.interceptor.bind_rtcp_reader(srtp_rtcp_reader).await; + + let encoding = TrackEncoding { + track, + srtp_stream, + rtcp_interceptor, + stream_info: Mutex::new(StreamInfo::default()), + context: Mutex::new(TrackLocalContext::default()), + ssrc, + }; + + track_encodings.push(encoding); + + Ok(()) } pub(crate) fn is_negotiated(&self) -> bool { @@ -241,49 +259,50 @@ impl RTCRtpSender { /// get_parameters describes the current configuration for the encoding and /// transmission of media on the sender's track. pub async fn get_parameters(&self) -> RTCRtpSendParameters { - let kind = { - let track = self.track.lock().await; - if let Some(t) = &*track { - t.kind() - } else { - RTPCodecType::default() - } - }; - - let mut send_parameters = { - RTCRtpSendParameters { - rtp_parameters: self - .media_engine - .get_rtp_parameters_by_kind(kind, RTCRtpTransceiverDirection::Sendonly), - encodings: vec![RTCRtpEncodingParameters { - ssrc: self.ssrc, + let encodings = { + let track_encodings = self.track_encodings.lock().await; + let mut encodings = Vec::with_capacity(track_encodings.len()); + for e in track_encodings.iter() { + encodings.push(RTCRtpEncodingParameters { + rid: e.track.rid().unwrap_or_default().into(), + ssrc: e.ssrc, payload_type: self.payload_type, ..Default::default() - }], + }); } + + encodings }; - let codecs = { - let tr = self.rtp_transceiver.lock().clone(); + let mut rtp_parameters = self + .media_engine + .get_rtp_parameters_by_kind(self.kind, RTCRtpTransceiverDirection::Sendonly); + rtp_parameters.codecs = { + let tr = self + .rtp_transceiver + .lock() + .clone() + .and_then(|t| t.upgrade()); if let Some(t) = &tr { - if let Some(t) = t.upgrade() { - t.get_codecs().await - } else { - self.media_engine.get_codecs_by_kind(kind) - } + t.get_codecs().await } else { - self.media_engine.get_codecs_by_kind(kind) + self.media_engine.get_codecs_by_kind(self.kind) } }; - send_parameters.rtp_parameters.codecs = codecs; - send_parameters + RTCRtpSendParameters { + rtp_parameters, + encodings, + } } /// track returns the RTCRtpTransceiver track, or nil pub async fn track(&self) -> Option> { - let track = self.track.lock().await; - track.clone() + self.track_encodings + .lock() + .await + .first() + .map(|e| Arc::clone(&e.track)) } /// replace_track replaces the track currently being used as the sender's source with a new TrackLocal. @@ -293,46 +312,36 @@ impl RTCRtpSender { &self, track: Option>, ) -> Result<()> { + let mut track_encodings = self.track_encodings.lock().await; + if let Some(t) = &track { - let tr = self.rtp_transceiver.lock(); - if let Some(r) = &*tr { - if let Some(r) = r.upgrade() { - if r.kind != t.kind() { - return Err(Error::ErrRTPSenderNewTrackHasIncorrectKind); - } - } else { - //TODO: what about None arc? - } - } else { - //TODO: what about None tr? + if self.kind != t.kind() { + return Err(Error::ErrRTPSenderNewTrackHasIncorrectKind); } - } - if self.has_sent() { - let t = { - let t = self.track.lock().await; - t.clone() - }; - if let Some(t) = t { - let context = self.context.lock().await; - t.unbind(&context).await?; + // cannot replace simulcast envelope + if track_encodings.len() > 1 { + return Err(Error::ErrRTPSenderNewTrackHasIncorrectEnvelope); } - } - if !self.has_sent() || track.is_none() { - let mut t = self.track.lock().await; - *t = track; - return Ok(()); - } + let encoding = track_encodings + .first_mut() + .ok_or(Error::ErrRTPSenderNewTrackHasIncorrectEnvelope)?; - let context = { - let context = self.context.lock().await; - context.clone() - }; + let mut context = encoding.context.lock().await; + if self.has_sent() { + encoding.track.unbind(&context).await?; + } - let result = if let Some(t) = &track { self.seq_trans.reset_offset(); + let mid = self + .rtp_transceiver + .lock() + .clone() + .and_then(|t| t.upgrade()) + .and_then(|t| t.mid()); + let new_context = TrackLocalContext { id: context.id.clone(), params: self @@ -341,37 +350,34 @@ impl RTCRtpSender { ssrc: context.ssrc, write_stream: context.write_stream.clone(), paused: self.paused.clone(), + mid, }; - t.bind(&new_context).await - } else { - Err(Error::ErrRTPSenderTrackNil) - }; + match t.bind(&new_context).await { + Err(err) => { + // Re-bind the original track + encoding.track.bind(&context).await?; - match result { - Err(err) => { - // Re-bind the original track - let track = self.track.lock().await; - if let Some(t) = &*track { - t.bind(&context).await?; + Err(err) } - - Err(err) - } - Ok(codec) => { - // Codec has changed - if self.payload_type != codec.payload_type { - let mut context = self.context.lock().await; + Ok(codec) => { + // Codec has changed context.params.codecs = vec![codec]; + encoding.track = Arc::clone(t); + Ok(()) } - - { - let mut t = self.track.lock().await; - *t = track; + } + } else { + if self.has_sent() { + for encoding in track_encodings.drain(..) { + let context = encoding.context.lock().await; + encoding.track.unbind(&context).await?; } - - Ok(()) + } else { + track_encodings.clear(); } + + Ok(()) } } @@ -380,70 +386,56 @@ impl RTCRtpSender { if self.has_sent() { return Err(Error::ErrRTPSenderSendAlreadyCalled); } + let track_encodings = self.track_encodings.lock().await; + if track_encodings.is_empty() { + return Err(Error::ErrRTPSenderTrackRemoved); + } + + let mid = self + .rtp_transceiver + .lock() + .clone() + .and_then(|t| t.upgrade()) + .and_then(|t| t.mid()); - let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone())); - let (context, stream_info) = { - let track = self.track.lock().await; + for (idx, encoding) in track_encodings.iter().enumerate() { + let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone())); let mut context = TrackLocalContext { id: self.id.clone(), params: self.media_engine.get_rtp_parameters_by_kind( - if let Some(t) = &*track { - t.kind() - } else { - RTPCodecType::default() - }, + encoding.track.kind(), RTCRtpTransceiverDirection::Sendonly, ), - ssrc: parameters.encodings[0].ssrc, + ssrc: parameters.encodings[idx].ssrc, write_stream: Some( Arc::clone(&write_stream) as Arc ), paused: self.paused.clone(), + mid: mid.to_owned(), }; - let codec = if let Some(t) = &*track { - t.bind(&context).await? - } else { - RTCRtpCodecParameters::default() - }; - let payload_type = codec.payload_type; - let capability = codec.capability.clone(); - context.params.codecs = vec![codec]; + let codec = encoding.track.bind(&context).await?; let stream_info = create_stream_info( self.id.clone(), - parameters.encodings[0].ssrc, - payload_type, - capability, + parameters.encodings[idx].ssrc, + codec.payload_type, + codec.capability.clone(), ¶meters.rtp_parameters.header_extensions, ); + context.params.codecs = vec![codec]; - (context, stream_info) - }; - - let srtp_rtp_writer = Arc::clone(&self.srtp_stream) as Arc; - let rtp_interceptor = self - .interceptor - .bind_local_stream(&stream_info, srtp_rtp_writer) - .await; - { - let mut interceptor_rtp_writer = write_stream.interceptor_rtp_writer.lock().await; - *interceptor_rtp_writer = Some(rtp_interceptor); - } - - { - let mut ctx = self.context.lock().await; - *ctx = context; - } - { - let mut si = self.stream_info.lock().await; - *si = stream_info; - } + let srtp_writer = Arc::clone(&encoding.srtp_stream) as Arc; + let rtp_writer = self + .interceptor + .bind_local_stream(&stream_info, srtp_writer) + .await; - { - let mut send_called_tx = self.send_called_tx.lock(); - send_called_tx.take(); + *encoding.context.lock().await = context; + *encoding.stream_info.lock().await = stream_info; + *write_stream.interceptor_rtp_writer.lock().await = Some(rtp_writer); } + self.send_called_tx.lock().take(); Ok(()) } @@ -461,12 +453,15 @@ impl RTCRtpSender { self.replace_track(None).await?; - { - let stream_info = self.stream_info.lock().await; + let track_encodings = self.track_encodings.lock().await; + for encoding in track_encodings.iter() { + let stream_info = encoding.stream_info.lock().await; self.interceptor.unbind_local_stream(&stream_info).await; + + encoding.srtp_stream.close().await?; } - self.srtp_stream.close().await + Ok(()) } /// read reads incoming RTCP for this RTPReceiver @@ -474,14 +469,67 @@ impl RTCRtpSender { &self, b: &mut [u8], ) -> Result<(Vec>, Attributes)> { - self.internal.read(b).await + let mut send_called_rx = self.internal.send_called_rx.lock().await; + + tokio::select! { + _ = send_called_rx.recv() => { + let rtcp_interceptor = { + let track_encodings = self.track_encodings.lock().await; + track_encodings.first().map(|e|e.rtcp_interceptor.clone()) + }.ok_or(Error::ErrInterceptorNotBind)?; + let a = Attributes::new(); + tokio::select! { + _ = self.internal.stop_called_rx.notified() => Err(Error::ErrClosedPipe), + result = rtcp_interceptor.read(b, &a) => Ok(result?), + } + } + _ = self.internal.stop_called_rx.notified() => Err(Error::ErrClosedPipe), + } } /// read_rtcp is a convenience method that wraps Read and unmarshals for you. pub async fn read_rtcp( &self, ) -> Result<(Vec>, Attributes)> { - self.internal.read_rtcp(self.receive_mtu).await + let mut b = vec![0u8; self.receive_mtu]; + let (pkts, attributes) = self.read(&mut b).await?; + + Ok((pkts, attributes)) + } + + /// ReadSimulcast reads incoming RTCP for this RTPSender for given rid + pub async fn read_simulcast( + &self, + b: &mut [u8], + rid: &str, + ) -> Result<(Vec>, Attributes)> { + let mut send_called_rx = self.internal.send_called_rx.lock().await; + + tokio::select! { + _ = send_called_rx.recv() => { + let rtcp_interceptor = { + let track_encodings = self.track_encodings.lock().await; + track_encodings.iter().find(|e| e.track.rid() == Some(rid)).map(|e| e.rtcp_interceptor.clone()) + }.ok_or(Error::ErrRTPSenderNoTrackForRID)?; + let a = Attributes::new(); + tokio::select! { + _ = self.internal.stop_called_rx.notified() => Err(Error::ErrClosedPipe), + result = rtcp_interceptor.read(b, &a) => Ok(result?), + } + } + _ = self.internal.stop_called_rx.notified() => Err(Error::ErrClosedPipe), + } + } + + /// ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you + pub async fn read_rtcp_simulcast( + &self, + rid: &str, + ) -> Result<(Vec>, Attributes)> { + let mut b = vec![0u8; self.receive_mtu]; + let (pkts, attributes) = self.read_simulcast(&mut b, rid).await?; + + Ok((pkts, attributes)) } /// Enables overriding outgoing `RTP` packets' `sequence number`s. diff --git a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs index 0ebf0027f..f65cdd24d 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs @@ -14,6 +14,7 @@ use crate::peer_connection::peer_connection_test::{ until_connection_state, }; use crate::rtp_transceiver::rtp_codec::RTCRtpCodecCapability; +use crate::rtp_transceiver::RTCRtpCodecParameters; use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample; #[tokio::test] @@ -135,10 +136,56 @@ async fn test_rtp_sender_get_parameters() -> Result<()> { signal_pair(&mut offerer, &mut answerer).await?; let sender = rtp_transceiver.sender().await; + assert!(sender.track().await.is_some()); let parameters = sender.get_parameters().await; assert_ne!(0, parameters.rtp_parameters.codecs.len()); assert_eq!(1, parameters.encodings.len()); - assert_eq!(sender.ssrc, parameters.encodings[0].ssrc); + assert_eq!( + sender.track_encodings.lock().await[0].ssrc, + parameters.encodings[0].ssrc + ); + assert!(parameters.encodings[0].rid.is_empty()); + + close_pair_now(&offerer, &answerer).await; + Ok(()) +} + +#[tokio::test] +async fn test_rtp_sender_get_parameters_with_rid() -> Result<()> { + let mut m = MediaEngine::default(); + m.register_default_codecs()?; + let api = APIBuilder::new().with_media_engine(m).build(); + + let (mut offerer, mut answerer) = new_pair(&api).await?; + + let rtp_transceiver = offerer + .add_transceiver_from_kind(RTPCodecType::Video, None) + .await?; + + signal_pair(&mut offerer, &mut answerer).await?; + + let rid = "moo"; + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + rid.to_owned(), + "webrtc-rs".to_owned(), + )); + rtp_transceiver.set_sending_track(Some(track)).await?; + + let sender = rtp_transceiver.sender().await; + assert!(sender.track().await.is_some()); + let parameters = sender.get_parameters().await; + assert_ne!(0, parameters.rtp_parameters.codecs.len()); + assert_eq!(1, parameters.encodings.len()); + assert_eq!( + sender.track_encodings.lock().await[0].ssrc, + parameters.encodings[0].ssrc + ); + assert_eq!(rid, parameters.encodings[0].rid); close_pair_now(&offerer, &answerer).await; Ok(()) @@ -284,23 +331,19 @@ async fn test_rtp_sender_replace_track_invalid_codec_change() -> Result<()> { { let tr = rtp_sender.rtp_transceiver.lock(); - if let Some(t) = &*tr { - if let Some(t) = t.upgrade() { - t.set_codec_preferences(vec![RTCRtpCodecParameters { - capability: RTCRtpCodecCapability { - mime_type: MIME_TYPE_VP8.to_owned(), - ..Default::default() - }, - payload_type: 96, - ..Default::default() - }]) - .await?; - } else { - panic!(); - } - } else { - panic!(); - } + let t = tr + .as_ref() + .and_then(|t| t.upgrade()) + .expect("Weak transceiver valid"); + t.set_codec_preferences(vec![RTCRtpCodecParameters { + capability: RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + payload_type: 96, + ..Default::default() + }]) + .await?; } signal_pair(&mut sender, &mut receiver).await?; @@ -333,3 +376,247 @@ async fn test_rtp_sender_replace_track_invalid_codec_change() -> Result<()> { close_pair_now(&sender, &receiver).await; Ok(()) } + +#[tokio::test] +async fn test_rtp_sender_get_parameters_replaced() -> Result<()> { + let mut m = MediaEngine::default(); + m.register_default_codecs()?; + let api = APIBuilder::new().with_media_engine(m).build(); + + let (sender, receiver) = new_pair(&api).await?; + let track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "webrtc-rs".to_owned(), + )); + let rtp_sender = sender.add_track(track).await?; + let param = rtp_sender.get_parameters().await; + assert_eq!(1, param.encodings.len()); + + rtp_sender.replace_track(None).await?; + let param = rtp_sender.get_parameters().await; + assert_eq!(0, param.encodings.len()); + + close_pair_now(&sender, &receiver).await; + Ok(()) +} + +#[tokio::test] +async fn test_rtp_sender_send() -> Result<()> { + let mut m = MediaEngine::default(); + m.register_default_codecs()?; + let api = APIBuilder::new().with_media_engine(m).build(); + + let (sender, receiver) = new_pair(&api).await?; + let track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "webrtc-rs".to_owned(), + )); + let rtp_sender = sender.add_track(track).await?; + let param = rtp_sender.get_parameters().await; + assert_eq!(1, param.encodings.len()); + + rtp_sender.send(¶m).await?; + + assert_eq!( + Error::ErrRTPSenderSendAlreadyCalled, + rtp_sender.send(¶m).await.unwrap_err() + ); + + close_pair_now(&sender, &receiver).await; + Ok(()) +} + +#[tokio::test] +async fn test_rtp_sender_send_track_removed() -> Result<()> { + let mut m = MediaEngine::default(); + m.register_default_codecs()?; + let api = APIBuilder::new().with_media_engine(m).build(); + + let (sender, receiver) = new_pair(&api).await?; + let track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "webrtc-rs".to_owned(), + )); + let rtp_sender = sender.add_track(track).await?; + let param = rtp_sender.get_parameters().await; + assert_eq!(1, param.encodings.len()); + + sender.remove_track(&rtp_sender).await?; + assert_eq!( + Error::ErrRTPSenderTrackRemoved, + rtp_sender.send(¶m).await.unwrap_err() + ); + + close_pair_now(&sender, &receiver).await; + Ok(()) +} + +#[tokio::test] +async fn test_rtp_sender_add_encoding() -> Result<()> { + let mut m = MediaEngine::default(); + m.register_default_codecs()?; + let api = APIBuilder::new().with_media_engine(m).build(); + + let (sender, receiver) = new_pair(&api).await?; + let track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "webrtc-rs".to_owned(), + )); + let rtp_sender = sender.add_track(track).await?; + + let track = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "webrtc-rs".to_owned(), + )); + assert_eq!( + Error::ErrRTPSenderRidNil, + rtp_sender.add_encoding(track).await.unwrap_err() + ); + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "h".to_owned(), + "webrtc-rs".to_owned(), + )); + assert_eq!( + Error::ErrRTPSenderNoBaseEncoding, + rtp_sender.add_encoding(track).await.unwrap_err() + ); + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "f".to_owned(), + "webrtc-rs".to_owned(), + )); + let rtp_sender = sender.add_track(track).await?; + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video-foobar".to_owned(), + "h".to_owned(), + "webrtc-rs".to_owned(), + )); + assert_eq!( + Error::ErrRTPSenderBaseEncodingMismatch, + rtp_sender.add_encoding(track).await.unwrap_err() + ); + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "h".to_owned(), + "webrtc-rs-foobar".to_owned(), + )); + assert_eq!( + Error::ErrRTPSenderBaseEncodingMismatch, + rtp_sender.add_encoding(track).await.unwrap_err() + ); + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_OPUS.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "h".to_owned(), + "webrtc-rs".to_owned(), + )); + assert_eq!( + Error::ErrRTPSenderBaseEncodingMismatch, + rtp_sender.add_encoding(track).await.unwrap_err() + ); + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "f".to_owned(), + "webrtc-rs".to_owned(), + )); + assert_eq!( + Error::ErrRTPSenderRIDCollision, + rtp_sender.add_encoding(track).await.unwrap_err() + ); + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "h".to_owned(), + "webrtc-rs".to_owned(), + )); + rtp_sender.add_encoding(track).await?; + + rtp_sender.send(&rtp_sender.get_parameters().await).await?; + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "f".to_owned(), + "webrtc-rs".to_owned(), + )); + assert_eq!( + Error::ErrRTPSenderSendAlreadyCalled, + rtp_sender.add_encoding(track).await.unwrap_err() + ); + + rtp_sender.stop().await?; + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "f".to_owned(), + "webrtc-rs".to_owned(), + )); + assert_eq!( + Error::ErrRTPSenderStopped, + rtp_sender.add_encoding(track).await.unwrap_err() + ); + + close_pair_now(&sender, &receiver).await; + Ok(()) +} diff --git a/webrtc/src/track/track_local/mod.rs b/webrtc/src/track/track_local/mod.rs index 5201bd130..fcfd8bda7 100644 --- a/webrtc/src/track/track_local/mod.rs +++ b/webrtc/src/track/track_local/mod.rs @@ -12,6 +12,7 @@ use std::sync::Arc; use async_trait::async_trait; use interceptor::{Attributes, RTPWriter}; use portable_atomic::AtomicBool; +use smol_str::SmolStr; use tokio::sync::Mutex; use util::Unmarshal; @@ -38,6 +39,7 @@ pub struct TrackLocalContext { pub(crate) ssrc: SSRC, pub(crate) write_stream: Option>, pub(crate) paused: Arc, + pub(crate) mid: Option, } impl TrackLocalContext { @@ -89,6 +91,9 @@ pub trait TrackLocal { /// and stream_id would be 'desktop' or 'webcam' fn id(&self) -> &str; + /// RID is the RTP Stream ID for this track. + fn rid(&self) -> Option<&str>; + /// stream_id is the group this track belongs too. This must be unique fn stream_id(&self) -> &str; @@ -109,6 +114,7 @@ pub(crate) struct TrackBinding { params: RTCRtpParameters, write_stream: Option>, sender_paused: Arc, + hdr_ext_ids: Vec, } impl TrackBinding { diff --git a/webrtc/src/track/track_local/track_local_static_rtp.rs b/webrtc/src/track/track_local/track_local_static_rtp.rs index 5e951c282..18cdaed7e 100644 --- a/webrtc/src/track/track_local/track_local_static_rtp.rs +++ b/webrtc/src/track/track_local/track_local_static_rtp.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use tokio::sync::Mutex; use util::{Marshal, MarshalSize}; @@ -14,16 +14,34 @@ pub struct TrackLocalStaticRTP { pub(crate) bindings: Mutex>>, codec: RTCRtpCodecCapability, id: String, + rid: Option, stream_id: String, } impl TrackLocalStaticRTP { - /// returns a TrackLocalStaticRTP. + /// returns a TrackLocalStaticRTP without rid. pub fn new(codec: RTCRtpCodecCapability, id: String, stream_id: String) -> Self { TrackLocalStaticRTP { codec, bindings: Mutex::new(vec![]), id, + rid: None, + stream_id, + } + } + + /// returns a TrackLocalStaticRTP with rid. + pub fn new_with_rid( + codec: RTCRtpCodecCapability, + id: String, + rid: String, + stream_id: String, + ) -> Self { + TrackLocalStaticRTP { + codec, + bindings: Mutex::new(vec![]), + id, + rid: Some(rid), stream_id, } } @@ -99,6 +117,13 @@ impl TrackLocalStaticRTP { pkt.header.ssrc = b.ssrc; pkt.header.payload_type = b.payload_type; + for ext in b.hdr_ext_ids.iter() { + let payload = ext.payload.to_owned(); + if let Err(err) = pkt.header.set_extension(ext.id, payload) { + write_errs.push(Error::Rtp(err)); + } + } + for (uri, data) in extension_data.iter() { if let Some(id) = b .params @@ -143,18 +168,45 @@ impl TrackLocal for TrackLocalStaticRTP { capability: self.codec.clone(), ..Default::default() }; + let mut hdr_ext_ids = vec![]; + if let Some(id) = t + .header_extensions() + .iter() + .find(|e| e.uri == ::sdp::extmap::SDES_MID_URI) + .map(|e| e.id as u8) + { + if let Some(payload) = t + .mid + .as_ref() + .map(|mid| Bytes::copy_from_slice(mid.as_bytes())) + { + hdr_ext_ids.push(rtp::header::Extension { id, payload }); + } + } + + if let Some(id) = t + .header_extensions() + .iter() + .find(|e| e.uri == ::sdp::extmap::SDES_RTP_STREAM_ID_URI) + .map(|e| e.id as u8) + { + if let Some(payload) = self.rid().map(|rid| rid.to_owned().into()) { + hdr_ext_ids.push(rtp::header::Extension { id, payload }); + } + } let (codec, match_type) = codec_parameters_fuzzy_search(¶meters, t.codec_parameters()); if match_type != CodecMatch::None { { let mut bindings = self.bindings.lock().await; bindings.push(Arc::new(TrackBinding { + id: t.id(), ssrc: t.ssrc(), payload_type: codec.payload_type, - write_stream: t.write_stream(), params: t.params.clone(), - id: t.id(), + write_stream: t.write_stream(), sender_paused: t.paused.clone(), + hdr_ext_ids, })); } @@ -190,6 +242,11 @@ impl TrackLocal for TrackLocalStaticRTP { self.id.as_str() } + /// RID is the RTP Stream ID for this track. + fn rid(&self) -> Option<&str> { + self.rid.as_deref() + } + /// stream_id is the group this track belongs too. This must be unique fn stream_id(&self) -> &str { self.stream_id.as_str() diff --git a/webrtc/src/track/track_local/track_local_static_sample.rs b/webrtc/src/track/track_local/track_local_static_sample.rs index 1745ce469..2c4d5166e 100644 --- a/webrtc/src/track/track_local/track_local_static_sample.rs +++ b/webrtc/src/track/track_local/track_local_static_sample.rs @@ -24,7 +24,7 @@ pub struct TrackLocalStaticSample { } impl TrackLocalStaticSample { - /// returns a TrackLocalStaticSample + /// returns a TrackLocalStaticSample without RID pub fn new(codec: RTCRtpCodecCapability, id: String, stream_id: String) -> Self { let rtp_track = TrackLocalStaticRTP::new(codec, id, stream_id); @@ -39,6 +39,26 @@ impl TrackLocalStaticSample { } } + /// returns a TrackLocalStaticSample with RID + pub fn new_with_rid( + codec: RTCRtpCodecCapability, + id: String, + rid: String, + stream_id: String, + ) -> Self { + let rtp_track = TrackLocalStaticRTP::new_with_rid(codec, id, rid, stream_id); + + TrackLocalStaticSample { + rtp_track, + internal: Mutex::new(TrackLocalStaticSampleInternal { + packetizer: None, + sequencer: None, + clock_rate: 0.0f64, + did_warn_about_wonky_pause: false, + }), + } + } + /// codec gets the Codec of the track pub fn codec(&self) -> RTCRtpCodecCapability { self.rtp_track.codec() @@ -221,6 +241,11 @@ impl TrackLocal for TrackLocalStaticSample { self.rtp_track.id() } + /// RID is the RTP Stream ID for this track. + fn rid(&self) -> Option<&str> { + self.rtp_track.rid() + } + /// stream_id is the group this track belongs too. This must be unique fn stream_id(&self) -> &str { self.rtp_track.stream_id()