Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

script: Make the resource task communication use IPC channels. #6586

Merged
merged 5 commits into from Jul 31, 2015

net: Use a thread for each `AsyncResponseTarget` to avoid having to send

trait objects across process boundaries.
  • Loading branch information
pcwalton committed Jul 31, 2015
commit 44d13f7fd419bdff1420ed21ca3efd72f4015bfa
@@ -14,6 +14,7 @@ use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::mem;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver, Select};
use std::thread;
use util::resource_files::resources_dir_path;
use util::task::spawn_named;
use util::taskpool::TaskPool;
@@ -100,14 +101,17 @@ struct ResourceLoadInfo {
struct ResourceListener {
url: Url,
sender: Sender<ResourceLoadInfo>,
receiver: Receiver<ResponseAction>,
}

impl AsyncResponseTarget for ResourceListener {
fn invoke_with_listener(&self, action: ResponseAction) {
self.sender.send(ResourceLoadInfo {
action: action,
url: self.url.clone(),
}).unwrap();
impl ResourceListener {
fn run(&self) {
while let Ok(action) = self.receiver.recv() {
self.sender.send(ResourceLoadInfo {
action: action,
url: self.url.clone(),
}).unwrap();
}
}
}

@@ -330,11 +334,17 @@ impl ImageCache {
e.insert(pending_load);

let load_data = LoadData::new(url.clone(), None);
let (action_sender, action_receiver) = channel();
let listener = box ResourceListener {
url: url,
sender: self.progress_sender.clone(),
receiver: action_receiver,
};
let msg = ControlMsg::Load(load_data, LoadConsumer::Listener(listener));
let msg = ControlMsg::Load(load_data,
LoadConsumer::Listener(AsyncResponseTarget {
sender: action_sender,
}));
thread::spawn(move || listener.run());
self.resource_task.send(msg).unwrap();
}
}
@@ -69,7 +69,7 @@ pub fn global_init() {

pub enum ProgressSender {
Channel(Sender<ProgressMsg>),
Listener(Box<AsyncResponseTarget>),
Listener(AsyncResponseTarget),
}

impl ProgressSender {
@@ -114,14 +114,20 @@ impl ResponseAction {

/// A target for async networking events. Commonly used to dispatch a runnable event to another
/// thread storing the wrapped closure for later execution.
pub trait AsyncResponseTarget {
fn invoke_with_listener(&self, action: ResponseAction);
pub struct AsyncResponseTarget {
pub sender: Sender<ResponseAction>,
}

impl AsyncResponseTarget {
pub fn invoke_with_listener(&self, action: ResponseAction) {
self.sender.send(action).unwrap()
}
}

/// A wrapper for a network load that can either be channel or event-based.
pub enum LoadConsumer {
Channel(Sender<LoadResponse>),
Listener(Box<AsyncResponseTarget + Send>),
Listener(AsyncResponseTarget),
}

/// Handle to a resource task
@@ -195,7 +201,7 @@ impl PendingAsyncLoad {
}

/// Initiate the network request associated with this pending load, using the provided target.
pub fn load_async(mut self, listener: Box<AsyncResponseTarget + Send>) {
pub fn load_async(mut self, listener: AsyncResponseTarget) {
self.guard.neuter();
let load_data = LoadData::new(self.url, self.pipeline);
let consumer = LoadConsumer::Listener(listener);
@@ -17,6 +17,7 @@ use net_traits::{SerializableStringResult};
use std::ascii::AsciiExt;
use std::borrow::ToOwned;
use std::cell::RefCell;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use time;
use time::{now, Timespec};
@@ -132,9 +133,14 @@ impl CORSRequest {
listener: listener,
response: RefCell::new(None),
};
let (action_sender, action_receiver) = mpsc::channel();
let listener = NetworkListener {
context: Arc::new(Mutex::new(context)),
script_chan: script_chan,
receiver: action_receiver,
};
let response_target = AsyncResponseTarget {
sender: action_sender,
};

// TODO: this exists only to make preflight check non-blocking
@@ -145,7 +151,7 @@ impl CORSRequest {
let mut context = listener.context.lock();
let context = context.as_mut().unwrap();
*context.response.borrow_mut() = Some(response);
listener.invoke_with_listener(ResponseAction::ResponseComplete(
response_target.invoke_with_listener(ResponseAction::ResponseComplete(
SerializableStringResult(Ok(()))));
});
}
@@ -73,7 +73,7 @@ impl DocumentLoader {
}

/// Create and initiate a new network request.
pub fn load_async(&mut self, load: LoadType, listener: Box<AsyncResponseTarget + Send>) {
pub fn load_async(&mut self, load: LoadType, listener: AsyncResponseTarget) {
let pending = self.prepare_async_load(load);
pending.load_async(listener)
}
@@ -283,7 +283,7 @@ pub trait DocumentHelpers<'a> {
/// https://w3c.github.io/animation-timing/#dfn-invoke-callbacks-algorithm
fn invoke_animation_callbacks(self);
fn prepare_async_load(self, load: LoadType) -> PendingAsyncLoad;
fn load_async(self, load: LoadType, listener: Box<AsyncResponseTarget + Send>);
fn load_async(self, load: LoadType, listener: AsyncResponseTarget);
fn load_sync(self, load: LoadType) -> Result<(Metadata, Vec<u8>), String>;
fn finish_load(self, load: LoadType);
fn set_current_parser(self, script: Option<&ServoHTMLParser>);
@@ -968,7 +968,7 @@ impl<'a> DocumentHelpers<'a> for &'a Document {
loader.prepare_async_load(load)
}

fn load_async(self, load: LoadType, listener: Box<AsyncResponseTarget + Send>) {
fn load_async(self, load: LoadType, listener: AsyncResponseTarget) {
let mut loader = self.loader.borrow_mut();
loader.load_async(load, listener)
}
@@ -40,12 +40,14 @@ use js::jsval::UndefinedValue;
use encoding::all::UTF_8;
use encoding::label::encoding_from_whatwg_label;
use encoding::types::{Encoding, EncodingRef, DecoderTrap};
use net_traits::{Metadata, AsyncResponseListener};
use net_traits::{Metadata, AsyncResponseListener, AsyncResponseTarget};
use util::str::{DOMString, HTML_SPACE_CHARACTERS, StaticStringVec};
use html5ever::tree_builder::NextParserState;
use std::cell::{RefCell, Cell};
use std::mem;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use string_cache::Atom;
use url::{Url, UrlParser};

@@ -330,12 +332,18 @@ impl<'a> HTMLScriptElementHelpers for &'a HTMLScriptElement {
url: url.clone(),
}));

let (action_sender, action_receiver) = mpsc::channel();
let listener = box NetworkListener {
context: context,
script_chan: script_chan,
receiver: action_receiver,
};
let response_target = AsyncResponseTarget {
sender: action_sender,
};
thread::spawn(move || listener.run());

doc.r().load_async(LoadType::Script(url), listener);
doc.r().load_async(LoadType::Script(url), response_target);

if self.parser_inserted.get() {
doc.r().get_current_parser().unwrap().r().suspend();
@@ -46,8 +46,8 @@ use js::jsval::{JSVal, NullValue, UndefinedValue};

use net_traits::ControlMsg::Load;
use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer};
use net_traits::{AsyncResponseListener, Metadata, SerializableHeaders, SerializableMethod};
use net_traits::{SerializableUrl};
use net_traits::{AsyncResponseListener, AsyncResponseTarget, Metadata, SerializableHeaders};
use net_traits::{SerializableMethod, SerializableUrl};
use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener};
use cors::CORSResponse;
use util::str::DOMString;
@@ -59,7 +59,7 @@ use std::cell::{RefCell, Cell};
use std::default::Default;
use std::sync::{Mutex, Arc};
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::thread::sleep_ms;
use std::thread::{self, sleep_ms};
use time;
use url::{Url, UrlParser};

@@ -271,11 +271,17 @@ impl XMLHttpRequest {
}
}

let (action_sender, action_receiver) = channel();
let listener = box NetworkListener {
context: context,
script_chan: script_chan,
receiver: action_receiver,
};
resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap();
let response_target = AsyncResponseTarget {
sender: action_sender,
};
thread::spawn(move || listener.run());
resource_task.send(Load(load_data, LoadConsumer::Listener(response_target))).unwrap();
}
}

@@ -3,22 +3,26 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use script_task::{ScriptChan, ScriptMsg, Runnable};
use net_traits::{AsyncResponseTarget, AsyncResponseListener, ResponseAction};
use net_traits::{AsyncResponseListener, ResponseAction};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};

/// An off-thread sink for async network event runnables. All such events are forwarded to
/// a target thread, where they are invoked on the provided context object.
pub struct NetworkListener<T: AsyncResponseListener + PreInvoke + Send + 'static> {
pub context: Arc<Mutex<T>>,
pub script_chan: Box<ScriptChan+Send>,
pub receiver: Receiver<ResponseAction>,
}

impl<T: AsyncResponseListener + PreInvoke + Send + 'static> AsyncResponseTarget for NetworkListener<T> {
fn invoke_with_listener(&self, action: ResponseAction) {
self.script_chan.send(ScriptMsg::RunnableMsg(box ListenerRunnable {
context: self.context.clone(),
action: action,
})).unwrap();
impl<T: AsyncResponseListener + PreInvoke + Send + 'static> NetworkListener<T> {
pub fn run(&self) {
while let Ok(action) = self.receiver.recv() {
self.script_chan.send(ScriptMsg::RunnableMsg(box ListenerRunnable {
context: self.context.clone(),
action: action,
})).unwrap();
}
}
}

@@ -68,7 +68,7 @@ use msg::constellation_msg::{Failure, WindowSizeData, PipelineExitType};
use msg::constellation_msg::Msg as ConstellationMsg;
use msg::webdriver_msg::WebDriverScriptCommand;
use net_traits::LoadData as NetLoadData;
use net_traits::{ResourceTask, LoadConsumer, ControlMsg, Metadata};
use net_traits::{AsyncResponseTarget, ResourceTask, LoadConsumer, ControlMsg, Metadata};
use net_traits::{SerializableContentType, SerializableHeaders, SerializableMethod};
use net_traits::{SerializableUrl};
use net_traits::image_cache_task::{ImageCacheChan, ImageCacheTask, ImageCacheResult};
@@ -105,6 +105,7 @@ use std::rc::Rc;
use std::result::Result;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver, Select};
use std::thread;
use time::Tm;

use hyper::header::{ContentType, HttpDate};
@@ -1686,9 +1687,15 @@ impl ScriptTask {

let context = Arc::new(Mutex::new(ParserContext::new(id, subpage, script_chan.clone(),
load_data.url.clone())));
let (action_sender, action_receiver) = channel();
let listener = box NetworkListener {
context: context,
script_chan: script_chan.clone(),
receiver: action_receiver,
};
thread::spawn(move || listener.run());
let response_target = AsyncResponseTarget {
sender: action_sender,
};

if load_data.url.scheme == "javascript" {
@@ -1703,7 +1710,7 @@ impl ScriptTask {
data: load_data.data,
cors: None,
pipeline_id: Some(id),
}, LoadConsumer::Listener(listener))).unwrap();
}, LoadConsumer::Listener(response_target))).unwrap();

self.incomplete_loads.borrow_mut().push(incomplete);
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.