Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow simultaneous playback of audio and video streams #260

Merged
merged 5 commits into from May 28, 2019
@@ -1,5 +1,8 @@
#[cfg(any(
all(target_os = "android", any(target_arch = "arm", target_arch = "aarch64")),
all(
target_os = "android",
any(target_arch = "arm", target_arch = "aarch64")
),
target_arch = "x86_64"
))]
mod platform {
@@ -8,7 +11,10 @@ mod platform {
}

#[cfg(not(any(
all(target_os = "android", any(target_arch = "arm", target_arch = "aarch64")),
all(
target_os = "android",
any(target_arch = "arm", target_arch = "aarch64")
),
target_arch = "x86_64"
)))]
mod platform {
@@ -144,7 +144,7 @@ impl Player for DummyPlayer {
Ok(())
}

fn set_stream(&self, _: &MediaStreamId) -> Result<(), PlayerError> {
fn set_stream(&self, _: &MediaStreamId, _: bool) -> Result<(), PlayerError> {
Ok(())
}

@@ -1,9 +1,9 @@
use crate::media_stream::{GStreamerMediaStream};
use crate::media_stream::GStreamerMediaStream;
use gst;
use gst::prelude::*;
use servo_media_streams::MediaStreamType;
use servo_media_streams::capture::*;
use servo_media_streams::registry::{register_stream, MediaStreamId};
use servo_media_streams::MediaStreamType;
use std::i32;
use std::sync::{Arc, Mutex};

@@ -10,13 +10,13 @@ use std::any::Any;
use std::sync::{Arc, Mutex};

lazy_static! {
static ref RTP_CAPS_OPUS: gst::Caps = {
pub static ref RTP_CAPS_OPUS: gst::Caps = {
gst::Caps::new_simple(
"application/x-rtp",
&[("media", &"audio"), ("encoding-name", &"OPUS")],
)
};
static ref RTP_CAPS_VP8: gst::Caps = {
pub static ref RTP_CAPS_VP8: gst::Caps = {
gst::Caps::new_simple(
"application/x-rtp",
&[("media", &"video"), ("encoding-name", &"VP8")],
@@ -5,21 +5,60 @@ use glib::translate::*;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use media_stream::GStreamerMediaStream;
use gst_base::UniqueFlowCombiner;
use media_stream::{GStreamerMediaStream, RTP_CAPS_OPUS, RTP_CAPS_VP8};
use servo_media_streams::{MediaStream, MediaStreamType};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use url::Url;

// Implementation sub-module of the GObject
mod imp {
use super::*;

lazy_static! {
static ref AUDIO_SRC_PAD_TEMPLATE: gst::PadTemplate = {
gst::PadTemplate::new(
"audio_src",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&RTP_CAPS_OPUS,
).expect("Could not create audio src pad template")
};
static ref VIDEO_SRC_PAD_TEMPLATE: gst::PadTemplate = {
gst::PadTemplate::new(
"video_src",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&RTP_CAPS_VP8,
).expect("Could not create video src pad template")
};
}

pub struct ServoMediaStreamSrc {
cat: gst::DebugCategory,
proxysrc: gst::Element,
srcpad: gst::GhostPad,
audio_proxysrc: gst::Element,
audio_srcpad: gst::GhostPad,
video_proxysrc: gst::Element,
video_srcpad: gst::GhostPad,
flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
has_audio_stream: Arc<AtomicBool>,
has_video_stream: Arc<AtomicBool>,
}

impl ServoMediaStreamSrc {
pub fn set_stream(&self, stream: &mut GStreamerMediaStream) {
pub fn set_stream(
&self,
stream: &mut GStreamerMediaStream,
src: &gst::Element,
only_stream: bool,
) {
// XXXferjm the current design limits the number of streams to one
// per type. This fulfills the basic use case for WebRTC, but we should
// implement support for multiple streams per type at some point, which
// likely involves encoding and muxing all streams of the same type
// in a single stream.

gst_log!(self.cat, "Setting stream");

// Append a proxysink to the media stream pipeline.
@@ -29,13 +68,79 @@ mod imp {
pipeline.add(&sink).unwrap();
gst::Element::link_many(&[&last_element, &sink][..]).unwrap();

// Connect the media stream proxysink to the source's proxysrc.
self.proxysrc.set_property("proxysink", &sink).unwrap();
// Create the appropriate proxysrc depending on the stream type
// and connect the media stream proxysink to it.
self.setup_proxy_src(stream.ty(), &sink, src, only_stream);

sink.sync_state_with_parent().unwrap();

pipeline.set_state(gst::State::Playing).unwrap();
}

fn setup_proxy_src(
&self,
stream_type: MediaStreamType,
sink: &gst::Element,
src: &gst::Element,
only_stream: bool,
) {
let (proxysrc, src_pad, no_more_pads) = match stream_type {
MediaStreamType::Audio => {
self.has_audio_stream.store(true, Ordering::Relaxed);
(
&self.audio_proxysrc,
&self.audio_srcpad,
self.has_video_stream.load(Ordering::Relaxed),
)
}
MediaStreamType::Video => {
self.has_video_stream.store(true, Ordering::Relaxed);
(
&self.video_proxysrc,
&self.video_srcpad,
self.has_audio_stream.load(Ordering::Relaxed),
)
}
};
proxysrc
.set_property("proxysink", &sink)
.expect("Could not set proxysink property on proxysrc");

// Add proxysrc to bin
let bin = src.downcast_ref::<gst::Bin>().unwrap();
bin.add(proxysrc)
.expect("Could not add proxysrc element to bin");

let target_pad = proxysrc
.get_static_pad("src")
.expect("Could not get proxysrc's static src pad");
src_pad
.set_target(&target_pad)
.expect("Could not set target pad");

src.add_pad(src_pad)
.expect("Could not add source pad to media stream src");
::set_element_flags(src, gst::ElementFlags::SOURCE);

let proxy_pad = src_pad.get_internal().unwrap();
src_pad.set_active(true).expect("Could not active pad");
self.flow_combiner.lock().unwrap().add_pad(&proxy_pad);
let combiner = self.flow_combiner.clone();
proxy_pad.set_chain_function(move |pad, parent, buffer| {
let chain_result = gst::ProxyPad::chain_default(pad, parent.as_ref(), buffer);
let result = combiner.lock().unwrap().update_pad_flow(pad, chain_result);
if result == Err(gst::FlowError::Flushing) {
return chain_result;
}
result
});

src.sync_state_with_parent().unwrap();

if no_more_pads || only_stream {
src.no_more_pads();
}
}
}

// Basic declaration of our type for the GObject type system.
@@ -49,22 +154,32 @@ mod imp {

// Called once at the very beginning of instantiation of each instance and
// creates the data structure that contains all our state
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let proxysrc = gst::ElementFactory::make("proxysrc", None)
fn new_with_class(_: &subclass::simple::ClassStruct<Self>) -> Self {
let audio_proxysrc = gst::ElementFactory::make("proxysrc", None)
.expect("Could not create proxysrc element");
let audio_srcpad =
gst::GhostPad::new_no_target_from_template("audio_src", &AUDIO_SRC_PAD_TEMPLATE)
.unwrap();

let pad_templ = klass.get_pad_template("src").unwrap();
let srcpad =
gst::GhostPad::new_no_target_from_template("stream_src", &pad_templ).unwrap();
let video_proxysrc = gst::ElementFactory::make("proxysrc", None)
.expect("Could not create proxysrc element");
let video_srcpad =
gst::GhostPad::new_no_target_from_template("video_src", &VIDEO_SRC_PAD_TEMPLATE)
.unwrap();

Self {
cat: gst::DebugCategory::new(
"servomediastreamsrc",
gst::DebugColorFlags::empty(),
"Servo media stream source",
),
proxysrc,
srcpad,
audio_proxysrc,
audio_srcpad,
video_proxysrc,
video_srcpad,
flow_combiner: Arc::new(Mutex::new(UniqueFlowCombiner::new())),
has_video_stream: Arc::new(AtomicBool::new(false)),
has_audio_stream: Arc::new(AtomicBool::new(false)),
}
}

@@ -85,15 +200,23 @@ mod imp {
"Servo developers",
);

let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
klass.add_pad_template(src_pad_template);
// Let playbin3 know we are a live source.
klass.install_properties(&[subclass::Property("is-live", |name| {
glib::ParamSpec::boolean(
name,
"Is Live",
"Let playbin3 know we are a live source",
true,
glib::ParamFlags::READWRITE,
)
})]);

// Add pad templates for our audio and video source pads.
// These are later used for actually creating the pads and beforehand
// already provide information to GStreamer about all possible
// pads that could exist for this type.
klass.add_pad_template(AUDIO_SRC_PAD_TEMPLATE.clone());
klass.add_pad_template(VIDEO_SRC_PAD_TEMPLATE.clone());
}
}

@@ -106,29 +229,12 @@ mod imp {
impl ObjectImpl for ServoMediaStreamSrc {
glib_object_impl!();

// Called right after construction of a new instance
fn constructed(&self, obj: &glib::Object) {
// Call the parent class' ::constructed() implementation first
self.parent_constructed(obj);

let bin = obj.downcast_ref::<gst::Bin>().unwrap();

// Add audio proxy sink and source pad to bin.
bin.add(&self.proxysrc)
.expect("Could not add proxysrc element to bin");

let target_pad = self
.proxysrc
.get_static_pad("src")
.expect("Could not get source pad");
self.srcpad.set_target(&target_pad).unwrap();

let element = obj.downcast_ref::<gst::Element>().unwrap();
element
.add_pad(&self.srcpad)
.expect("Could not add source pad to bin");

::set_element_flags(element, gst::ElementFlags::SOURCE);
fn get_property(&self, _: &glib::Object, id: usize) -> Result<gst::Value, ()> {
// We have a single property: is-live
if id != 0 {
return Err(());
}
Ok(true.to_value())
}
}

@@ -186,16 +292,14 @@ glib_wrapper! {
unsafe impl Send for ServoMediaStreamSrc {}
unsafe impl Sync for ServoMediaStreamSrc {}

macro_rules! inner_proxy {
($fn_name:ident, $arg1:ident, $arg1_type:ty) => (
pub fn $fn_name(&self, $arg1: $arg1_type) {
imp::ServoMediaStreamSrc::from_instance(self).$fn_name($arg1)
}
)
}

impl ServoMediaStreamSrc {
inner_proxy!(set_stream, stream, &mut GStreamerMediaStream);
pub fn set_stream(&self, stream: &mut GStreamerMediaStream, only_stream: bool) {
imp::ServoMediaStreamSrc::from_instance(self).set_stream(
stream,
self.upcast_ref::<gst::Element>(),
only_stream,
)
}
}

// Registers the type for our element, and then registers in GStreamer
@@ -256,7 +256,7 @@ impl PlayerInner {
Ok(result)
}

fn set_stream(&mut self, stream: &MediaStreamId) -> Result<(), PlayerError> {
fn set_stream(&mut self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
debug_assert!(self.stream_type == StreamType::Stream);
if let Some(ref source) = self.source {
if let PlayerSource::Stream(source) = source {
@@ -275,7 +275,7 @@ impl PlayerInner {
playbin.set_start_time(gst::ClockTime::none());
playbin.use_clock(Some(&clock));

source.set_stream(&mut stream);
source.set_stream(&mut stream, only_stream);
return Ok(());
}
}
@@ -712,7 +712,6 @@ impl Player for GStreamerPlayer {
inner_player_proxy!(seek, time, f64);
inner_player_proxy!(set_volume, value, f64);
inner_player_proxy!(buffered, Vec<Range<f64>>);
inner_player_proxy!(set_stream, stream, &MediaStreamId);

fn shutdown(&self) -> Result<(), PlayerError> {
self.stop()
@@ -721,4 +720,11 @@ impl Player for GStreamerPlayer {
fn render_use_gl(&self) -> bool {
self.render.lock().unwrap().is_gl()
}

fn set_stream(&self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
self.setup()?;
let inner = self.inner.borrow();
let mut inner = inner.as_ref().unwrap().lock().unwrap();
inner.set_stream(stream, only_stream)
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.