Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In BC event-loop, only run tasks related to fully-active documents #22802

Merged
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

in BC event-loop, only run tasks related to fully-active documents

  • Loading branch information
gterzian committed Mar 6, 2019
commit ecfb9c639ad610d406e7dd278dfc7aac964ddca8
@@ -35,7 +35,7 @@ use js::jsapi::JS_AddInterruptCallback;
use js::jsapi::{JSAutoCompartment, JSContext};
use js::jsval::UndefinedValue;
use js::rust::HandleValue;
use msg::constellation_msg::TopLevelBrowsingContextId;
use msg::constellation_msg::{PipelineId, TopLevelBrowsingContextId};
use net_traits::request::{CredentialsMode, Destination, RequestInit};
use net_traits::{load_whole_resource, IpcSend};
use script_traits::{TimerEvent, TimerSource, WorkerGlobalScopeInit, WorkerScriptLoadOrigin};
@@ -101,10 +101,16 @@ impl QueuedTaskConversion for DedicatedWorkerScriptMsg {
CommonScriptMsg::Task(_category, _boxed, _pipeline_id, source_name) => {
Some(&source_name)
},
_ => return None,
_ => None,
}
}

fn pipeline_id(&self) -> Option<PipelineId> {
// Workers always return None, since the pipeline_id is only used to check for document activity,
// and this check does not apply to worker event-loops.
None
}

