Skip to content

Commit

Permalink
Send side Simulcast/Track encodings (#578)
Browse files Browse the repository at this point in the history
* track_local: Add ability to set RTP stream ID

This change makes it possible to set the RTP stream ID to
allow forwarding and production of simulcast streams.

* peer_connection: Bolt on send side simulcast

Introduces add_encoding method in RTP sender to add simulcast encodings.

* rtp_sender: Add some missing tests

Ported over from pion

* peer_connection: Add Simulcast send test

This is a port from a pion test

* track_local: Insert mid and rid RTP header extension

When MID and RTP stream ID header extension are enabled all RTP packets
get corresponding MID and RID added as extension headers.

* peer_connection: Handle Simulcast Offer with one Media Section

This is ported from Pion
  • Loading branch information
haaspors committed Jun 20, 2024
1 parent 1ee5f79 commit 89285ce
Show file tree
Hide file tree
Showing 12 changed files with 911 additions and 295 deletions.
2 changes: 2 additions & 0 deletions webrtc/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ impl API {
transport: Arc<RTCDtlsTransport>,
interceptor: Arc<dyn Interceptor + Send + Sync>,
) -> RTCRtpSender {
let kind = track.as_ref().map(|t| t.kind()).unwrap_or_default();
RTCRtpSender::new(
self.setting_engine.get_receive_mtu(),
track,
kind,
transport,
Arc::clone(&self.media_engine),
interceptor,
Expand Down
18 changes: 18 additions & 0 deletions webrtc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ pub enum Error {
#[error("new track must be of the same kind as previous")]
ErrRTPSenderNewTrackHasIncorrectKind,

/// ErrRTPSenderNewTrackHasIncorrectEnvelope indicates that the new track has a different envelope than the previous/original
#[error("new track must have the same envelope as previous")]
ErrRTPSenderNewTrackHasIncorrectEnvelope,

/// ErrRTPSenderDataSent indicates that the sequence number transformer tries to be enabled after the data sending began
#[error("Sequence number transformer must be enabled before sending data")]
ErrRTPSenderDataSent,
Expand Down Expand Up @@ -328,6 +332,20 @@ pub enum Error {
},
#[error("Track must not be nil")]
ErrRTPSenderTrackNil,
#[error("Sender has already been stopped")]
ErrRTPSenderStopped,
#[error("Sender Track has been removed or replaced to nil")]
ErrRTPSenderTrackRemoved,
#[error("Sender cannot add encoding as rid is empty")]
ErrRTPSenderRidNil,
#[error("Sender cannot add encoding as there is no base track")]
ErrRTPSenderNoBaseEncoding,
#[error("Sender cannot add encoding as provided track does not match base track")]
ErrRTPSenderBaseEncodingMismatch,
#[error("Sender cannot encoding due to RID collision")]
ErrRTPSenderRIDCollision,
#[error("Sender does not have track for RID")]
ErrRTPSenderNoTrackForRID,
#[error("RTPSender must not be nil")]
ErrRTPSenderNil,
#[error("RTPReceiver must not be nil")]
Expand Down
6 changes: 5 additions & 1 deletion webrtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,7 @@ impl RTCPeerConnection {
RTCRtpSender::new(
receive_mtu,
None,
kind,
Arc::clone(&self.internal.dtls_transport),
Arc::clone(&self.internal.media_engine),
Arc::clone(&self.interceptor),
Expand Down Expand Up @@ -1608,7 +1609,10 @@ impl RTCPeerConnection {
let current_transceivers = self.internal.rtp_transceivers.lock().await;
for transceiver in &*current_transceivers {
let sender = transceiver.sender().await;
if sender.is_negotiated() && !sender.has_sent() {
if !sender.track_encodings.lock().await.is_empty()
&& sender.is_negotiated()
&& !sender.has_sent()
{
sender.send(&sender.get_parameters().await).await?;
}
}
Expand Down
139 changes: 81 additions & 58 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::stats::{
InboundRTPStats, OutboundRTPStats, RTCStatsType, RemoteInboundRTPStats, RemoteOutboundRTPStats,
StatsReportType,
};
use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample;
use crate::track::TrackStream;
use crate::SDP_ATTRIBUTE_RID;

Expand Down Expand Up @@ -442,44 +443,60 @@ impl PeerConnectionInternal {
.map(|value| value.direction)
.unwrap_or(RTCRtpTransceiverDirection::Sendrecv);

if direction == RTCRtpTransceiverDirection::Unspecified {
return Err(Error::ErrPeerConnAddTransceiverFromKindSupport);
}

let interceptor = self
.interceptor
.upgrade()
.ok_or(Error::ErrInterceptorNotBind)?;
let receiver = Arc::new(RTCRtpReceiver::new(
self.setting_engine.get_receive_mtu(),
kind,
Arc::clone(&self.dtls_transport),
Arc::clone(&self.media_engine),
Arc::clone(&interceptor),
));
let t = match direction {
RTCRtpTransceiverDirection::Sendonly | RTCRtpTransceiverDirection::Sendrecv => {
let codec = self
.media_engine
.get_codecs_by_kind(kind)
.first()
.map(|c| c.capability.clone())
.ok_or(Error::ErrNoCodecsAvailable)?;
let track = Arc::new(TrackLocalStaticSample::new(
codec,
math_rand_alpha(16),
math_rand_alpha(16),
));
self.new_transceiver_from_track(direction, track).await?
}
RTCRtpTransceiverDirection::Recvonly => {
let interceptor = self
.interceptor
.upgrade()
.ok_or(Error::ErrInterceptorNotBind)?;
let receiver = Arc::new(RTCRtpReceiver::new(
self.setting_engine.get_receive_mtu(),
kind,
Arc::clone(&self.dtls_transport),
Arc::clone(&self.media_engine),
Arc::clone(&interceptor),
));

let sender = Arc::new(
RTCRtpSender::new(
self.setting_engine.get_receive_mtu(),
None,
Arc::clone(&self.dtls_transport),
Arc::clone(&self.media_engine),
interceptor,
false,
)
.await,
);
let sender = Arc::new(
RTCRtpSender::new(
self.setting_engine.get_receive_mtu(),
None,
kind,
Arc::clone(&self.dtls_transport),
Arc::clone(&self.media_engine),
interceptor,
false,
)
.await,
);

let t = RTCRtpTransceiver::new(
receiver,
sender,
direction,
kind,
vec![],
Arc::clone(&self.media_engine),
Some(Box::new(self.make_negotiation_needed_trigger())),
)
.await;
RTCRtpTransceiver::new(
receiver,
sender,
direction,
kind,
vec![],
Arc::clone(&self.media_engine),
Some(Box::new(self.make_negotiation_needed_trigger())),
)
.await
}
_ => return Err(Error::ErrPeerConnAddTransceiverFromKindSupport),
};

self.add_rtp_transceiver(Arc::clone(&t)).await;

Expand Down Expand Up @@ -512,6 +529,7 @@ impl PeerConnectionInternal {
RTCRtpSender::new(
self.setting_engine.get_receive_mtu(),
Some(Arc::clone(&track)),
track.kind(),
Arc::clone(&self.dtls_transport),
Arc::clone(&self.media_engine),
Arc::clone(&interceptor),
Expand Down Expand Up @@ -858,6 +876,8 @@ impl PeerConnectionInternal {
let only_media_section = &remote_description.media_descriptions[0];
let mut stream_id = "";
let mut id = "";
let mut has_rid = false;
let mut has_ssrc = false;

for a in &only_media_section.attributes {
match a.key.as_str() {
Expand All @@ -870,12 +890,18 @@ impl PeerConnectionInternal {
}
}
}
ATTR_KEY_SSRC => return Err(Error::ErrPeerConnSingleMediaSectionHasExplicitSSRC),
SDP_ATTRIBUTE_RID => return Ok(false),
ATTR_KEY_SSRC => has_ssrc = true,
SDP_ATTRIBUTE_RID => has_rid = true,
_ => {}
};
}

if has_rid {
return Ok(false);
} else if has_ssrc {
return Err(Error::ErrPeerConnSingleMediaSectionHasExplicitSSRC);
}

let mut incoming = TrackDetails {
ssrcs: vec![ssrc],
kind: RTPCodecType::Video,
Expand Down Expand Up @@ -1343,32 +1369,29 @@ impl PeerConnectionInternal {
}
let mut track_infos = vec![];
for transceiver in transceivers {
let sender = transceiver.sender().await;

let mid = match transceiver.mid() {
Some(mid) => mid,
None => continue,
};

let track = match sender.track().await {
Some(track) => track,
None => continue,
};

let track_id = track.id().to_string();
let kind = match track.kind() {
RTPCodecType::Unspecified => continue,
RTPCodecType::Audio => "audio",
RTPCodecType::Video => "video",
};
let sender = transceiver.sender().await;
let track_encodings = sender.track_encodings.lock().await;
for encoding in track_encodings.iter() {
let track_id = encoding.track.id().to_string();
let kind = match encoding.track.kind() {
RTPCodecType::Unspecified => continue,
RTPCodecType::Audio => "audio",
RTPCodecType::Video => "video",
};

track_infos.push(TrackInfo {
track_id,
ssrc: sender.ssrc,
mid,
rid: None,
kind,
});
track_infos.push(TrackInfo {
track_id,
ssrc: encoding.ssrc,
mid: mid.to_owned(),
rid: encoding.track.rid().map(Into::into),
kind,
});
}
}

let stream_stats = self
Expand Down
125 changes: 125 additions & 0 deletions webrtc/src/peer_connection/peer_connection_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::ice_transport::ice_server::RTCIceServer;
use crate::peer_connection::configuration::RTCConfiguration;
use crate::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use crate::stats::StatsReportType;
use crate::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample;
use crate::Error;

Expand Down Expand Up @@ -513,3 +514,127 @@ async fn peer() -> Result<()> {

Ok(())
}

pub(crate) fn on_connected() -> (OnPeerConnectionStateChangeHdlrFn, mpsc::Receiver<()>) {
let (done_tx, done_rx) = mpsc::channel::<()>(1);
let done_tx = Arc::new(Mutex::new(Some(done_tx)));
let hdlr_fn: OnPeerConnectionStateChangeHdlrFn =
Box::new(move |state: RTCPeerConnectionState| {
let done_tx_clone = Arc::clone(&done_tx);
Box::pin(async move {
if state == RTCPeerConnectionState::Connected {
let mut tx = done_tx_clone.lock().await;
tx.take();
}
})
});
(hdlr_fn, done_rx)
}

// Everytime we receive a new SSRC we probe it and try to determine the proper way to handle it.
// In most cases a Track explicitly declares a SSRC and a OnTrack is fired. In two cases we don't
// know the SSRC ahead of time
// * Undeclared SSRC in a single media section
// * Simulcast
//
// The Undeclared SSRC processing code would run before Simulcast. If a Simulcast Offer/Answer only
// contained one Media Section we would never fire the OnTrack. We would assume it was a failed
// Undeclared SSRC processing. This test asserts that we properly handled this.
#[tokio::test]
async fn test_peer_connection_simulcast_no_data_channel() -> Result<()> {
let mut m = MediaEngine::default();
for ext in [
::sdp::extmap::SDES_MID_URI,
::sdp::extmap::SDES_RTP_STREAM_ID_URI,
] {
m.register_header_extension(
RTCRtpHeaderExtensionCapability {
uri: ext.to_owned(),
},
RTPCodecType::Video,
None,
)?;
}
m.register_default_codecs()?;
let api = APIBuilder::new().with_media_engine(m).build();

let (mut pc_send, mut pc_recv) = new_pair(&api).await?;
let (send_notifier, mut send_connected) = on_connected();
let (recv_notifier, mut recv_connected) = on_connected();
pc_send.on_peer_connection_state_change(send_notifier);
pc_recv.on_peer_connection_state_change(recv_notifier);
let (track_tx, mut track_rx) = mpsc::unbounded_channel();
pc_recv.on_track(Box::new(move |t, _, _| {
let rid = t.rid().to_owned();
let _ = track_tx.send(rid);
Box::pin(async move {})
}));

let id = "video";
let stream_id = "webrtc-rs";
let track = Arc::new(TrackLocalStaticRTP::new_with_rid(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
..Default::default()
},
id.to_owned(),
"a".to_owned(),
stream_id.to_owned(),
));
let track_a = Arc::clone(&track);
let transceiver = pc_send.add_transceiver_from_track(track, None).await?;
let sender = transceiver.sender().await;

let track = Arc::new(TrackLocalStaticRTP::new_with_rid(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
..Default::default()
},
id.to_owned(),
"b".to_owned(),
stream_id.to_owned(),
));
let track_b = Arc::clone(&track);
sender.add_encoding(track).await?;

let track = Arc::new(TrackLocalStaticRTP::new_with_rid(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
..Default::default()
},
id.to_owned(),
"c".to_owned(),
stream_id.to_owned(),
));
let track_c = Arc::clone(&track);
sender.add_encoding(track).await?;

// signaling
signal_pair(&mut pc_send, &mut pc_recv).await?;
let _ = send_connected.recv().await;
let _ = recv_connected.recv().await;

for sequence_number in [0; 100] {
let pkt = rtp::packet::Packet {
header: rtp::header::Header {
version: 2,
sequence_number,
payload_type: 96,
..Default::default()
},
payload: Bytes::from_static(&[0; 2]),
};

track_a.write_rtp_with_extensions(&pkt, &[]).await?;
track_b.write_rtp_with_extensions(&pkt, &[]).await?;
track_c.write_rtp_with_extensions(&pkt, &[]).await?;
}

assert_eq!(track_rx.recv().await.unwrap(), "a".to_owned());
assert_eq!(track_rx.recv().await.unwrap(), "b".to_owned());
assert_eq!(track_rx.recv().await.unwrap(), "c".to_owned());

close_pair_now(&pc_send, &pc_recv).await;

Ok(())
}
Loading

0 comments on commit 89285ce

Please sign in to comment.