Auto merge of #279 - ferjm:muteables.shutdown, r=ceyusa
Remove shutdown_player and shutdown_audio_context

This is a cleaner approach that does not require consumers of the API to call any shutdown* method to ensure that the Muteables are removed appropriately.
bors-servo committed Aug 14, 2019
2 parents 3e3dc52 + b1ae3b3 commit a70f024
Showing 15 changed files with 120 additions and 109 deletions.
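The shape of the new lifecycle introduced by this commit: every media instance keeps a channel back to its owning backend and reports its own shutdown from Drop, while the backend tracks instances only through Weak references and prunes stale entries on a dedicated thread. Below is a minimal, self-contained sketch of that pattern; the stand-in types (Instance, the two-field ClientContextId, the registry layout) are illustrative, not the exact servo-media definitions.

use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex, Weak};
use std::thread;

// Simplified stand-ins for the servo_media_traits types used in the diff.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ClientContextId(u64, u64);

enum BackendMsg {
    Shutdown(ClientContextId, usize),
}

trait MediaInstance {
    fn get_id(&self) -> usize;
}

// A hypothetical media instance: it notifies its owner backend when it is dropped,
// which is what the real AudioContext and GStreamerPlayer Drop impls now do.
struct Instance {
    id: usize,
    context_id: ClientContextId,
    backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
}

impl MediaInstance for Instance {
    fn get_id(&self) -> usize {
        self.id
    }
}

impl Drop for Instance {
    fn drop(&mut self) {
        let _ = self
            .backend_chan
            .lock()
            .unwrap()
            .send(BackendMsg::Shutdown(self.context_id, self.id));
    }
}

fn main() {
    let instances: Arc<Mutex<HashMap<ClientContextId, Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>>>> =
        Arc::new(Mutex::new(HashMap::new()));

    // Backend-side shutdown thread: prunes registry entries when instances are dropped.
    let (backend_chan, recvr) = mpsc::channel();
    let instances_ = instances.clone();
    let pruner = thread::spawn(move || {
        while let Ok(BackendMsg::Shutdown(context_id, id)) = recvr.recv() {
            let mut instances = instances_.lock().unwrap();
            if let Some(vec) = instances.get_mut(&context_id) {
                vec.retain(|(instance_id, _)| *instance_id != id);
                if vec.is_empty() {
                    instances.remove(&context_id);
                }
            }
        }
    });

    // "create_player"-style registration: the registry stores only a Weak reference.
    let backend_chan = Arc::new(Mutex::new(backend_chan));
    let context_id = ClientContextId(1, 1);
    let instance: Arc<Mutex<dyn MediaInstance>> = Arc::new(Mutex::new(Instance {
        id: 0,
        context_id,
        backend_chan: backend_chan.clone(),
    }));
    instances
        .lock()
        .unwrap()
        .entry(context_id)
        .or_insert_with(Vec::new)
        .push((0, Arc::downgrade(&instance)));

    assert_eq!(instance.lock().unwrap().get_id(), 0);
    drop(instance); // Drop sends BackendMsg::Shutdown; no shutdown_* call needed.
    drop(backend_chan); // closing the channel lets the pruner thread exit
    pruner.join().unwrap();
    assert!(instances.lock().unwrap().is_empty());
}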
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -14,4 +14,4 @@ members = [
"streams",
"webrtc"
]
license = "MPL-2.0"

26 changes: 21 additions & 5 deletions audio/context.rs
@@ -3,11 +3,11 @@ use graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
use node::{AudioNodeInit, AudioNodeMessage, ChannelInfo};
use render_thread::AudioRenderThread;
use render_thread::AudioRenderThreadMsg;
use servo_media_traits::Muteable;
use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
use sink::AudioSink;
use std::cell::Cell;
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread::Builder;
use AudioBackend;

@@ -105,8 +105,12 @@ impl Default for AudioContextOptions

/// Representation of an audio context on the control thread.
pub struct AudioContext {
/// ID for comparisons
/// Media instance ID.
id: usize,
/// Client context ID.
client_context_id: ClientContextId,
/// Owner backend communication channel.
backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
/// Rendering thread communication channel.
sender: Sender<AudioRenderThreadMsg>,
/// State of the audio context on the control thread.
@@ -122,7 +126,12 @@ pub struct AudioContext {

impl AudioContext {
/// Constructs a new audio context.
pub fn new<B: AudioBackend>(id: usize, options: AudioContextOptions) -> Self {
pub fn new<B: AudioBackend>(
id: usize,
client_context_id: &ClientContextId,
backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
options: AudioContextOptions,
) -> Self {
let (sample_rate, channels) = match options {
AudioContextOptions::RealTimeAudioContext(ref options) => (options.sample_rate, 2),
AudioContextOptions::OfflineAudioContext(ref options) => {
@@ -150,6 +159,8 @@ impl AudioContext {
.unwrap();
Self {
id,
client_context_id: *client_context_id,
backend_chan,
sender,
state: Cell::new(ProcessingState::Suspended),
sample_rate,
@@ -289,10 +300,15 @@ impl Drop for AudioContext {
fn drop(&mut self) {
let (tx, _) = mpsc::channel();
let _ = self.sender.send(AudioRenderThreadMsg::Close(tx));
let _ = self
.backend_chan
.lock()
.unwrap()
.send(BackendMsg::Shutdown(self.client_context_id, self.id));
}
}

impl Muteable for AudioContext {
impl MediaInstance for AudioContext {
fn get_id(&self) -> usize {
self.id
}
28 changes: 11 additions & 17 deletions backends/dummy/lib.rs
@@ -21,14 +21,14 @@ use servo_media_player::{frame, Player, PlayerError, PlayerEvent, StreamType};
use servo_media_streams::capture::MediaTrackConstraintSet;
use servo_media_streams::registry::{register_stream, unregister_stream, MediaStreamId};
use servo_media_streams::{MediaOutput, MediaStream, MediaStreamType};
use servo_media_traits::{ClientContextId, Muteable};
use servo_media_traits::{ClientContextId, MediaInstance};
use servo_media_webrtc::{
thread, BundlePolicy, IceCandidate, SessionDescription, WebRtcBackend, WebRtcController,
WebRtcControllerBackend, WebRtcSignaller, WebrtcResult,
};
use std::any::Any;
use std::ops::Range;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};

pub struct DummyBackend;
@@ -79,21 +79,19 @@ impl Backend for DummyBackend {
Arc::new(Mutex::new(DummyPlayer))
}

fn shutdown_player(&self, _id: &ClientContextId, _player: Arc<Mutex<dyn Player>>) {}

fn create_audio_context(
&self,
_id: &ClientContextId,
options: AudioContextOptions,
) -> Arc<Mutex<AudioContext>> {
Arc::new(Mutex::new(AudioContext::new::<Self>(0, options)))
}

fn shutdown_audio_context(
&self,
_id: &ClientContextId,
_audio_context: Arc<Mutex<AudioContext>>,
) {
let (sender, _) = mpsc::channel();
let sender = Arc::new(Mutex::new(sender));
Arc::new(Mutex::new(AudioContext::new::<Self>(
0,
&ClientContextId::build(1, 1),
sender,
options,
)))
}

fn create_webrtc(&self, signaller: Box<dyn WebRtcSignaller>) -> WebRtcController {
@@ -156,10 +154,6 @@ impl Player for DummyPlayer {
Ok(vec![])
}

fn shutdown(&self) -> Result<(), PlayerError> {
Ok(())
}

fn set_stream(&self, _: &MediaStreamId, _: bool) -> Result<(), PlayerError> {
Ok(())
}
@@ -273,7 +267,7 @@ impl WebRtcControllerBackend for DummyWebRtcController {
fn quit(&mut self) {}
}

impl Muteable for DummyPlayer {
impl MediaInstance for DummyPlayer {
fn get_id(&self) -> usize {
0
}
98 changes: 51 additions & 47 deletions backends/gstreamer/lib.rs
@@ -61,12 +61,14 @@ use servo_media_player::{Player, PlayerEvent, StreamType};
use servo_media_streams::capture::MediaTrackConstraintSet;
use servo_media_streams::registry::MediaStreamId;
use servo_media_streams::MediaOutput;
use servo_media_traits::{ClientContextId, Muteable};
use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
use servo_media_webrtc::{WebRtcBackend, WebRtcController, WebRtcSignaller};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::vec::Vec;

lazy_static! {
@@ -75,8 +77,10 @@ lazy_static! {

pub struct GStreamerBackend {
capture_mocking: AtomicBool,
muteables: Mutex<HashMap<ClientContextId, Vec<(usize, Weak<Mutex<dyn Muteable>>)>>>,
instances: Arc<Mutex<HashMap<ClientContextId, Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>>>>,
next_muteable_id: AtomicUsize,
/// Channel used by media instances to communicate with their owner Backend.
backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
}

#[derive(Debug)]
@@ -106,80 +110,80 @@ impl GStreamerBackend {
return Err(ErrorLoadingPlugins(errors));
}

let instances: Arc<
Mutex<HashMap<ClientContextId, Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>>>,
> = Arc::new(Mutex::new(HashMap::new()));

let instances_ = instances.clone();
let (backend_chan, recvr) = mpsc::channel();
thread::Builder::new()
.name("GStreamerBackend ShutdownThread".to_owned())
.spawn(move || loop {
match recvr.recv().unwrap() {
BackendMsg::Shutdown(context_id, muteable_id) => {
// Take the registry lock once; re-locking it below would deadlock.
let mut instances = instances_.lock().unwrap();
if let Some(vec) = instances.get_mut(&context_id) {
vec.retain(|m| m.0 != muteable_id);
if vec.is_empty() {
instances.remove(&context_id);
}
}
}
};
})
.unwrap();

Ok(Box::new(GStreamerBackend {
capture_mocking: AtomicBool::new(false),
muteables: Mutex::new(HashMap::new()),
instances,
next_muteable_id: AtomicUsize::new(0),
backend_chan: Arc::new(Mutex::new(backend_chan)),
}))
}

fn remove_muteable(&self, id: &ClientContextId, muteable_id: usize) {
let mut muteables = self.muteables.lock().unwrap();
if let Some(vec) = muteables.get_mut(&id) {
vec.retain(|m| m.0 != muteable_id);
if vec.len() == 0 {
muteables.remove(&id);
}
}
}
}

impl Backend for GStreamerBackend {
fn create_player(
&self,
id: &ClientContextId,
context_id: &ClientContextId,
stream_type: StreamType,
sender: IpcSender<PlayerEvent>,
renderer: Option<Arc<Mutex<dyn FrameRenderer>>>,
gl_context: Box<dyn PlayerGLContext>,
) -> Arc<Mutex<dyn Player>> {
let muteable_id = self.next_muteable_id.fetch_add(1, Ordering::Relaxed);
let id = self.next_muteable_id.fetch_add(1, Ordering::Relaxed);
let player = Arc::new(Mutex::new(player::GStreamerPlayer::new(
muteable_id,
id,
context_id,
self.backend_chan.clone(),
stream_type,
sender,
renderer,
gl_context,
)));
let mut muteables = self.muteables.lock().unwrap();
let entry = muteables.entry(*id).or_insert(Vec::new());
entry.push((muteable_id, Arc::downgrade(&player).clone()));
let mut instances = self.instances.lock().unwrap();
let entry = instances.entry(*context_id).or_insert(Vec::new());
entry.push((id, Arc::downgrade(&player).clone()));
player
}

fn shutdown_player(&self, id: &ClientContextId, player: Arc<Mutex<dyn Player>>) {
let player = player.lock().unwrap();
let p_id = player.get_id();
self.remove_muteable(id, p_id);

if let Err(e) = player.shutdown() {
warn!("Player was shut down with err: {:?}", e);
}
}

fn create_audio_context(
&self,
id: &ClientContextId,
client_context_id: &ClientContextId,
options: AudioContextOptions,
) -> Arc<Mutex<AudioContext>> {
let muteable_id = self.next_muteable_id.fetch_add(1, Ordering::Relaxed);
let context = Arc::new(Mutex::new(AudioContext::new::<Self>(muteable_id, options)));
let mut muteables = self.muteables.lock().unwrap();
let entry = muteables.entry(*id).or_insert(Vec::new());
entry.push((muteable_id, Arc::downgrade(&context).clone()));
let id = self.next_muteable_id.fetch_add(1, Ordering::Relaxed);
let context = Arc::new(Mutex::new(AudioContext::new::<Self>(
id,
client_context_id,
self.backend_chan.clone(),
options,
)));
let mut instances = self.instances.lock().unwrap();
let entry = instances.entry(*client_context_id).or_insert(Vec::new());
entry.push((id, Arc::downgrade(&context).clone()));
context
}

fn shutdown_audio_context(
&self,
id: &ClientContextId,
audio_context: Arc<Mutex<AudioContext>>,
) {
let audio_context = audio_context.lock().unwrap();
let ac_id = audio_context.get_id();
self.remove_muteable(id, ac_id);
}

fn create_webrtc(&self, signaller: Box<dyn WebRtcSignaller>) -> WebRtcController {
WebRtcController::new::<Self>(signaller)
}
@@ -251,8 +255,8 @@ impl Backend for GStreamerBackend {
}

fn mute(&self, id: &ClientContextId, val: bool) {
let mut muteables = self.muteables.lock().unwrap();
match muteables.get_mut(id) {
let mut instances = self.instances.lock().unwrap();
match instances.get_mut(id) {
Some(vec) => vec.retain(|(_muteable_id, weak)| {
if let Some(mutex) = weak.upgrade() {
let muteable = mutex.lock().unwrap();
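A design note worth calling out: the `instances` registry stores Weak<Mutex<dyn MediaInstance>> rather than Arc, so the backend never extends an instance's lifetime. Drop runs as soon as the API consumer releases its last Arc, and the stale weak entry is then pruned by the shutdown thread (or skipped during mute). A tiny sketch of that Weak behaviour, using a plain u32 as a stand-in for a media instance:

use std::sync::{Arc, Mutex, Weak};

fn main() {
    // Stand-in for the Arc<Mutex<dyn MediaInstance>> held by an API consumer.
    let strong = Arc::new(Mutex::new(0u32));
    // The backend registry keeps only a Weak handle, so it never keeps the instance alive.
    let weak: Weak<Mutex<u32>> = Arc::downgrade(&strong);
    assert!(weak.upgrade().is_some()); // still alive: mute() can reach it
    drop(strong); // consumer releases its last reference; Drop (and the Shutdown message) fire here
    assert!(weak.upgrade().is_none()); // the registry entry is now stale and can be pruned
}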
34 changes: 25 additions & 9 deletions backends/gstreamer/player.rs
@@ -17,13 +17,13 @@ use servo_media_player::{
PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType,
};
use servo_media_streams::registry::{get_stream, MediaStreamId};
use servo_media_traits::Muteable;
use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
use source::{register_servo_src, ServoSrc};
use std::cell::RefCell;
use std::error::Error;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex, Once};
use std::time;
use std::u64;
@@ -325,7 +325,12 @@ impl SeekChannel {
}

pub struct GStreamerPlayer {
/// The player unique ID.
id: usize,
/// The ID of the client context this player belongs to.
context_id: ClientContextId,
/// Channel to communicate with the owner GStreamerBackend instance.
backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
inner: RefCell<Option<Arc<Mutex<PlayerInner>>>>,
observer: Arc<Mutex<IpcSender<PlayerEvent>>>,
renderer: Option<Arc<Mutex<dyn FrameRenderer>>>,
@@ -334,13 +339,15 @@ pub struct GStreamerPlayer {
is_ready: Arc<Once>,
/// Indicates whether the type of media stream to be played is a live stream.
stream_type: StreamType,
/// Decorator used to setup the video sink and process the produced frames
/// Decorator used to setup the video sink and process the produced frames.
render: Arc<Mutex<GStreamerRender>>,
}

impl GStreamerPlayer {
pub fn new(
id: usize,
context_id: &ClientContextId,
backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
stream_type: StreamType,
observer: IpcSender<PlayerEvent>,
renderer: Option<Arc<Mutex<dyn FrameRenderer>>>,
@@ -353,7 +360,9 @@ impl GStreamerPlayer {
);

Self {
id: id,
id,
context_id: *context_id,
backend_chan,
inner: RefCell::new(None),
observer: Arc::new(Mutex::new(observer)),
renderer,
@@ -766,10 +775,6 @@ impl Player for GStreamerPlayer {
inner_player_proxy!(set_volume, value, f64);
inner_player_proxy!(buffered, Vec<Range<f64>>);

fn shutdown(&self) -> Result<(), PlayerError> {
self.stop()
}

fn render_use_gl(&self) -> bool {
self.render.lock().unwrap().is_gl()
}
@@ -782,7 +787,7 @@ impl Player for GStreamerPlayer {
}
}

impl Muteable for GStreamerPlayer {
impl MediaInstance for GStreamerPlayer {
fn get_id(&self) -> usize {
self.id
}
@@ -791,3 +796,14 @@ impl Muteable for GStreamerPlayer {
self.set_mute(val).map_err(|_| ())
}
}

impl Drop for GStreamerPlayer {
fn drop(&mut self) {
let _ = self.stop();
let _ = self
.backend_chan
.lock()
.unwrap()
.send(BackendMsg::Shutdown(self.context_id, self.id));
}
}
