Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async fetching of extenal script sources via interruptible parsing. #5197

Closed
wants to merge 14 commits into from
Next

Start switching net/ to use abstractions over channels to allow intro…

…ducing non-channel communication in the future.
  • Loading branch information
jdm committed Mar 5, 2015
commit 37836f3c7802487164aefe28767a0b62e3fbb84d
@@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use resource_task::{TargetedLoadResponse, Metadata, LoadData, start_sending, ResponseSenders};
use resource_task::{Metadata, LoadData, start_sending, ResponseSenders};
use resource_task::ProgressMsg::Done;
use file_loader;

@@ -12,13 +12,8 @@ use util::resource_files::resources_dir_path;

use std::borrow::IntoCow;
use std::old_io::fs::PathExtensions;
use std::sync::mpsc::Sender;

pub fn factory(mut load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
let senders = ResponseSenders {
immediate_consumer: start_chan.clone(),
eventual_consumer: load_data.consumer.clone(),
};
pub fn factory(mut load_data: LoadData, senders: ResponseSenders) {
match load_data.url.non_relative_scheme_data().unwrap() {
"blank" => {
let chan = start_sending(senders, Metadata {
@@ -44,5 +39,5 @@ pub fn factory(mut load_data: LoadData, start_chan: Sender<TargetedLoadResponse>
return
}
};
file_loader::factory(load_data, start_chan)
file_loader::factory(load_data, senders)
}
@@ -2,35 +2,28 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use resource_task::{Metadata, LoadData, TargetedLoadResponse, start_sending, ResponseSenders};
use resource_task::{Metadata, LoadData, start_sending, ResponseSenders};
use resource_task::ProgressMsg::{Payload, Done};

use rustc_serialize::base64::FromBase64;

use hyper::mime::Mime;
use url::{percent_decode, SchemeData};

use std::sync::mpsc::Sender;

pub fn factory(load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
pub fn factory(load_data: LoadData, senders: ResponseSenders) {
// NB: we don't spawn a new task.
// Hypothesis: data URLs are too small for parallel base64 etc. to be worth it.
// Should be tested at some point.
// Left in separate function to allow easy moving to a task, if desired.
load(load_data, start_chan)
load(load_data, senders)
}

fn load(load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
fn load(load_data: LoadData, senders: ResponseSenders) {
let url = load_data.url;
assert!("data" == url.scheme.as_slice());

let mut metadata = Metadata::default(url.clone());

let senders = ResponseSenders {
immediate_consumer: start_chan,
eventual_consumer: load_data.consumer,
};

// Split out content type and data.
let mut scheme_data = match url.scheme_data {
SchemeData::NonRelative(scheme_data) => scheme_data,
@@ -93,10 +86,11 @@ fn assert_parse(url: &'static str,
use std::sync::mpsc::channel;
use url::Url;
use sniffer_task;
use resource_task::LoadConsumer::Channel;

let (start_chan, start_port) = channel();
let sniffer_task = sniffer_task::new_sniffer_task();
load(LoadData::new(Url::parse(url).unwrap(), start_chan), sniffer_task);
load(LoadData::new(Url::parse(url).unwrap(), Channel(start_chan)), sniffer_task);

let response = start_port.recv().unwrap();
assert_eq!(&response.metadata.content_type, &content_type);
@@ -2,7 +2,8 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use resource_task::{ProgressMsg, Metadata, LoadData, start_sending, TargetedLoadResponse, ResponseSenders};
use resource_task::{ProgressMsg, Metadata, LoadData, start_sending};
use resource_task::ResponseSenders;
use resource_task::ProgressMsg::{Payload, Done};

use std::borrow::ToOwned;
@@ -32,13 +33,9 @@ fn read_all(reader: &mut io::Stream, progress_chan: &Sender<ProgressMsg>)
}
}

pub fn factory(load_data: LoadData, start_chan: Sender<TargetedLoadResponse>) {
pub fn factory(load_data: LoadData, senders: ResponseSenders) {
let url = load_data.url;
assert!("file" == url.scheme.as_slice());
let senders = ResponseSenders {
immediate_consumer: start_chan,
eventual_consumer: load_data.consumer,
};
let progress_chan = start_sending(senders, Metadata::default(url.clone()));
spawn_named("file_loader".to_owned(), move || {
let file_path: Result<Path, ()> = url.to_file_path();
@@ -3,7 +3,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use cookie_storage::CookieSource;
use resource_task::{Metadata, TargetedLoadResponse, LoadData, start_sending_opt, ResponseSenders};
use resource_task::{Metadata, LoadData, start_sending_opt, ResponseSenders};
use resource_task::ControlMsg;
use resource_task::ProgressMsg::{Payload, Done};

@@ -28,9 +28,9 @@ use url::{Url, UrlParser};
use std::borrow::ToOwned;

pub fn factory(cookies_chan: Sender<ControlMsg>)
-> Box<Invoke<(LoadData, Sender<TargetedLoadResponse>)> + Send> {
box move |(load_data, start_chan)| {
spawn_named("http_loader".to_owned(), move || load(load_data, start_chan, cookies_chan))
-> Box<Invoke<(LoadData, ResponseSenders)> + Send> {
box move |(load_data, senders)| {
spawn_named("http_loader".to_owned(), move || load(load_data, senders, cookies_chan))
}
}

@@ -44,7 +44,7 @@ fn send_error(url: Url, err: String, senders: ResponseSenders) {
};
}

fn load(mut load_data: LoadData, start_chan: Sender<TargetedLoadResponse>, cookies_chan: Sender<ControlMsg>) {
fn load(mut load_data: LoadData, senders: ResponseSenders, cookies_chan: Sender<ControlMsg>) {
// FIXME: At the time of writing this FIXME, servo didn't have any central
// location for configuration. If you're reading this and such a
// repository DOES exist, please update this constant to use it.
@@ -53,11 +53,6 @@ fn load(mut load_data: LoadData, start_chan: Sender<TargetedLoadResponse>, cooki
let mut url = load_data.url.clone();
let mut redirected_to = HashSet::new();

let senders = ResponseSenders {
immediate_consumer: start_chan,
eventual_consumer: load_data.consumer
};

// Loop to handle redirects.
loop {
iters = iters + 1;
@@ -97,8 +92,8 @@ reason: \"certificate verify failed\" }]";
detail: Some(ref det)})) if det.as_slice() == ssl_err_string => {
let mut image = resources_dir_path();
image.push("badcert.html");
let load_data = LoadData::new(Url::from_file_path(&image).unwrap(), senders.eventual_consumer);
file_loader::factory(load_data, senders.immediate_consumer);
let load_data = LoadData::new(Url::from_file_path(&image).unwrap());
file_loader::factory(load_data, senders);
return;
},
Err(e) => {
@@ -6,6 +6,7 @@ use image::base::{Image, load_from_memory};
use resource_task;
use resource_task::{LoadData, ResourceTask};
use resource_task::ProgressMsg::{Payload, Done};
use resource_task::LoadConsumer::Channel;

use util::task::spawn_named;
use util::taskpool::TaskPool;
@@ -448,7 +449,7 @@ impl ImageCacheTask {

fn load_image_data(url: Url, resource_task: ResourceTask) -> Result<Vec<u8>, ()> {
let (response_chan, response_port) = channel();
resource_task.send(resource_task::ControlMsg::Load(LoadData::new(url, response_chan))).unwrap();
resource_task.send(resource_task::ControlMsg::Load(LoadData::new(url), Channel(response_chan))).unwrap();

let mut image_data = vec!();

@@ -51,17 +51,32 @@ pub fn global_init() {
}
}

pub trait AsyncResponseListener {
fn headers_available(metadata: Metadata);
fn data_available(payload: Vec<u8>);
fn response_complete(status: Result<(), String>);
}

pub trait AsyncResponseTarget {
fn get_listener(&self) -> &AsyncResponseListener;
}

pub enum ControlMsg {
/// Request the data associated with a particular URL
Load(LoadData),
Load(LoadData, LoadConsumer),
/// Store a set of cookies for a given originating URL
SetCookiesForUrl(Url, String, CookieSource),
/// Retrieve the stored cookies for a given URL
GetCookiesForUrl(Url, Sender<Option<String>>, CookieSource),
Exit
}

#[derive(Clone)]
pub enum LoadConsumer {
Channel(Sender<LoadResponse>),
Listener(Box<AsyncResponseTarget + Send>),
}

//#[derive(Clone)]
pub struct LoadData {
pub url: Url,
pub method: Method,
@@ -71,19 +86,17 @@ pub struct LoadData {
pub preserved_headers: Headers,
pub data: Option<Vec<u8>>,
pub cors: Option<ResourceCORSData>,
pub consumer: Sender<LoadResponse>,
}

impl LoadData {
pub fn new(url: Url, consumer: Sender<LoadResponse>) -> LoadData {
pub fn new(url: Url) -> LoadData {
LoadData {
url: url,
method: Method::Get,
headers: Headers::new(),
preserved_headers: Headers::new(),
data: None,
cors: None,
consumer: consumer,
}
}
}
@@ -161,8 +174,27 @@ pub struct TargetedLoadResponse {
pub consumer: Sender<LoadResponse>,
}

// Data structure containing ports
pub struct ResponseSenders {
pub enum ResponseSenders {
Channel(ChannelResponseSenders),
Listener(Box<AsyncResponseTarget + Send>),
}

impl ResponseSenders {
fn from_consumer(consumer: LoadConsumer, sniffer_task: Sender<TargetedLoadResponse>)
-> ResponseSenders {
match consumer {
LoadConsumer::Channel(c) => ResponseSenders::Channel(
ChannelResponseSenders {
immediate_consumer: sniffer_task,
eventual_consumer: c,
}),
//TODO: return a listener that chains through the sniffer
LoadConsumer::Listener(l) => ResponseSenders::Listener(l),
}
}
}

pub struct ChannelResponseSenders {
pub immediate_consumer: Sender<TargetedLoadResponse>,
pub eventual_consumer: Sender<LoadResponse>,
}
@@ -183,25 +215,30 @@ pub fn start_sending(senders: ResponseSenders, metadata: Metadata) -> Sender<Pro

/// For use by loaders in responding to a Load message.
pub fn start_sending_opt(senders: ResponseSenders, metadata: Metadata) -> Result<Sender<ProgressMsg>, ()> {
let (progress_chan, progress_port) = channel();
let result = senders.immediate_consumer.send(TargetedLoadResponse {
load_response: LoadResponse {
metadata: metadata,
progress_port: progress_port,
},
consumer: senders.eventual_consumer
});
match result {
Ok(_) => Ok(progress_chan),
Err(_) => Err(())
match senders {
ResponseSenders::Channel(senders) => {
let (progress_chan, progress_port) = channel();
let result = senders.immediate_consumer.send(TargetedLoadResponse {
load_response: LoadResponse {
metadata: metadata,
progress_port: progress_port,
},
consumer: senders.eventual_consumer
});
match result {
Ok(_) => Ok(progress_chan),
Err(_) => Err(())
}
}
ResponseSenders::Listener(_) => panic!(),
}
}

/// Convenience function for synchronously loading a whole resource.
pub fn load_whole_resource(resource_task: &ResourceTask, url: Url)
-> Result<(Metadata, Vec<u8>), String> {
let (start_chan, start_port) = channel();
resource_task.send(ControlMsg::Load(LoadData::new(url, start_chan))).unwrap();
resource_task.send(ControlMsg::Load(LoadData::new(url), LoadConsumer::Channel(start_chan))).unwrap();
let response = start_port.recv().unwrap();

let mut buf = vec!();
@@ -277,8 +314,8 @@ impl ResourceManager {
fn start(&mut self) {
loop {
match self.from_client.recv().unwrap() {
ControlMsg::Load(load_data) => {
self.load(load_data)
ControlMsg::Load(load_data, consumer) => {
self.load(load_data, consumer)
}
ControlMsg::SetCookiesForUrl(request, cookie_list, source) => {
let header = Header::parse_header([cookie_list.into_bytes()].as_slice());
@@ -300,26 +337,23 @@ impl ResourceManager {
}
}

fn load(&mut self, mut load_data: LoadData) {
fn load(&mut self, mut load_data: LoadData, consumer: LoadConsumer) {
unsafe {
if let Some(host_table) = HOST_TABLE {
load_data = replace_hosts(load_data, host_table);
}
}

self.user_agent.as_ref().map(|ua| load_data.headers.set(UserAgent(ua.clone())));
let senders = ResponseSenders {
immediate_consumer: self.sniffer_task.clone(),
eventual_consumer: load_data.consumer.clone(),
};

fn from_factory(factory: fn(LoadData, Sender<TargetedLoadResponse>))
-> Box<Invoke<(LoadData, Sender<TargetedLoadResponse>)> + Send> {
box move |&:(load_data, start_chan)| {
factory(load_data, start_chan)
fn from_factory(factory: fn(LoadData, ResponseSenders))
-> Box<Invoke<(LoadData, ResponseSenders)> + Send> {
box move |&:(load_data, senders)| {
factory(load_data, senders)
}
}

let senders = ResponseSenders::from_consumer(consumer, self.sniffer_task.clone());
let loader = match load_data.url.scheme.as_slice() {
"file" => from_factory(file_loader::factory),
"http" | "https" => http_loader::factory(self.resource_task.clone()),
@@ -334,14 +368,14 @@ impl ResourceManager {
};
debug!("resource_task: loading url: {}", load_data.url.serialize());

loader.invoke((load_data, self.sniffer_task.clone()));
loader.invoke((load_data, senders));
}
}

/// Load a URL asynchronously and iterate over chunks of bytes from the response.
pub fn load_bytes_iter(resource_task: &ResourceTask, url: Url) -> (Metadata, ProgressMsgPortIterator) {
let (input_chan, input_port) = channel();
resource_task.send(ControlMsg::Load(LoadData::new(url, input_chan))).unwrap();
resource_task.send(ControlMsg::Load(LoadData::new(url), LoadConsumer::Channel(input_chan))).unwrap();

let response = input_port.recv().unwrap();
let iter = ProgressMsgPortIterator { progress_port: response.progress_port };
@@ -379,7 +413,7 @@ fn test_bad_scheme() {
let resource_task = new_resource_task(None);
let (start_chan, start) = channel();
let url = Url::parse("bogus://whatever").unwrap();
resource_task.send(ControlMsg::Load(LoadData::new(url, start_chan)));
resource_task.send(ControlMsg::Load(LoadData::new(url, LoadConsumer::Channel(start_chan))));
let response = start.recv().unwrap();
match response.progress_port.recv().unwrap() {
ProgressMsg::Done(result) => { assert!(result.is_err()) }
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.