Skip to content

Commit

Permalink
Auto merge of #26628 - gterzian:shutdown_workers, r=asajeffrey
Browse files Browse the repository at this point in the history
Improve worker shutdown

<!-- Please describe your changes on the following line: -->
FIX #26548
FIX #25212

and also  a step towards #26502

---
<!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `___` with appropriate data: -->
- [ ] `./mach build -d` does not report any errors
- [ ] `./mach test-tidy` does not report any errors
- [ ] These changes fix #___ (GitHub issue number if applicable)

<!-- Either: -->
- [ ] There are tests for these changes OR
- [ ] These changes do not require tests because ___

<!-- Also, please make sure that "Allow edits from maintainers" checkbox is checked, so that we can help you if you get stuck somewhere along the way.-->

<!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. -->
  • Loading branch information
bors-servo committed Jun 3, 2020
2 parents 0bdc448 + f4d258d commit ff3d5c5
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 45 deletions.
11 changes: 9 additions & 2 deletions components/script/dom/abstractworkerglobalscope.rs
Expand Up @@ -82,12 +82,15 @@ impl ScriptPort for Receiver<DedicatedWorkerScriptMsg> {

pub trait WorkerEventLoopMethods {
type WorkerMsg: QueuedTaskConversion + Send;
type ControlMsg;
type Event;
fn task_queue(&self) -> &TaskQueue<Self::WorkerMsg>;
fn handle_event(&self, event: Self::Event);
fn handle_event(&self, event: Self::Event) -> bool;
fn handle_worker_post_event(&self, worker: &TrustedWorkerAddress) -> Option<AutoWorkerReset>;
fn from_control_msg(&self, msg: Self::ControlMsg) -> Self::Event;
fn from_worker_msg(&self, msg: Self::WorkerMsg) -> Self::Event;
fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> Self::Event;
fn control_receiver(&self) -> &Receiver<Self::ControlMsg>;
}

// https://html.spec.whatwg.org/multipage/#worker-event-loop
Expand All @@ -108,6 +111,7 @@ pub fn run_worker_event_loop<T, WorkerMsg, Event>(
};
let task_queue = worker_scope.task_queue();
let event = select! {
recv(worker_scope.control_receiver()) -> msg => worker_scope.from_control_msg(msg.unwrap()),
recv(task_queue.select()) -> msg => {
task_queue.take_tasks(msg.unwrap());
worker_scope.from_worker_msg(task_queue.recv().unwrap())
Expand Down Expand Up @@ -136,7 +140,10 @@ pub fn run_worker_event_loop<T, WorkerMsg, Event>(
}
// Step 3
for event in sequential {
worker_scope.handle_event(event);
if !worker_scope.handle_event(event) {
// Shutdown
return;
}
// Step 6
let _ar = match worker {
Some(worker) => worker_scope.handle_worker_post_event(worker),
Expand Down
2 changes: 2 additions & 0 deletions components/script/dom/bindings/trace.rs
Expand Up @@ -135,6 +135,7 @@ use std::path::PathBuf;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::{Instant, SystemTime};
use style::animation::ElementAnimationSet;
use style::attr::{AttrIdentifier, AttrValue, LengthOrPercentageOrAuto};
Expand Down Expand Up @@ -168,6 +169,7 @@ use webxr_api::SwapChainId as WebXRSwapChainId;
use webxr_api::{Finger, Hand, Ray, View};

unsafe_no_jsmanaged_fields!(Tm);
unsafe_no_jsmanaged_fields!(JoinHandle<()>);

/// A trait to allow tracing (only) DOM objects.
pub unsafe trait JSTraceable {
Expand Down
43 changes: 37 additions & 6 deletions components/script/dom/dedicatedworkerglobalscope.rs
Expand Up @@ -56,7 +56,7 @@ use servo_url::ServoUrl;
use std::mem::replace;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;
use std::thread::{self, JoinHandle};
use style::thread_state::{self, ThreadState};

/// Set the `worker` field of a related DedicatedWorkerGlobalScope object to a particular
Expand Down Expand Up @@ -86,6 +86,12 @@ impl<'a> Drop for AutoWorkerReset<'a> {
}
}

/// Messages sent from the owning global.
pub enum DedicatedWorkerControlMsg {
/// Shutdown the worker.
Exit,
}

pub enum DedicatedWorkerScriptMsg {
/// Standard message from a worker.
CommonWorker(TrustedWorkerAddress, WorkerScriptMsg),
Expand All @@ -96,6 +102,7 @@ pub enum DedicatedWorkerScriptMsg {
pub enum MixedMessage {
FromWorker(DedicatedWorkerScriptMsg),
FromDevtools(DevtoolScriptControlMsg),
FromControl(DedicatedWorkerControlMsg),
}

impl QueuedTaskConversion for DedicatedWorkerScriptMsg {
Expand Down Expand Up @@ -183,32 +190,45 @@ pub struct DedicatedWorkerGlobalScope {
#[ignore_malloc_size_of = "Arc"]
image_cache: Arc<dyn ImageCache>,
browsing_context: Option<BrowsingContextId>,
/// A receiver of control messages,
/// currently only used to signal shutdown.
#[ignore_malloc_size_of = "Channels are hard"]
control_receiver: Receiver<DedicatedWorkerControlMsg>,
}

impl WorkerEventLoopMethods for DedicatedWorkerGlobalScope {
type WorkerMsg = DedicatedWorkerScriptMsg;
type ControlMsg = DedicatedWorkerControlMsg;
type Event = MixedMessage;

fn task_queue(&self) -> &TaskQueue<DedicatedWorkerScriptMsg> {
&self.task_queue
}

fn handle_event(&self, event: MixedMessage) {
self.handle_mixed_message(event);
fn handle_event(&self, event: MixedMessage) -> bool {
self.handle_mixed_message(event)
}

fn handle_worker_post_event(&self, worker: &TrustedWorkerAddress) -> Option<AutoWorkerReset> {
let ar = AutoWorkerReset::new(&self, worker.clone());
Some(ar)
}

fn from_control_msg(&self, msg: DedicatedWorkerControlMsg) -> MixedMessage {
MixedMessage::FromControl(msg)
}

fn from_worker_msg(&self, msg: DedicatedWorkerScriptMsg) -> MixedMessage {
MixedMessage::FromWorker(msg)
}

fn from_devtools_msg(&self, msg: DevtoolScriptControlMsg) -> MixedMessage {
MixedMessage::FromDevtools(msg)
}

fn control_receiver(&self) -> &Receiver<DedicatedWorkerControlMsg> {
&self.control_receiver
}
}

impl DedicatedWorkerGlobalScope {
Expand All @@ -226,6 +246,7 @@ impl DedicatedWorkerGlobalScope {
image_cache: Arc<dyn ImageCache>,
browsing_context: Option<BrowsingContextId>,
gpu_id_hub: Arc<Mutex<Identities>>,
control_receiver: Receiver<DedicatedWorkerControlMsg>,
) -> DedicatedWorkerGlobalScope {
DedicatedWorkerGlobalScope {
workerglobalscope: WorkerGlobalScope::new_inherited(
Expand All @@ -244,6 +265,7 @@ impl DedicatedWorkerGlobalScope {
worker: DomRefCell::new(None),
image_cache: image_cache,
browsing_context,
control_receiver,
}
}

Expand All @@ -262,6 +284,7 @@ impl DedicatedWorkerGlobalScope {
image_cache: Arc<dyn ImageCache>,
browsing_context: Option<BrowsingContextId>,
gpu_id_hub: Arc<Mutex<Identities>>,
control_receiver: Receiver<DedicatedWorkerControlMsg>,
) -> DomRoot<DedicatedWorkerGlobalScope> {
let cx = runtime.cx();
let scope = Box::new(DedicatedWorkerGlobalScope::new_inherited(
Expand All @@ -278,6 +301,7 @@ impl DedicatedWorkerGlobalScope {
image_cache,
browsing_context,
gpu_id_hub,
control_receiver,
));
unsafe { DedicatedWorkerGlobalScopeBinding::Wrap(SafeJSContext::from_ptr(cx), scope) }
}
Expand All @@ -299,7 +323,8 @@ impl DedicatedWorkerGlobalScope {
image_cache: Arc<dyn ImageCache>,
browsing_context: Option<BrowsingContextId>,
gpu_id_hub: Arc<Mutex<Identities>>,
) {
control_receiver: Receiver<DedicatedWorkerControlMsg>,
) -> JoinHandle<()> {
let serialized_worker_url = worker_url.to_string();
let name = format!("WebWorker for {}", serialized_worker_url);
let top_level_browsing_context_id = TopLevelBrowsingContextId::installed();
Expand Down Expand Up @@ -370,6 +395,7 @@ impl DedicatedWorkerGlobalScope {
image_cache,
browsing_context,
gpu_id_hub,
control_receiver,
);
// FIXME(njn): workers currently don't have a unique ID suitable for using in reporter
// registration (#6631), so we instead use a random number and cross our fingers.
Expand Down Expand Up @@ -434,8 +460,9 @@ impl DedicatedWorkerGlobalScope {
parent_sender,
CommonScriptMsg::CollectReports,
);
scope.clear_js_runtime();
})
.expect("Thread spawning failed");
.expect("Thread spawning failed")
}

pub fn image_cache(&self) -> Arc<dyn ImageCache> {
Expand Down Expand Up @@ -485,7 +512,7 @@ impl DedicatedWorkerGlobalScope {
}
}

fn handle_mixed_message(&self, msg: MixedMessage) {
fn handle_mixed_message(&self, msg: MixedMessage) -> bool {
// FIXME(#26324): `self.worker` is None in devtools messages.
match msg {
MixedMessage::FromDevtools(msg) => match msg {
Expand All @@ -505,7 +532,11 @@ impl DedicatedWorkerGlobalScope {
self.handle_script_event(msg);
},
MixedMessage::FromWorker(DedicatedWorkerScriptMsg::WakeUp) => {},
MixedMessage::FromControl(DedicatedWorkerControlMsg::Exit) => {
return false;
},
}
true
}

// https://html.spec.whatwg.org/multipage/#runtime-script-errors-2
Expand Down
64 changes: 58 additions & 6 deletions components/script/dom/globalscope.rs
Expand Up @@ -26,7 +26,9 @@ use crate::dom::bindings::weakref::{DOMTracker, WeakRef};
use crate::dom::blob::Blob;
use crate::dom::broadcastchannel::BroadcastChannel;
use crate::dom::crypto::Crypto;
use crate::dom::dedicatedworkerglobalscope::DedicatedWorkerGlobalScope;
use crate::dom::dedicatedworkerglobalscope::{
DedicatedWorkerControlMsg, DedicatedWorkerGlobalScope,
};
use crate::dom::errorevent::ErrorEvent;
use crate::dom::event::{Event, EventBubbles, EventCancelable, EventStatus};
use crate::dom::eventsource::EventSource;
Expand Down Expand Up @@ -65,6 +67,7 @@ use crate::task_source::TaskSourceName;
use crate::timers::{IsInterval, OneshotTimerCallback, OneshotTimerHandle};
use crate::timers::{OneshotTimers, TimerCallback};
use content_security_policy::CspList;
use crossbeam_channel::Sender;
use devtools_traits::{PageError, ScriptToDevtoolsControlMsg};
use dom_struct::dom_struct;
use embedder_traits::EmbedderMsg;
Expand Down Expand Up @@ -112,15 +115,46 @@ use std::ops::Index;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use time::{get_time, Timespec};
use uuid::Uuid;

#[derive(JSTraceable)]
pub struct AutoCloseWorker(Arc<AtomicBool>);
pub struct AutoCloseWorker {
/// https://html.spec.whatwg.org/multipage/#dom-workerglobalscope-closing
closing: Arc<AtomicBool>,
/// A handle to join on the worker thread.
join_handle: Option<JoinHandle<()>>,
/// A sender of control messages,
/// currently only used to signal shutdown.
control_sender: Sender<DedicatedWorkerControlMsg>,
}

impl Drop for AutoCloseWorker {
/// <https://html.spec.whatwg.org/multipage/#terminate-a-worker>
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
// Step 1.
self.closing.store(true, Ordering::SeqCst);

if self
.control_sender
.send(DedicatedWorkerControlMsg::Exit)
.is_err()
{
warn!("Couldn't send an exit message to a dedicated worker.");
}

// TODO: step 2 and 3.
// Step 4 is unnecessary since we don't use actual ports for dedicated workers.
if self
.join_handle
.take()
.expect("No handle to join on worker.")
.join()
.is_err()
{
warn!("Failed to join on dedicated worker thread.");
}
}
}

Expand Down Expand Up @@ -760,9 +794,18 @@ impl GlobalScope {
}

/// Remove the routers for ports and broadcast-channels.
pub fn remove_web_messaging_infra(&self) {
/// Drain the list of workers.
pub fn remove_web_messaging_and_dedicated_workers_infra(&self) {
self.remove_message_ports_router();
self.remove_broadcast_channel_router();

// Drop each ref to a worker explicitly now,
// which will send a shutdown signal,
// and join on the worker thread.
self.list_auto_close_worker
.borrow_mut()
.drain(0..)
.for_each(|worker| drop(worker));
}

/// Update our state to un-managed,
Expand Down Expand Up @@ -1794,10 +1837,19 @@ impl GlobalScope {
&self.permission_state_invocation_results
}

pub fn track_worker(&self, closing_worker: Arc<AtomicBool>) {
pub fn track_worker(
&self,
closing: Arc<AtomicBool>,
join_handle: JoinHandle<()>,
control_sender: Sender<DedicatedWorkerControlMsg>,
) {
self.list_auto_close_worker
.borrow_mut()
.push(AutoCloseWorker(closing_worker));
.push(AutoCloseWorker {
closing,
join_handle: Some(join_handle),
control_sender: control_sender,
});
}

pub fn track_event_source(&self, event_source: &EventSource) {
Expand Down

0 comments on commit ff3d5c5

Please sign in to comment.