Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async fetching of extenal script sources via interruptible parsing. #5197

Closed
wants to merge 14 commits into from

Completely abstract sending responses over channels vs. listeners.

  • Loading branch information
jdm committed Mar 5, 2015
commit eacc8a973fcb493406cabf2deaafa8a8bc2a892f
@@ -2,19 +2,18 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use resource_task::{ProgressMsg, Metadata, LoadData, start_sending};
use resource_task::ResponseSenders;
use resource_task::{Metadata, LoadData, start_sending};
use resource_task::{ResponseSenders, ProgressSender};
use resource_task::ProgressMsg::{Payload, Done};

use std::borrow::ToOwned;
use std::old_io as io;
use std::old_io::File;
use std::sync::mpsc::Sender;
use util::task::spawn_named;

static READ_SIZE: uint = 8192;

fn read_all(reader: &mut io::Stream, progress_chan: &Sender<ProgressMsg>)
fn read_all(reader: &mut io::Stream, progress_chan: &ProgressSender)
-> Result<(), String> {
loop {
let mut buf = vec!();
@@ -52,13 +52,13 @@ pub fn global_init() {
}

pub trait AsyncResponseListener {
fn headers_available(metadata: Metadata);
fn data_available(payload: Vec<u8>);
fn response_complete(status: Result<(), String>);
fn headers_available(&self, metadata: Metadata);
fn data_available(&self, payload: Vec<u8>);
fn response_complete(&self, status: Result<(), String>);
}

pub trait AsyncResponseTarget {
fn get_listener(&self) -> &AsyncResponseListener;
fn invoke_with_listener<F>(&self, f: F) where F: FnOnce(&AsyncResponseListener);
}

pub enum ControlMsg {
@@ -199,6 +199,29 @@ pub struct ChannelResponseSenders {
pub eventual_consumer: Sender<LoadResponse>,
}

pub enum ProgressSender {
Channel(Sender<ProgressMsg>),
Listener(Box<AsyncResponseTarget + Send>),
}

impl ProgressSender {
//XXXjdm return actual error
pub fn send(&self, msg: ProgressMsg) -> Result<(), ()> {
match *self {
ProgressSender::Channel(ref c) => c.send(msg).map_err(|_| ()),
ProgressSender::Listener(ref b) => {
b.invoke_with_listener(move |listener| {
match msg {
ProgressMsg::Payload(buf) => listener.data_available(buf),
ProgressMsg::Done(status) => listener.response_complete(status),
}
});
Ok(())
}
}
}
}

/// Messages sent in response to a `Load` message
#[derive(PartialEq,Debug)]
pub enum ProgressMsg {
@@ -209,12 +232,12 @@ pub enum ProgressMsg {
}

/// For use by loaders in responding to a Load message.
pub fn start_sending(senders: ResponseSenders, metadata: Metadata) -> Sender<ProgressMsg> {
pub fn start_sending(senders: ResponseSenders, metadata: Metadata) -> ProgressSender {
start_sending_opt(senders, metadata).ok().unwrap()
}

/// For use by loaders in responding to a Load message.
pub fn start_sending_opt(senders: ResponseSenders, metadata: Metadata) -> Result<Sender<ProgressMsg>, ()> {
pub fn start_sending_opt(senders: ResponseSenders, metadata: Metadata) -> Result<ProgressSender, ()> {
match senders {
ResponseSenders::Channel(senders) => {
let (progress_chan, progress_port) = channel();
@@ -226,11 +249,16 @@ pub fn start_sending_opt(senders: ResponseSenders, metadata: Metadata) -> Result
consumer: senders.eventual_consumer
});
match result {
Ok(_) => Ok(progress_chan),
Ok(_) => Ok(ProgressSender::Channel(progress_chan)),
Err(_) => Err(())
}
}
ResponseSenders::Listener(_) => panic!(),
ResponseSenders::Listener(target) => {
target.invoke_with_listener(move |listener| {
listener.headers_available(metadata);
});
Ok(ProgressSender::Listener(target))
}
}
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.