fn into_queued_task(self) -> Option<QueuedTask> {
let (worker, common_worker_msg) = match self {
DedicatedWorkerScriptMsg::CommonWorker(worker, common_worker_msg) => {
@@ -131,6 +137,11 @@ impl QueuedTaskConversion for DedicatedWorkerScriptMsg {
DedicatedWorkerScriptMsg::CommonWorker(worker.unwrap(), WorkerScriptMsg::Common(script_msg))
}

fn inactive_msg() -> Self {
// Inactive is only relevant in the context of a browsing-context event-loop.
panic!("Workers should never receive messages marked as inactive");
}

fn wake_up_msg() -> Self {
DedicatedWorkerScriptMsg::WakeUp
}
@@ -29,6 +29,7 @@ use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use ipc_channel::router::ROUTER;
use js::jsapi::{JSAutoCompartment, JSContext, JS_AddInterruptCallback};
use js::jsval::UndefinedValue;
use msg::constellation_msg::PipelineId;
use net_traits::request::{CredentialsMode, Destination, RequestInit};
use net_traits::{load_whole_resource, CustomResponseMediator, IpcSend};
use script_traits::{
@@ -61,10 +62,16 @@ impl QueuedTaskConversion for ServiceWorkerScriptMsg {
CommonScriptMsg::Task(_category, _boxed, _pipeline_id, task_source) => {
Some(&task_source)
},
_ => return None,
_ => None,
}
}

fn pipeline_id(&self) -> Option<PipelineId> {
// Workers always return None, since the pipeline_id is only used to check for document activity,
// and this check does not apply to worker event-loops.
None
}

fn into_queued_task(self) -> Option<QueuedTask> {
let script_msg = match self {
ServiceWorkerScriptMsg::CommonWorker(WorkerScriptMsg::Common(script_msg)) => script_msg,
@@ -85,6 +92,11 @@ impl QueuedTaskConversion for ServiceWorkerScriptMsg {
ServiceWorkerScriptMsg::CommonWorker(WorkerScriptMsg::Common(script_msg))
}

fn inactive_msg() -> Self {
// Inactive is only relevant in the context of a browsing-context event-loop.
panic!("Workers should never receive messages marked as inactive");
}

fn wake_up_msg() -> Self {
ServiceWorkerScriptMsg::WakeUp
}
@@ -271,6 +271,8 @@ pub enum MainThreadScriptMsg {
},
/// Dispatches a job queue.
DispatchJobQueue { scope_url: ServoUrl },
/// A task related to a not fully-active document has been throttled.
Inactive,
/// Wake-up call from the task queue.
WakeUp,
}
@@ -285,7 +287,20 @@ impl QueuedTaskConversion for MainThreadScriptMsg {
CommonScriptMsg::Task(_category, _boxed, _pipeline_id, task_source) => {
Some(&task_source)
},
_ => None,
}
}

fn pipeline_id(&self) -> Option<PipelineId> {
let script_msg = match self {
MainThreadScriptMsg::Common(script_msg) => script_msg,
_ => return None,
};
match script_msg {
CommonScriptMsg::Task(_category, _boxed, pipeline_id, _task_source) => {
pipeline_id.clone()
},
_ => None,
}
}

@@ -309,6 +324,10 @@ impl QueuedTaskConversion for MainThreadScriptMsg {
MainThreadScriptMsg::Common(script_msg)
}

fn inactive_msg() -> Self {
MainThreadScriptMsg::Inactive
}

fn wake_up_msg() -> Self {
MainThreadScriptMsg::WakeUp
}
@@ -881,6 +900,29 @@ impl ScriptThread {
})
}

pub fn get_fully_active_document_ids() -> HashSet<PipelineId> {
SCRIPT_THREAD_ROOT.with(|root| {
root.get().map_or(HashSet::new(), |script_thread| {
let script_thread = unsafe { &*script_thread };
script_thread
.documents
.borrow()
.iter()
.filter_map(|(id, document)| {
if document.is_fully_active() {
Some(id.clone())
} else {
None
}
})
.fold(HashSet::new(), |mut set, id| {
let _ = set.insert(id);
set
})
})
})
}

pub fn find_window_proxy(id: BrowsingContextId) -> Option<DomRoot<WindowProxy>> {
SCRIPT_THREAD_ROOT.with(|root| {
root.get().and_then(|script_thread| {
@@ -1169,7 +1211,11 @@ impl ScriptThread {
let mut event = select! {
recv(self.task_queue.select()) -> msg => {
self.task_queue.take_tasks(msg.unwrap());
FromScript(self.task_queue.recv().unwrap())
let event = self
.task_queue
.recv()
.expect("Spurious wake-up of the event-loop, task-queue has no tasks available");
FromScript(event)
},
recv(self.control_port) -> msg => FromConstellation(msg.unwrap()),
recv(self.timer_event_port) -> msg => FromScheduler(msg.unwrap()),
@@ -1252,6 +1298,10 @@ impl ScriptThread {
Some(index) => sequential[index] = event,
}
},
FromScript(MainThreadScriptMsg::Inactive) => {
// An event came-in from a document that is not fully-active, it has been stored by the task-queue.
// Continue without adding it to "sequential".
},
_ => {
sequential.push(event);
},
@@ -1460,6 +1510,7 @@ impl ScriptThread {
MainThreadScriptMsg::WorkletLoaded(pipeline_id) => Some(pipeline_id),
MainThreadScriptMsg::RegisterPaintWorklet { pipeline_id, .. } => Some(pipeline_id),
MainThreadScriptMsg::DispatchJobQueue { .. } => None,
MainThreadScriptMsg::Inactive => None,
MainThreadScriptMsg::WakeUp => None,
},
MixedMessage::FromImageCache((pipeline_id, _)) => Some(pipeline_id),
@@ -1703,6 +1754,7 @@ impl ScriptThread {
MainThreadScriptMsg::DispatchJobQueue { scope_url } => {
self.job_queue_map.run_job(scope_url, self)
},
MainThreadScriptMsg::Inactive => {},
MainThreadScriptMsg::WakeUp => {},
}
}
@@ -7,12 +7,13 @@
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::worker::TrustedWorkerAddress;
use crate::script_runtime::ScriptThreadEventCategory;
use crate::script_thread::ScriptThread;
use crate::task::TaskBox;
use crate::task_source::TaskSourceName;
use crossbeam_channel::{self, Receiver, Sender};
use msg::constellation_msg::PipelineId;
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::default::Default;

pub type QueuedTask = (
@@ -26,8 +27,10 @@ pub type QueuedTask = (
/// Defining the operations used to convert from a msg T to a QueuedTask.
pub trait QueuedTaskConversion {
fn task_source_name(&self) -> Option<&TaskSourceName>;
fn pipeline_id(&self) -> Option<PipelineId>;
fn into_queued_task(self) -> Option<QueuedTask>;
fn from_queued_task(queued_task: QueuedTask) -> Self;
fn inactive_msg() -> Self;
fn wake_up_msg() -> Self;
fn is_wake_up(&self) -> bool;
}
@@ -43,6 +46,8 @@ pub struct TaskQueue<T> {
taken_task_counter: Cell<u64>,
/// Tasks that will be throttled for as long as we are "busy".
throttled: DomRefCell<HashMap<TaskSourceName, VecDeque<QueuedTask>>>,
/// Tasks for not fully-active documents.
inactive: DomRefCell<HashMap<PipelineId, VecDeque<QueuedTask>>>,
}

impl<T: QueuedTaskConversion> TaskQueue<T> {
@@ -53,22 +58,66 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
msg_queue: DomRefCell::new(VecDeque::new()),
taken_task_counter: Default::default(),
throttled: Default::default(),
inactive: Default::default(),
}
}

/// Release previously held-back tasks for documents that are now fully-active.
/// https://html.spec.whatwg.org/multipage/#event-loop-processing-model:fully-active
fn release_tasks_for_fully_active_documents(
&self,
fully_active: &HashSet<PipelineId>,
) -> Vec<T> {
self.inactive
.borrow_mut()
.iter_mut()
.filter(|(pipeline_id, _)| fully_active.contains(pipeline_id))
.flat_map(|(_, inactive_queue)| {
inactive_queue
.drain(0..)
.map(|queued_task| T::from_queued_task(queued_task))
})
.collect()
}

/// Hold back tasks for currently not fully-active documents.
/// https://html.spec.whatwg.org/multipage/#event-loop-processing-model:fully-active
fn store_task_for_inactive_pipeline(&self, msg: T, pipeline_id: &PipelineId) {
let mut inactive = self.inactive.borrow_mut();
let inactive_queue = inactive.entry(pipeline_id.clone()).or_default();
inactive_queue.push_back(
msg.into_queued_task()
.expect("Incoming messages should always be convertible into queued tasks"),
);
let mut msg_queue = self.msg_queue.borrow_mut();
if msg_queue.is_empty() {
// Ensure there is at least one message.
// Otherwise if the just stored inactive message
// was the first and last of this iteration,
// it will result in a spurious wake-up of the event-loop.
msg_queue.push_back(T::inactive_msg());
}
}

/// Process incoming tasks, immediately sending priority ones downstream,
/// and categorizing potential throttles.
fn process_incoming_tasks(&self, first_msg: T) {
let mut incoming = Vec::with_capacity(self.port.len() + 1);
fn process_incoming_tasks(&self, first_msg: T, fully_active: &HashSet<PipelineId>) {
// 1. Make any previously stored task from now fully-active document available.
let mut incoming = self.release_tasks_for_fully_active_documents(fully_active);

// 2. Process the first message(artifact of the fact that select always returns a message).
if !first_msg.is_wake_up() {
incoming.push(first_msg);
}

// 3. Process any other incoming message.
while let Ok(msg) = self.port.try_recv() {
if !msg.is_wake_up() {
incoming.push(msg);
}
}

// 4. Filter tasks from non-priority task-sources.
let to_be_throttled: Vec<T> = incoming
.drain_filter(|msg| {
let task_source = match msg.task_source_name() {
@@ -88,6 +137,12 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
.collect();

for msg in incoming {
if let Some(pipeline_id) = msg.pipeline_id() {
if !fully_active.contains(&pipeline_id) {
self.store_task_for_inactive_pipeline(msg, &pipeline_id);
continue;
}
}
// Immediately send non-throttled tasks for processing.
let _ = self.msg_queue.borrow_mut().push_back(msg);
}
@@ -103,7 +158,7 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
let mut throttled_tasks = self.throttled.borrow_mut();
throttled_tasks
.entry(task_source.clone())
.or_insert(VecDeque::new())
.or_default()
.push_back((worker, category, boxed, pipeline_id, task_source));
}
}
@@ -133,8 +188,9 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
pub fn take_tasks(&self, first_msg: T) {
// High-watermark: once reached, throttled tasks will be held-back.
const PER_ITERATION_MAX: u64 = 5;
let fully_active = ScriptThread::get_fully_active_document_ids();
// Always first check for new tasks, but don't reset 'taken_task_counter'.
self.process_incoming_tasks(first_msg);
self.process_incoming_tasks(first_msg, &fully_active);
let mut throttled = self.throttled.borrow_mut();
let mut throttled_length: usize = throttled.values().map(|queue| queue.len()).sum();
let task_source_names = TaskSourceName::all();
@@ -165,6 +221,20 @@ impl<T: QueuedTaskConversion> TaskQueue<T> {
None => continue,
};
let msg = T::from_queued_task(queued_task);

// Hold back tasks for currently inactive documents.
if let Some(pipeline_id) = msg.pipeline_id() {
if !fully_active.contains(&pipeline_id) {
self.store_task_for_inactive_pipeline(msg, &pipeline_id);
// Reduce the length of throttles,
// but don't add the task to "msg_queue",
// and neither increment "taken_task_counter".
throttled_length = throttled_length - 1;
continue;
}
}

// Make the task available for the event-loop to handle as a message.
let _ = self.msg_queue.borrow_mut().push_back(msg);
self.taken_task_counter
.set(self.taken_task_counter.get() + 1);
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.