From afd0574f1abea3a576a40bfca7b926b8aa02214d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Wed, 24 Jul 2019 16:24:39 +0200 Subject: [PATCH 1/4] Cache size setup while seeking --- backends/gstreamer/source.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/backends/gstreamer/source.rs b/backends/gstreamer/source.rs index 2fcc2d65..4282b912 100644 --- a/backends/gstreamer/source.rs +++ b/backends/gstreamer/source.rs @@ -55,10 +55,19 @@ mod imp { srcpad: gst::GhostPad, position: Mutex, seeking: AtomicBool, + size: Mutex>, } impl ServoSrc { pub fn set_size(&self, size: i64) { + if self.seeking.load(Ordering::Relaxed) { + // We ignore set_size requests if we are seeking. + // The size value is temporarily stored so it + // is properly set once we are done seeking. + *self.size.lock().unwrap() = Some(size); + return; + } + if self.appsrc.get_size() == -1 { self.appsrc.set_size(size); } @@ -86,6 +95,12 @@ mod imp { pub fn set_seek_done(&self) { self.seeking.store(false, Ordering::Relaxed); + if let Some(size) = self.size.lock().unwrap().take() { + if self.appsrc.get_size() == -1 { + self.appsrc.set_size(size); + } + } + let mut pos = self.position.lock().unwrap(); pos.offset = pos.requested_offset; pos.requested_offset = 0; @@ -267,6 +282,7 @@ mod imp { srcpad: ghost_pad, position: Mutex::new(Default::default()), seeking: AtomicBool::new(false), + size: Mutex::new(None), } } From 2c81770fb0c2608d62502b9b3eaae888594c43bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Thu, 25 Jul 2019 13:10:05 +0200 Subject: [PATCH 2/4] Fix race condition with seek lock by enforcing an ack --- backends/gstreamer/player.rs | 22 ++++++++++++++-------- examples/player/app.rs | 7 +++---- examples/simple_player.rs | 4 ++-- player/lib.rs | 22 +++++++++++++++++++--- 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/backends/gstreamer/player.rs b/backends/gstreamer/player.rs index b1520a5f..182990ff 100644 --- a/backends/gstreamer/player.rs +++ b/backends/gstreamer/player.rs @@ -13,7 +13,7 @@ use render::GStreamerRender; use servo_media_player::context::PlayerGLContext; use servo_media_player::frame::FrameRenderer; use servo_media_player::metadata::Metadata; -use servo_media_player::{PlaybackState, Player, PlayerError, PlayerEvent, StreamType}; +use servo_media_player::{PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType}; use servo_media_streams::registry::{get_stream, MediaStreamId}; use servo_media_traits::Muteable; use source::{register_servo_src, ServoSrc}; @@ -298,21 +298,26 @@ macro_rules! player( ); struct SeekChannel { - sender: IpcSender, - recv: IpcReceiver, + sender: SeekLock, + recv: IpcReceiver, } impl SeekChannel { fn new() -> Self { - let (sender, recv) = channel::().expect("Couldn't create IPC channel"); - Self { sender, recv } + let (sender, recv) = channel::().expect("Couldn't create IPC channel"); + Self { + sender: SeekLock{ + lock_channel: sender, + }, + recv, + } } - fn sender(&self) -> IpcSender { + fn sender(&self) -> SeekLock { self.sender.clone() } - fn await(&self) -> bool { + fn await(&self) -> SeekLockMsg { self.recv.recv().unwrap() } } @@ -664,7 +669,8 @@ impl GStreamerPlayer { seek_channel.lock().unwrap().sender() ) ); - let ret = seek_channel.lock().unwrap().await(); + let (ret, ack_channel) = seek_channel.lock().unwrap().await(); + ack_channel.send(()).unwrap(); ret } else { true diff --git a/examples/player/app.rs b/examples/player/app.rs index 7e0e7447..5f9ea149 100644 --- a/examples/player/app.rs +++ b/examples/player/app.rs @@ -389,14 +389,13 @@ pub fn main_loop(mut app: App) -> Result (), } } - player::PlayerEvent::SeekData(offset, sender) => { + player::PlayerEvent::SeekData(offset, seek_lock) => { input_eos = false; - let ret = if let Ok(pos) = buf_reader.seek(SeekFrom::Start(offset)) { + seek_lock.unlock(if let Ok(pos) = buf_reader.seek(SeekFrom::Start(offset)) { offset == pos } else { false - }; - sender.send(ret).unwrap(); + }); } player::PlayerEvent::NeedData => { if !input_eos { diff --git a/examples/simple_player.rs b/examples/simple_player.rs index c1915766..7baedd78 100644 --- a/examples/simple_player.rs +++ b/examples/simple_player.rs @@ -142,10 +142,10 @@ fn run_example(servo_media: Arc) { } } } - PlayerEvent::SeekData(p, sender) => { + PlayerEvent::SeekData(p, seek_lock) => { println!("\nSeek requested to position {:?}", p); seek_sender.send(p).unwrap(); - sender.send(true).unwrap(); + seek_lock.unlock(true); } PlayerEvent::SeekDone(p) => println!("\nSeeked to {:?}", p), PlayerEvent::NeedData => println!("\nNeedData"), diff --git a/player/lib.rs b/player/lib.rs index da144caa..f8f9ed2b 100644 --- a/player/lib.rs +++ b/player/lib.rs @@ -8,7 +8,7 @@ pub mod context; pub mod frame; pub mod metadata; -use ipc_channel::ipc::IpcSender; +use ipc_channel::ipc::{self, IpcSender}; use servo_media_traits::Muteable; use std::ops::Range; use streams::registry::MediaStreamId; @@ -40,6 +40,21 @@ pub enum PlayerError { SetStreamFailed, } +pub type SeekLockMsg = (bool, IpcSender<()>); + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct SeekLock { + pub lock_channel: IpcSender, +} + +impl SeekLock { + pub fn unlock(&self, result: bool) { + let (ack_sender, ack_recv) = ipc::channel::<()>().expect("Could not create IPC channel"); + self.lock_channel.send((result, ack_sender)).unwrap(); + ack_recv.recv().unwrap() + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub enum PlayerEvent { EndOfStream, @@ -54,9 +69,10 @@ pub enum PlayerEvent { PositionChanged(u64), /// The player needs the data to perform a seek to the given offset. /// The next push_data should get the buffers from the new offset. - /// The player will be blocked until the user sends, through the IPC sender, + /// The player will be blocked until the user unlocks it through + /// the given SeekLock instance. /// This event is only received for seekable stream types. - SeekData(u64, IpcSender), + SeekData(u64, SeekLock), /// The player has performed a seek to the given offset. SeekDone(u64), StateChanged(PlaybackState), From 9bfbd1f7876d3e5650d397d369f1a0fa3b3f280f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Thu, 25 Jul 2019 13:15:28 +0200 Subject: [PATCH 3/4] rustfmt and fix warnings --- backends/gstreamer/player.rs | 9 ++++++--- backends/gstreamer/render.rs | 2 +- backends/gstreamer/webrtc.rs | 3 ++- examples/player/context.rs | 1 + streams/lib.rs | 2 +- webrtc/lib.rs | 2 +- webrtc/thread.rs | 5 ++++- 7 files changed, 16 insertions(+), 8 deletions(-) diff --git a/backends/gstreamer/player.rs b/backends/gstreamer/player.rs index 182990ff..5b7b32c3 100644 --- a/backends/gstreamer/player.rs +++ b/backends/gstreamer/player.rs @@ -13,7 +13,9 @@ use render::GStreamerRender; use servo_media_player::context::PlayerGLContext; use servo_media_player::frame::FrameRenderer; use servo_media_player::metadata::Metadata; -use servo_media_player::{PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType}; +use servo_media_player::{ + PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType, +}; use servo_media_streams::registry::{get_stream, MediaStreamId}; use servo_media_traits::Muteable; use source::{register_servo_src, ServoSrc}; @@ -306,7 +308,7 @@ impl SeekChannel { fn new() -> Self { let (sender, recv) = channel::().expect("Couldn't create IPC channel"); Self { - sender: SeekLock{ + sender: SeekLock { lock_channel: sender, }, recv, @@ -669,7 +671,8 @@ impl GStreamerPlayer { seek_channel.lock().unwrap().sender() ) ); - let (ret, ack_channel) = seek_channel.lock().unwrap().await(); + let (ret, ack_channel) = + seek_channel.lock().unwrap().await(); ack_channel.send(()).unwrap(); ret } else { diff --git a/backends/gstreamer/render.rs b/backends/gstreamer/render.rs index 835e4f94..151b13dd 100644 --- a/backends/gstreamer/render.rs +++ b/backends/gstreamer/render.rs @@ -44,7 +44,7 @@ mod platform { pub struct RenderDummy(); pub type Render = RenderDummy; - pub fn create_render(_: Box) -> Option { + pub fn create_render(_: Box) -> Option { None } diff --git a/backends/gstreamer/webrtc.rs b/backends/gstreamer/webrtc.rs index 8cdb3fe2..a67ca4d5 100644 --- a/backends/gstreamer/webrtc.rs +++ b/backends/gstreamer/webrtc.rs @@ -310,7 +310,8 @@ impl GStreamerWebRtcController { let mut caps = gst::Caps::new_empty(); let caps_mut = caps.get_mut().expect("Fresh caps should be uniquely owned"); for format in media.formats() { - let pt = format.parse() + let pt = format + .parse() .expect("Gstreamer provided noninteger format"); caps_mut.append( media diff --git a/examples/player/context.rs b/examples/player/context.rs index 5d692d50..1d76e66d 100644 --- a/examples/player/context.rs +++ b/examples/player/context.rs @@ -10,6 +10,7 @@ pub struct PlayerContextGlutin { gl_api: GlApi, } +#[allow(unused_variables)] impl PlayerContextGlutin { pub fn new( use_gl: bool, diff --git a/streams/lib.rs b/streams/lib.rs index f051ee46..d5f8f0b1 100644 --- a/streams/lib.rs +++ b/streams/lib.rs @@ -22,5 +22,5 @@ pub trait MediaOutput: Send { #[derive(Clone, Copy, Debug, PartialEq)] pub enum MediaStreamType { Video, - Audio + Audio, } diff --git a/webrtc/lib.rs b/webrtc/lib.rs index f1eb97c9..14368ab3 100644 --- a/webrtc/lib.rs +++ b/webrtc/lib.rs @@ -1,8 +1,8 @@ extern crate boxfnonce; extern crate log; extern crate servo_media_streams; -use servo_media_streams::MediaStreamType; use servo_media_streams::registry::MediaStreamId; +use servo_media_streams::MediaStreamType; use std::fmt::Display; use std::str::FromStr; diff --git a/webrtc/thread.rs b/webrtc/thread.rs index 9e08b479..ae0b4e69 100644 --- a/webrtc/thread.rs +++ b/webrtc/thread.rs @@ -107,7 +107,10 @@ pub enum InternalEvent { UpdateIceConnectionState, } -pub fn handle_rtc_event(controller: &mut dyn WebRtcControllerBackend, event: RtcThreadEvent) -> bool { +pub fn handle_rtc_event( + controller: &mut dyn WebRtcControllerBackend, + event: RtcThreadEvent, +) -> bool { let result = match event { RtcThreadEvent::ConfigureStun(server, policy) => controller.configure(&server, policy), RtcThreadEvent::SetRemoteDescription(desc, cb) => { From 5d24b6f25cce7f88b5ea86657dfb640a0968906d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Thu, 25 Jul 2019 18:49:20 +0200 Subject: [PATCH 4/4] Move seek unlock ack after set_seek_done call --- backends/gstreamer/player.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/backends/gstreamer/player.rs b/backends/gstreamer/player.rs index 5b7b32c3..c4588b7a 100644 --- a/backends/gstreamer/player.rs +++ b/backends/gstreamer/player.rs @@ -663,7 +663,7 @@ impl GStreamerPlayer { notify!(observer__, PlayerEvent::EnoughData); }) .seek_data(move |_, offset| { - let ret = if servosrc_.set_seek_offset(offset) { + let (ret, ack_channel) = if servosrc_.set_seek_offset(offset) { notify!( observer___, PlayerEvent::SeekData( @@ -673,13 +673,15 @@ impl GStreamerPlayer { ); let (ret, ack_channel) = seek_channel.lock().unwrap().await(); - ack_channel.send(()).unwrap(); - ret + (ret, Some(ack_channel)) } else { - true + (true, None) }; servosrc_.set_seek_done(); + if let Some(ack_channel) = ack_channel { + ack_channel.send(()).unwrap(); + } ret }) .build(),