diff --git a/Cargo.lock b/Cargo.lock index 0d1520d7..346e33d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1738,6 +1738,7 @@ dependencies = [ "gstreamer-gl 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "gstreamer-player 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "gstreamer-sdp 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", + "gstreamer-sdp-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "gstreamer-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "gstreamer-video 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "gstreamer-webrtc 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/backends/dummy/src/lib.rs b/backends/dummy/src/lib.rs index f6051074..16b9c212 100644 --- a/backends/dummy/src/lib.rs +++ b/backends/dummy/src/lib.rs @@ -195,7 +195,7 @@ impl WebRtcControllerBackend for DummyWebRtcController { fn add_ice_candidate(&mut self, _: IceCandidate) {} fn create_offer(&mut self, _: SendBoxFnOnce<'static, (SessionDescription,)>) {} fn create_answer(&mut self, _: SendBoxFnOnce<'static, (SessionDescription,)>) {} - fn add_stream(&mut self, _: &mut MediaStream) {} + fn add_stream(&mut self, _: Box) {} fn internal_event(&mut self, _: thread::InternalEvent) {} fn quit(&mut self) {} } diff --git a/backends/gstreamer/Cargo.toml b/backends/gstreamer/Cargo.toml index db37ddb7..cbee9680 100644 --- a/backends/gstreamer/Cargo.toml +++ b/backends/gstreamer/Cargo.toml @@ -23,6 +23,9 @@ version = "0.8" [dependencies.gstreamer-sys] version = "0.7" +[dependencies.gstreamer-sdp-sys] +version = "0.7" + [dependencies.gstreamer] version = "0.13" features = ["subclassing"] diff --git a/backends/gstreamer/src/media_capture.rs b/backends/gstreamer/src/media_capture.rs index 17e871b3..25439ed1 100644 --- a/backends/gstreamer/src/media_capture.rs +++ b/backends/gstreamer/src/media_capture.rs @@ -128,14 +128,12 @@ impl GstMediaDevices { ("audio/x-raw", "Audio/Source") }; let caps = into_caps(constraints, format)?; - println!("requesting {:?}", caps); let f = self.monitor.add_filter(filter, &caps); let devices = self.monitor.get_devices(); if let Some(f) = f { let _ = self.monitor.remove_filter(f); } if let Some(d) = devices.get(0) { - println!("{:?}", d.get_caps()); let element = d.create_element(None)?; Some(GstMediaTrack { element }) } else { diff --git a/backends/gstreamer/src/media_stream.rs b/backends/gstreamer/src/media_stream.rs index bd58747a..18e9525a 100644 --- a/backends/gstreamer/src/media_stream.rs +++ b/backends/gstreamer/src/media_stream.rs @@ -11,7 +11,6 @@ lazy_static! { &[ ("media", &"audio"), ("encoding-name", &"OPUS"), - ("payload", &(97i32)), ], ) }; @@ -21,7 +20,6 @@ lazy_static! { &[ ("media", &"video"), ("encoding-name", &"VP8"), - ("payload", &(96i32)), ], ) }; @@ -50,20 +48,47 @@ impl MediaStream for GStreamerMediaStream { } impl GStreamerMediaStream { - pub fn attach_to_webrtc(&mut self, pipeline: &gst::Pipeline, webrtcbin: &gst::Element) { - println!("atttaching a {:?} stream", self.type_); - self.attach_to_pipeline(pipeline); - - let caps = match self.type_ { + pub fn caps(&self) -> &gst::Caps { + match self.type_ { StreamType::Audio => &*RTP_CAPS_OPUS, StreamType::Video => &*RTP_CAPS_VP8, - }; - self.elements - .last() - .as_ref() - .unwrap() - .link_filtered(webrtcbin, caps) - .unwrap(); + } + } + + pub fn caps_with_payload(&self, payload: i32) -> gst::Caps { + match self.type_ { + StreamType::Audio => { + gst::Caps::new_simple( + "application/x-rtp", + &[ + ("media", &"audio"), + ("encoding-name", &"OPUS"), + ("payload", &(payload)), + ], + ) + } + StreamType::Video => { + gst::Caps::new_simple( + "application/x-rtp", + &[ + ("media", &"video"), + ("encoding-name", &"VP8"), + ("payload", &(payload)), + ], + ) + } + } + } + + pub fn insert_capsfilter(&mut self) { + assert!(self.pipeline.is_none()); + let capsfilter = gst::ElementFactory::make("capsfilter", None).unwrap(); + capsfilter.set_property("caps", self.caps()).unwrap(); + self.elements.push(capsfilter); + } + + pub fn src_element(&self) -> gst::Element { + self.elements.last().unwrap().clone() } pub fn attach_to_pipeline(&mut self, pipeline: &gst::Pipeline) { diff --git a/backends/gstreamer/src/webrtc.rs b/backends/gstreamer/src/webrtc.rs index 101063f8..a8a8a114 100644 --- a/backends/gstreamer/src/webrtc.rs +++ b/backends/gstreamer/src/webrtc.rs @@ -11,18 +11,57 @@ use servo_media_webrtc::WebRtcController as WebRtcThread; use servo_media_webrtc::*; use servo_media_streams::MediaStream; use std::sync::{Arc, Mutex}; +use std::{cmp, mem}; // TODO: // - add a proper error enum // - figure out purpose of glib loop +#[derive(Debug, Clone)] +pub struct MLineInfo { + /// The caps for the given m-line + caps: gst::Caps, + /// Whether or not this sink pad has already been connected + is_used: bool, + /// The payload value of the given m-line + payload: i32, +} + pub struct GStreamerWebRtcController { webrtc: Option, pipeline: gst::Pipeline, - has_streams: bool, + /// We can't trigger a negotiation-needed event until we have streams, or otherwise + /// a createOffer() call will lead to bad SDP. Instead, we delay negotiation. delayed_negotiation: bool, + /// A handle to the event loop abstraction surrounding the webrtc implementations, + /// which lets gstreamer callbacks send events back to the event loop to run on this object thread: WebRtcThread, signaller: Box, + /// All the streams that are actually connected to the webrtcbin (i.e., their presence has already + /// been negotiated) + streams: Vec>, + /// Disconnected streams that are waiting to be linked. Streams are + /// only linked when: + /// + /// - An offer is made (all pending streams are flushed) + /// - An offer is received (all matching pending streams are flushed) + /// - A stream is added when there is a so-far-disconnected remote-m-line + /// + /// In other words, these are all yet to be negotiated + /// + /// See link_stream + pending_streams: Vec>, + /// Each new webrtc stream should have a new payload/pt value, starting at 96 + /// + /// This is maintained as a known yet-unused payload number, being incremented whenever + /// we use it, and set to (remote_pt + 1) if the remote sends us a stream with a higher pt + pt_counter: i32, + /// We keep track of how many request pads have been created on webrtcbin + /// so that we can request more to fill in the gaps and acquire a specific pad if necessary + request_pad_counter: usize, + /// Streams need to be connected to the relevant sink pad, and we figure this out + /// by keeping track of the caps of each m-line in the SDP. + remote_mline_info: Vec, //send_msg_tx: mpsc::Sender, //peer_id: String, _main_loop: glib::MainLoop, @@ -42,14 +81,18 @@ impl WebRtcControllerBackend for GStreamerWebRtcController { } fn set_remote_description(&mut self, desc: SessionDescription, cb: SendBoxFnOnce<'static, ()>) { - self.set_description(desc, false, cb); + self.set_description(desc, DescriptionType::Remote, cb); } fn set_local_description(&mut self, desc: SessionDescription, cb: SendBoxFnOnce<'static, ()>) { - self.set_description(desc, true, cb); + self.set_description(desc, DescriptionType::Local, cb); } fn create_offer(&mut self, cb: SendBoxFnOnce<'static, (SessionDescription,)>) { + self.flush_pending_streams(true); + self.pipeline + .set_state(gst::State::Playing) + .unwrap(); let webrtc = self.webrtc.as_ref().unwrap(); let promise = gst::Promise::new_with_change_func(move |promise| { on_offer_or_answer_created(SdpType::Offer, promise, cb); @@ -71,17 +114,13 @@ impl WebRtcControllerBackend for GStreamerWebRtcController { .unwrap(); } - fn add_stream(&mut self, stream: &mut MediaStream) { - println!("adding a stream"); - self.has_streams = true; - let stream = stream + fn add_stream(&mut self, mut boxed_stream: Box) { + let stream = boxed_stream .as_mut_any() .downcast_mut::() .unwrap(); - stream.attach_to_webrtc(&self.pipeline, self.webrtc.as_ref().unwrap()); - self.pipeline - .set_state(gst::State::Playing) - .unwrap(); + stream.insert_capsfilter(); + self.link_stream(boxed_stream, false); if self.delayed_negotiation { self.delayed_negotiation = false; self.signaller.on_negotiation_needed(&self.thread); @@ -97,12 +136,14 @@ impl WebRtcControllerBackend for GStreamerWebRtcController { fn internal_event(&mut self, e: thread::InternalEvent) { match e { InternalEvent::OnNegotiationNeeded => { - if self.has_streams { - self.signaller.on_negotiation_needed(&self.thread); - } else { + if self.streams.is_empty() && self.pending_streams.is_empty() { + // we have no streams + // If the pipeline starts playing and on-negotiation-needed is present before there are any // media streams, an invalid SDP offer will be created. Therefore, delay emitting the signal self.delayed_negotiation = true; + } else { + self.signaller.on_negotiation_needed(&self.thread); } } InternalEvent::OnIceCandidate(candidate) => { @@ -114,6 +155,15 @@ impl WebRtcControllerBackend for GStreamerWebRtcController { .unwrap(); self.signaller.on_add_stream(stream); } + InternalEvent::DescriptionAdded(cb, description_type, ty) => { + if description_type == DescriptionType::Remote && ty == SdpType::Offer { + self.flush_pending_streams(false); + } + self.pipeline + .set_state(gst::State::Playing) + .unwrap(); + cb.call(); + } } } @@ -128,11 +178,12 @@ impl WebRtcControllerBackend for GStreamerWebRtcController { impl GStreamerWebRtcController { fn set_description( - &self, + &mut self, desc: SessionDescription, - local: bool, + description_type: DescriptionType, cb: SendBoxFnOnce<'static, ()>, ) { + let ty = match desc.type_ { SdpType::Answer => gst_webrtc::WebRTCSDPType::Answer, SdpType::Offer => gst_webrtc::WebRTCSDPType::Offer, @@ -140,20 +191,136 @@ impl GStreamerWebRtcController { SdpType::Rollback => gst_webrtc::WebRTCSDPType::Rollback, }; - let kind = if local { - "set-local-description" - } else { - "set-remote-description" + let kind = match description_type { + DescriptionType::Local => "set-local-description", + DescriptionType::Remote => "set-remote-description", }; - let ret = gst_sdp::SDPMessage::parse_buffer(desc.sdp.as_bytes()).unwrap(); - let answer = gst_webrtc::WebRTCSessionDescription::new(ty, ret); - let promise = gst::Promise::new_with_change_func(move |_promise| cb.call()); + let sdp = gst_sdp::SDPMessage::parse_buffer(desc.sdp.as_bytes()).unwrap(); + if description_type == DescriptionType::Remote { + self.store_remote_mline_info(&sdp); + } + let answer = gst_webrtc::WebRTCSessionDescription::new(ty, sdp); + let thread = self.thread.clone(); + let promise = gst::Promise::new_with_change_func(move |_promise| { + thread.internal_event(InternalEvent::DescriptionAdded(cb, description_type, desc.type_)); + }); self.webrtc .as_ref() .unwrap() .emit(kind, &[&answer, &promise]) .unwrap(); + + } + + fn store_remote_mline_info(&mut self, sdp: &gst_sdp::SDPMessage) { + // remove after https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/issues/189 is fixed + fn get_media(msg: &gst_sdp::SDPMessage, idx: u32) -> Option { + extern crate gstreamer_sdp_sys as gst_sdp_sys; + use glib::translate::*; + unsafe { + from_glib_none(gst_sdp_sys::gst_sdp_message_get_media(msg.to_glib_none().0, idx)) + } + } + self.remote_mline_info.clear(); + for i in 0..sdp.medias_len() { + let mut caps = gst::Caps::new_empty(); + let caps_mut = caps.get_mut().unwrap(); + let media = get_media(&sdp, i).unwrap(); + for format in 0..media.formats_len() { + let pt = media.get_format(format).unwrap().parse().unwrap(); + caps_mut.append(media.get_caps_from_media(pt).unwrap()); + self.pt_counter = cmp::max(self.pt_counter, pt + 1); + } + for cap in 0..caps_mut.get_size() { + // the caps are application/x-unknown by default, which will fail + // to intersect + // + // see https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/blob/ba62917fbfd98ea76d4e066a6f18b4a14b847362/ext/webrtc/gstwebrtcbin.c#L2521 + caps_mut.get_mut_structure(cap).unwrap().set_name("application/x-rtp") + } + self.remote_mline_info.push(MLineInfo { + caps: caps, + // XXXManishearth in the (yet unsupported) case of dynamic stream addition and renegotiation + // this will need to be checked against the current set of streams + is_used: false, + // XXXManishearth ideally, we keep track of all payloads and have the capability of picking + // the appropriate decoder. For this, a bunch of the streams code will have to be moved into + // a webrtc-specific abstraction. + payload: media.get_format(0).unwrap().parse().unwrap(), + }); + } + } + + /// Streams need to be linked to the correct pads, so we buffer them up until we know enough + /// to do this. + /// + /// When we get a remote offer, we store the relevant m-line information so that we can + /// pick the correct sink pad and payload. Shortly after we look for any pending streams + /// and connect them to available compatible m-lines using link_stream. + /// + /// When we create an offer, we're controlling the pad order, so we set request_new_pads + /// to true and forcefully link all pending streams before generating the offer. + /// + /// When request_new_pads is false, we may still request new pads, however we only do this for + /// streams that have already been negotiated by the remote. + fn link_stream(&mut self, mut boxed_stream: Box, request_new_pads: bool) { + let stream = boxed_stream + .as_mut_any() + .downcast_mut::() + .unwrap(); + let caps = stream.caps(); + let webrtc = self.webrtc.as_ref().unwrap(); + let idx = self.remote_mline_info.iter().enumerate() + .filter(|(_, x)| !x.is_used) + .find(|(_, x)| x.caps.can_intersect(&caps)) + .map(|x| x.0); + let element = stream.src_element(); + + if let Some(idx) = idx { + if idx >= self.request_pad_counter { + for i in self.request_pad_counter..=idx { + // webrtcbin needs you to request pads (or use element.link(webrtcbin)) + // however, it also wants them to be connected in the correct order. + // + // Here, we make sure all the numbered sink pads have been created beforehand, up to + // and including the one we need here. + // + // An alternate fix is to sort pending_streams according to the m-line index + // and just do it in order. This also seems brittle. + webrtc.get_request_pad(&format!("sink_{}", i)).unwrap(); + } + self.request_pad_counter = idx + 1; + } + stream.attach_to_pipeline(&self.pipeline); + self.remote_mline_info[idx].is_used = true; + let caps = stream.caps_with_payload(self.remote_mline_info[idx].payload); + element.set_property("caps", &caps).unwrap(); + let src = element.get_static_pad("src").unwrap(); + let sink = webrtc.get_static_pad(&format!("sink_{}", idx)).unwrap(); + src.link(&sink).unwrap(); + self.streams.push(boxed_stream); + } else if request_new_pads { + stream.attach_to_pipeline(&self.pipeline); + let caps = stream.caps_with_payload(self.pt_counter); + self.pt_counter += 1; + element.set_property("caps", &caps).unwrap(); + let src = element.get_static_pad("src").unwrap(); + let sink = webrtc.get_request_pad(&format!("sink_{}", self.request_pad_counter)).unwrap(); + self.request_pad_counter += 1; + src.link(&sink).unwrap(); + self.streams.push(boxed_stream); + } else { + self.pending_streams.push(boxed_stream); + } + } + + /// link_stream, but for all pending streams + fn flush_pending_streams(&mut self, request_new_pads: bool) { + let pending_streams = mem::replace(&mut self.pending_streams, vec![]); + for stream in pending_streams { + self.link_stream(stream, request_new_pads) + } } } @@ -179,7 +346,6 @@ impl GStreamerWebRtcController { let thread = Arc::new(Mutex::new(self.thread.clone())); webrtc .connect("pad-added", false, move |values| { - println!("pad-added"); process_new_stream(values, &pipe_clone, thread.clone()); None }) @@ -221,7 +387,11 @@ pub fn construct( pipeline, signaller, thread, - has_streams: false, + remote_mline_info: vec![], + streams: vec![], + pending_streams: vec![], + pt_counter: 96, + request_pad_counter: 0, delayed_negotiation: false, _main_loop: main_loop, }; @@ -256,6 +426,7 @@ fn on_offer_or_answer_created( sdp: reply.get_sdp().as_text().unwrap(), type_, }; + cb.call(desc); } fn on_incoming_stream( @@ -270,17 +441,10 @@ fn on_incoming_stream( let decodebin2 = decodebin.clone(); decodebin .connect("pad-added", false, move |values| { - println!("decodebin pad-added"); on_incoming_decodebin_stream(values, &pipe_clone, thread.clone(), &name); None }) .unwrap(); - decodebin - .connect("no-more-pads", false, move |_| { - println!("no-more-pads"); - None - }) - .unwrap(); pipe.add(&decodebin).unwrap(); let decodepad = decodebin.get_static_pad("sink").unwrap(); @@ -294,7 +458,6 @@ fn on_incoming_decodebin_stream( thread: Arc>, name: &str, ) { - println!("incoming decodebin"); let pad = values[1].get::().expect("not a pad??"); let proxy_src = gst::ElementFactory::make("proxysrc", None).unwrap(); let proxy_sink = gst::ElementFactory::make("proxysink", None).unwrap(); diff --git a/examples/simple_webrtc.rs b/examples/simple_webrtc.rs index 44a73520..b92a939c 100644 --- a/examples/simple_webrtc.rs +++ b/examples/simple_webrtc.rs @@ -147,6 +147,7 @@ impl State { }; webrtc.add_stream(video); webrtc.add_stream(audio); + webrtc.configure(STUN_SERVER.into(), BundlePolicy::MaxBundle); } } diff --git a/webrtc/src/lib.rs b/webrtc/src/lib.rs index e3ddd11f..b3ec686a 100644 --- a/webrtc/src/lib.rs +++ b/webrtc/src/lib.rs @@ -19,7 +19,7 @@ pub trait WebRtcControllerBackend: Send { fn add_ice_candidate(&mut self, candidate: IceCandidate); fn create_offer(&mut self, cb: SendBoxFnOnce<'static, (SessionDescription,)>); fn create_answer(&mut self, cb: SendBoxFnOnce<'static, (SessionDescription,)>); - fn add_stream(&mut self, stream: &mut MediaStream); + fn add_stream(&mut self, stream: Box); fn internal_event(&mut self, event: thread::InternalEvent); fn quit(&mut self); } @@ -49,6 +49,12 @@ pub enum SdpType { Rollback, } +#[derive(Copy, Clone, Hash, Debug, PartialEq, Eq)] +pub enum DescriptionType { + Local, + Remote +} + impl SdpType { pub fn as_str(self) -> &'static str { match self { @@ -93,6 +99,7 @@ pub struct IceCandidate { } /// https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection#RTCBundlePolicy_enum +#[derive(Clone, Copy, Hash, Debug, PartialEq, Eq)] pub enum BundlePolicy { Balanced, MaxCompat, diff --git a/webrtc/src/thread.rs b/webrtc/src/thread.rs index 979392a9..42a84b47 100644 --- a/webrtc/src/thread.rs +++ b/webrtc/src/thread.rs @@ -3,7 +3,7 @@ use std::thread; use boxfnonce::SendBoxFnOnce; -use crate::{BundlePolicy, IceCandidate, MediaStream, SessionDescription}; +use crate::{BundlePolicy, DescriptionType, IceCandidate, MediaStream, SessionDescription, SdpType}; use crate::{WebRtcBackend, WebRtcControllerBackend, WebRtcSignaller}; #[derive(Clone)] @@ -90,6 +90,7 @@ pub enum InternalEvent { OnNegotiationNeeded, OnIceCandidate(IceCandidate), OnAddStream(Box), + DescriptionAdded(SendBoxFnOnce<'static, ()>, DescriptionType, SdpType), } pub fn handle_rtc_event(controller: &mut WebRtcControllerBackend, event: RtcThreadEvent) -> bool { @@ -102,7 +103,7 @@ pub fn handle_rtc_event(controller: &mut WebRtcControllerBackend, event: RtcThre RtcThreadEvent::AddIceCandidate(candidate) => controller.add_ice_candidate(candidate), RtcThreadEvent::CreateOffer(cb) => controller.create_offer(cb), RtcThreadEvent::CreateAnswer(cb) => controller.create_answer(cb), - RtcThreadEvent::AddStream(mut media) => controller.add_stream(&mut *media), + RtcThreadEvent::AddStream(media) => controller.add_stream(media), RtcThreadEvent::InternalEvent(e) => controller.internal_event(e), RtcThreadEvent::Quit => { controller.quit();