Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow consuming stream data from a client app #372

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backends/gstreamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ path = "lib.rs"

[dependencies]
boxfnonce = "0.1.0"
euclid = "0.20"
mime = "0.3.13"
log = "0.4"

Expand Down
1 change: 1 addition & 0 deletions backends/gstreamer/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![feature(nll)]
extern crate boxfnonce;
extern crate byte_slice_cast;
extern crate euclid;
extern crate mime;

extern crate glib_sys as glib_ffi;
Expand Down
21 changes: 12 additions & 9 deletions backends/gstreamer/media_capture.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::media_stream::GStreamerMediaStream;

use gst;
use gst::prelude::*;
use servo_media_streams::capture::*;
Expand Down Expand Up @@ -155,17 +156,19 @@ fn create_input_stream(
let devices = GstMediaDevices::new();
devices
.get_track(stream_type == MediaStreamType::Video, constraint_set)
.map(|track| {
let f = match stream_type {
MediaStreamType::Audio => GStreamerMediaStream::create_audio_from,
MediaStreamType::Video => GStreamerMediaStream::create_video_from,
};
f(match source {
.map(|track| match stream_type {
MediaStreamType::Audio => GStreamerMediaStream::create_audio_from(match source {
MediaSource::Device => track.element,
MediaSource::App => {
gst::ElementFactory::make("appsrc", None).expect("appsrc creation failed")
MediaSource::App(_) => unimplemented!(),
}),
MediaStreamType::Video => match source {
MediaSource::Device => GStreamerMediaStream::create_video_from(track.element, None),
MediaSource::App(size) => {
let appsrc =
gst::ElementFactory::make("appsrc", None).expect("appsrc creation failed");
GStreamerMediaStream::create_video_from(appsrc, Some(size))
}
})
},
})
}

Expand Down
92 changes: 79 additions & 13 deletions backends/gstreamer/media_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::BACKEND_BASE_TIME;

use euclid::default::Size2D;
use glib::prelude::*;
use gst;
use gst::prelude::*;
Expand Down Expand Up @@ -30,6 +32,7 @@ pub struct GStreamerMediaStream {
type_: MediaStreamType,
elements: Vec<gst::Element>,
pipeline: Option<gst::Pipeline>,
video_app_source: Option<AppSrc>,
}

impl MediaStream for GStreamerMediaStream {
Expand All @@ -50,12 +53,10 @@ impl MediaStream for GStreamerMediaStream {
}

fn push_data(&self, data: Vec<u8>) {
if let Some(source) = self.elements.last() {
if let Some(appsrc) = source.downcast_ref::<AppSrc>() {
let buffer = gst::Buffer::from_slice(data);
if let Err(error) = appsrc.push_buffer(buffer) {
warn!("{}", error);
}
if let Some(ref appsrc) = self.video_app_source {
let buffer = gst::Buffer::from_slice(data);
if let Err(error) = appsrc.push_buffer(buffer) {
warn!("{}", error);
}
}
}
Expand All @@ -68,6 +69,7 @@ impl GStreamerMediaStream {
type_,
elements,
pipeline: None,
video_app_source: None,
}
}

Expand Down Expand Up @@ -103,6 +105,10 @@ impl GStreamerMediaStream {
self.elements.last().unwrap().clone()
}

pub fn first_element(&self) -> gst::Element {
self.elements.first().unwrap().clone()
}

pub fn attach_to_pipeline(&mut self, pipeline: &gst::Pipeline) {
assert!(self.pipeline.is_none());
let elements: Vec<_> = self.elements.iter().collect();
Expand Down Expand Up @@ -135,7 +141,7 @@ impl GStreamerMediaStream {
.set_property("is-live", &true)
.expect("videotestsrc doesn't have expected 'is-live' property");

Self::create_video_from(videotestsrc)
Self::create_video_from(videotestsrc, None)
}

/// Attaches encoding adapters to the stream, returning the source element
Expand Down Expand Up @@ -186,14 +192,74 @@ impl GStreamerMediaStream {
}
}

pub fn create_video_from(source: gst::Element) -> MediaStreamId {
pub fn set_video_app_source(&mut self, source: &AppSrc) {
self.video_app_source = Some(source.clone());
}

pub fn create_video_from(source: gst::Element, size: Option<Size2D<u32>>) -> MediaStreamId {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be doing decoding within create_video_from, it's the responsibility of the client to do that (see webrtc, all media streams are raw, this was a deliberate choice and without it we will have problems)

let src = gst::ElementFactory::make("proxysrc", None).unwrap();
let videoconvert = gst::ElementFactory::make("videoconvert", None).unwrap();
let queue = gst::ElementFactory::make("queue", None).unwrap();

register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
let stream = Arc::new(Mutex::new(GStreamerMediaStream::new(
MediaStreamType::Video,
vec![source, videoconvert, queue],
))))
vec![src, videoconvert, queue],
)));

let pipeline = gst::Pipeline::new(Some("video pipeline"));
let clock = gst::SystemClock::obtain();
pipeline.set_start_time(gst::ClockTime::none());
pipeline.set_base_time(*BACKEND_BASE_TIME);
pipeline.use_clock(Some(&clock));

let decodebin = gst::ElementFactory::make("decodebin", None).unwrap();

let stream_ = stream.clone();
let video_pipeline = pipeline.clone();
decodebin.connect_pad_added(move |decodebin, _| {
// Append a proxysink to the video pipeline.
let proxy_sink = gst::ElementFactory::make("proxysink", None).unwrap();
video_pipeline.add(&proxy_sink).unwrap();
gst::Element::link_many(&[decodebin, &proxy_sink]).unwrap();

// And connect the video and media stream pipelines.
let stream = stream_.lock().unwrap();
let first_element = stream.first_element();
first_element
.set_property("proxysink", &proxy_sink)
.unwrap();

proxy_sink.sync_state_with_parent().unwrap();
decodebin.sync_state_with_parent().unwrap();
});

if let Some(size) = size {
let caps = gst::Caps::builder("video/x-raw")
.field("format", &gst_video::VideoFormat::Bgra.to_string())
.field("pixel-aspect-ratio", &gst::Fraction::from((1, 1)))
.field("width", &(size.width as i32))
.field("height", &(size.height as i32))
.build();
source
.set_property("caps", &caps)
.expect("source doesn't have expected 'caps' property");
}

if let Some(appsrc) = source.downcast_ref::<AppSrc>() {
appsrc.set_property_format(gst::Format::Time);
stream.lock().unwrap().set_video_app_source(appsrc);
}

pipeline.add_many(&[&source, &decodebin]).unwrap();
gst::Element::link_many(&[&source, &decodebin]).unwrap();

pipeline.set_state(gst::State::Playing).unwrap();

#[cfg(debug_assertions)]
pipeline
.upcast::<gst::Bin>()
.debug_to_dot_file(gst::DebugGraphDetails::all(), "VideoPipeline_PLAYING");

register_stream(stream)
}

pub fn create_audio() -> MediaStreamId {
Expand Down Expand Up @@ -224,7 +290,7 @@ impl GStreamerMediaStream {
proxy_src.set_property("proxysink", &proxy_sink).unwrap();
let stream = match ty {
MediaStreamType::Audio => Self::create_audio_from(proxy_src),
MediaStreamType::Video => Self::create_video_from(proxy_src),
MediaStreamType::Video => Self::create_video_from(proxy_src, None),
};

(stream, GstreamerMediaSocket { proxy_sink })
Expand Down
5 changes: 5 additions & 0 deletions backends/gstreamer/media_stream_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ mod imp {
sink.sync_state_with_parent().unwrap();

pipeline.set_state(gst::State::Playing).unwrap();

#[cfg(debug_assertions)]
pipeline
.upcast::<gst::Bin>()
.debug_to_dot_file(gst::DebugGraphDetails::all(), "ServoMediaStreamSrc_PLAYING");
}

fn setup_proxy_src(
Expand Down
2 changes: 1 addition & 1 deletion backends/gstreamer/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ fn on_incoming_decodebin_stream(

let (stream, ty) = if name == "video" {
(
GStreamerMediaStream::create_video_from(proxy_src),
GStreamerMediaStream::create_video_from(proxy_src, None),
MediaStreamType::Video,
)
} else {
Expand Down
1 change: 1 addition & 0 deletions streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ name = "servo_media_streams"
path = "lib.rs"

[dependencies]
euclid = "0.20"
lazy_static = "1.0"
serde = "1.0.66"
uuid = { version = "0.8", features = ["v4", "serde"] }
4 changes: 3 additions & 1 deletion streams/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
extern crate euclid;
#[macro_use]
extern crate lazy_static;
#[macro_use]
Expand All @@ -7,6 +8,7 @@ pub mod capture;
pub mod device_monitor;
pub mod registry;

use euclid::default::Size2D;
use std::any::Any;

pub use registry::*;
Expand All @@ -32,7 +34,7 @@ pub enum MediaSource {
Device,
// The media stream source is the client application.
// i.e. captureStream
App,
App(Size2D<u32>),
}

/// This isn't part of the webrtc spec; it's a leaky abstaction while media streams
Expand Down