Skip to content

Commit

Permalink
add a control chan to workers, use to signal shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Jun 3, 2020
1 parent ed688fe commit 947fa8b
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 31 deletions.
11 changes: 9 additions & 2 deletions components/script/dom/abstractworkerglobalscope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ impl ScriptPort for Receiver<DedicatedWorkerScriptMsg> {

pub trait WorkerEventLoopMethods {
type WorkerMsg: QueuedTaskConversion + Send;
type ControlMsg;
type Event;
fn task_queue(&self) -> &TaskQueue<Self::WorkerMsg>;
fn handle_event(&self, event: Self::Event);
fn handle_event(&self, event: Self::Event) -> bool;
fn handle_worker_post_event(&self, worker: &TrustedWorkerAddress) -> Option<AutoWorkerReset>;
fn from_control_msg(&self, msg: Self::ControlMsg) -> Self::Event;
fn from_worker_msg(&self, msg: Self::WorkerMsg) -> Self::Event;
fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> Self::Event;
fn control_receiver(&self) -> &Receiver<Self::ControlMsg>;
}

// https://html.spec.whatwg.org/multipage/#worker-event-loop
Expand All @@ -108,6 +111,7 @@ pub fn run_worker_event_loop<T, WorkerMsg, Event>(
};
let task_queue = worker_scope.task_queue();
let event = select! {
recv(worker_scope.control_receiver()) -> msg => worker_scope.from_control_msg(msg.unwrap()),
recv(task_queue.select()) -> msg => {
task_queue.take_tasks(msg.unwrap());
worker_scope.from_worker_msg(task_queue.recv().unwrap())
Expand Down Expand Up @@ -136,7 +140,10 @@ pub fn run_worker_event_loop<T, WorkerMsg, Event>(
}
// Step 3
for event in sequential {
worker_scope.handle_event(event);
if !worker_scope.handle_event(event) {
// Shutdown
return;
}
// Step 6
let _ar = match worker {
Some(worker) => worker_scope.handle_worker_post_event(worker),
Expand Down
36 changes: 33 additions & 3 deletions components/script/dom/dedicatedworkerglobalscope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ impl<'a> Drop for AutoWorkerReset<'a> {
}
}

/// Messages sent from the owning global.
pub enum DedicatedWorkerControlMsg {
/// Shutdown the worker.
Exit,
}

pub enum DedicatedWorkerScriptMsg {
/// Standard message from a worker.
CommonWorker(TrustedWorkerAddress, WorkerScriptMsg),
Expand All @@ -96,6 +102,7 @@ pub enum DedicatedWorkerScriptMsg {
pub enum MixedMessage {
FromWorker(DedicatedWorkerScriptMsg),
FromDevtools(DevtoolScriptControlMsg),
FromControl(DedicatedWorkerControlMsg),
}

impl QueuedTaskConversion for DedicatedWorkerScriptMsg {
Expand Down Expand Up @@ -183,32 +190,45 @@ pub struct DedicatedWorkerGlobalScope {
#[ignore_malloc_size_of = "Arc"]
image_cache: Arc<dyn ImageCache>,
browsing_context: Option<BrowsingContextId>,
/// A receiver of control messages,
/// currently only used to signal shutdown.
#[ignore_malloc_size_of = "Channels are hard"]
control_receiver: Receiver<DedicatedWorkerControlMsg>,
}

impl WorkerEventLoopMethods for DedicatedWorkerGlobalScope {
type WorkerMsg = DedicatedWorkerScriptMsg;
type ControlMsg = DedicatedWorkerControlMsg;
type Event = MixedMessage;

fn task_queue(&self) -> &TaskQueue<DedicatedWorkerScriptMsg> {
&self.task_queue
}

fn handle_event(&self, event: MixedMessage) {
self.handle_mixed_message(event);
fn handle_event(&self, event: MixedMessage) -> bool {
self.handle_mixed_message(event)
}

fn handle_worker_post_event(&self, worker: &TrustedWorkerAddress) -> Option<AutoWorkerReset> {
let ar = AutoWorkerReset::new(&self, worker.clone());
Some(ar)
}

fn from_control_msg(&self, msg: DedicatedWorkerControlMsg) -> MixedMessage {
MixedMessage::FromControl(msg)
}

fn from_worker_msg(&self, msg: DedicatedWorkerScriptMsg) -> MixedMessage {
MixedMessage::FromWorker(msg)
}

fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> MixedMessage {
MixedMessage::FromDevtools(msg)
}

fn control_receiver(&self) -> &Receiver<DedicatedWorkerControlMsg> {
&self.control_receiver
}
}

impl DedicatedWorkerGlobalScope {
Expand All @@ -226,6 +246,7 @@ impl DedicatedWorkerGlobalScope {
image_cache: Arc<dyn ImageCache>,
browsing_context: Option<BrowsingContextId>,
gpu_id_hub: Arc<Mutex<Identities>>,
control_receiver: Receiver<DedicatedWorkerControlMsg>,
) -> DedicatedWorkerGlobalScope {
DedicatedWorkerGlobalScope {
workerglobalscope: WorkerGlobalScope::new_inherited(
Expand All @@ -244,6 +265,7 @@ impl DedicatedWorkerGlobalScope {
worker: DomRefCell::new(None),
image_cache: image_cache,
browsing_context,
control_receiver,
}
}

Expand All @@ -262,6 +284,7 @@ impl DedicatedWorkerGlobalScope {
image_cache: Arc<dyn ImageCache>,
browsing_context: Option<BrowsingContextId>,
gpu_id_hub: Arc<Mutex<Identities>>,
control_receiver: Receiver<DedicatedWorkerControlMsg>,
) -> DomRoot<DedicatedWorkerGlobalScope> {
let cx = runtime.cx();
let scope = Box::new(DedicatedWorkerGlobalScope::new_inherited(
Expand All @@ -278,6 +301,7 @@ impl DedicatedWorkerGlobalScope {
image_cache,
browsing_context,
gpu_id_hub,
control_receiver,
));
unsafe { DedicatedWorkerGlobalScopeBinding::Wrap(SafeJSContext::from_ptr(cx), scope) }
}
Expand All @@ -299,6 +323,7 @@ impl DedicatedWorkerGlobalScope {
image_cache: Arc<dyn ImageCache>,
browsing_context: Option<BrowsingContextId>,
gpu_id_hub: Arc<Mutex<Identities>>,
control_receiver: Receiver<DedicatedWorkerControlMsg>,
) -> JoinHandle<()> {
let serialized_worker_url = worker_url.to_string();
let name = format!("WebWorker for {}", serialized_worker_url);
Expand Down Expand Up @@ -370,6 +395,7 @@ impl DedicatedWorkerGlobalScope {
image_cache,
browsing_context,
gpu_id_hub,
control_receiver,
);
// FIXME(njn): workers currently don't have a unique ID suitable for using in reporter
// registration (#6631), so we instead use a random number and cross our fingers.
Expand Down Expand Up @@ -485,7 +511,7 @@ impl DedicatedWorkerGlobalScope {
}
}

fn handle_mixed_message(&self, msg: MixedMessage) {
fn handle_mixed_message(&self, msg: MixedMessage) -> bool {
// FIXME(#26324): `self.worker` is None in devtools messages.
match msg {
MixedMessage::FromDevtools(msg) => match msg {
Expand All @@ -505,7 +531,11 @@ impl DedicatedWorkerGlobalScope {
self.handle_script_event(msg);
},
MixedMessage::FromWorker(DedicatedWorkerScriptMsg::WakeUp) => {},
MixedMessage::FromControl(DedicatedWorkerControlMsg::Exit) => {
return false;
},
}
true
}

// https://html.spec.whatwg.org/multipage/#runtime-script-errors-2
Expand Down
34 changes: 30 additions & 4 deletions components/script/dom/globalscope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use crate::dom::bindings::weakref::{DOMTracker, WeakRef};
use crate::dom::blob::Blob;
use crate::dom::broadcastchannel::BroadcastChannel;
use crate::dom::crypto::Crypto;
use crate::dom::dedicatedworkerglobalscope::DedicatedWorkerGlobalScope;
use crate::dom::dedicatedworkerglobalscope::{
DedicatedWorkerControlMsg, DedicatedWorkerGlobalScope,
};
use crate::dom::errorevent::ErrorEvent;
use crate::dom::event::{Event, EventBubbles, EventCancelable, EventStatus};
use crate::dom::eventsource::EventSource;
Expand Down Expand Up @@ -65,6 +67,7 @@ use crate::task_source::TaskSourceName;
use crate::timers::{IsInterval, OneshotTimerCallback, OneshotTimerHandle};
use crate::timers::{OneshotTimers, TimerCallback};
use content_security_policy::CspList;
use crossbeam_channel::Sender;
use devtools_traits::{PageError, ScriptToDevtoolsControlMsg};
use dom_struct::dom_struct;
use embedder_traits::EmbedderMsg;
Expand Down Expand Up @@ -118,8 +121,13 @@ use uuid::Uuid;

#[derive(JSTraceable)]
pub struct AutoCloseWorker {
/// https://html.spec.whatwg.org/multipage/#dom-workerglobalscope-closing
closing: Arc<AtomicBool>,
/// A handle to join on the worker thread.
join_handle: Option<JoinHandle<()>>,
/// A sender of control messages,
/// currently only used to signal shutdown.
control_sender: Sender<DedicatedWorkerControlMsg>,
}

impl Drop for AutoCloseWorker {
Expand All @@ -128,13 +136,25 @@ impl Drop for AutoCloseWorker {
// Step 1.
self.closing.store(true, Ordering::SeqCst);

if self
.control_sender
.send(DedicatedWorkerControlMsg::Exit)
.is_err()
{
warn!("Couldn't send an exit message to a dedicated worker.");
}

// TODO: step 2 and 3.
// Step 4 is unnecessary since we don't use actual ports for dedicated workers.
self.join_handle
if self
.join_handle
.take()
.expect("No handle to join on worker.")
.join()
.expect("Couldn't join on worker thread.");
.is_err()
{
warn!("Failed to join on dedicated worker thread.");
}
}
}

Expand Down Expand Up @@ -1808,12 +1828,18 @@ impl GlobalScope {
&self.permission_state_invocation_results
}

pub fn track_worker(&self, closing: Arc<AtomicBool>, join_handle: JoinHandle<()>) {
pub fn track_worker(
&self,
closing: Arc<AtomicBool>,
join_handle: JoinHandle<()>,
control_sender: Sender<DedicatedWorkerControlMsg>,
) {
self.list_auto_close_worker
.borrow_mut()
.push(AutoCloseWorker {
closing,
join_handle: Some(join_handle),
control_sender: control_sender,
});
}

Expand Down

0 comments on commit 947fa8b

Please sign in to comment.