Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make resource requests cancelable (issue 4974) #5826

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Prev

Add additional checks for request cancelation

  • Loading branch information
marcusklaas committed May 5, 2015
commit 8e2eb8d6cba1bc472e48e493319b22cf2bc29d28
@@ -33,17 +33,27 @@ fn read_block(reader: &mut File) -> Result<ReadStatus, String> {
}
}

fn read_all(reader: &mut File, progress_chan: &ProgressSender)
-> Result<(), String> {
// returns true on cancelation, false on natural finish
fn read_all(reader: &mut File,
progress_chan: &ProgressSender,
cancel_receiver: &CancelationListener)
-> Result<bool, String> {
loop {
if cancel_receiver.is_cancelled() {
return Ok(true);
}

match try!(read_block(reader)) {
ReadStatus::Partial(buf) => progress_chan.send(Payload(buf)).unwrap(),
ReadStatus::EOF => return Ok(()),
ReadStatus::EOF => return Ok(false),
}
}
}

pub fn factory(load_data: LoadData, senders: LoadConsumer, classifier: Arc<MIMEClassifier>, cancel_receiver: CancelationListener) {
pub fn factory(load_data: LoadData,
senders: LoadConsumer,
classifier: Arc<MIMEClassifier>,
cancel_receiver: CancelationListener) {
let url = load_data.url;
assert!(&*url.scheme == "file");
spawn_named("file_loader".to_owned(), move || {
@@ -63,10 +73,12 @@ pub fn factory(load_data: LoadData, senders: LoadConsumer, classifier: Arc<MIMEC
classifier, &buf);
progress_chan.send(Payload(buf)).unwrap();

if cancel_receiver.is_cancelled() {
let read_result = read_all(reader, &progress_chan, &cancel_receiver);
if let Ok(true) = read_result {
return;
}
(read_all(reader, &progress_chan), progress_chan)

(read_result.map(|_| ()), progress_chan)
}
Ok(ReadStatus::EOF) | Err(_) =>
(res.map(|_| ()), start_sending(senders, metadata)),
@@ -331,14 +331,23 @@ reason: \"certificate verify failed\" }]";
Some(encoding) => {
if encoding == "gzip" {
let mut response_decoding = GzDecoder::new(response).unwrap();
send_data(&mut response_decoding, start_chan, metadata, classifier);
if send_data(&mut response_decoding, start_chan, metadata,
classifier, &cancel_receiver) {
return;
}
} else if encoding == "deflate" {
let mut response_decoding = DeflateDecoder::new(response);
send_data(&mut response_decoding, start_chan, metadata, classifier);
if send_data(&mut response_decoding, start_chan, metadata,
classifier, &cancel_receiver) {
return;
}
}
},
None => {
send_data(&mut response, start_chan, metadata, classifier);
if send_data(&mut response, start_chan, metadata,
classifier, &cancel_receiver) {
return;
}
}
}

@@ -350,25 +359,34 @@ reason: \"certificate verify failed\" }]";
fn send_data<R: Read>(reader: &mut R,
start_chan: LoadConsumer,
metadata: Metadata,
classifier: Arc<MIMEClassifier>) {
classifier: Arc<MIMEClassifier>,
cancel_receiver: &CancelationListener) -> bool {
if cancel_receiver.is_cancelled() {
return true;
}

let (progress_chan, mut chunk) = {
let buf = match read_block(reader) {
Ok(ReadResult::Payload(buf)) => buf,
_ => vec!(),
};
let p = match start_sending_sniffed_opt(start_chan, metadata, classifier, &buf) {
Ok(p) => p,
_ => return
_ => return false
};
(p, buf)
};

loop {
if cancel_receiver.is_cancelled() {
return true;
}

if progress_chan.send(Payload(chunk)).is_err() {
// The send errors when the receiver is out of scope,
// which will happen if the fetch has timed out (or has been aborted)
// so we don't need to continue with the loading of the file here.
return;
return false;
}

chunk = match read_block(reader) {
@@ -377,5 +395,11 @@ fn send_data<R: Read>(reader: &mut R,
};
}

if cancel_receiver.is_cancelled() {
return true;
}

let _ = progress_chan.send(Done(Ok(())));

false
}
@@ -248,7 +248,9 @@ impl ResourceManager {
consumer.send(self.cookie_storage.cookies_for_url(&url, source)).unwrap();
}
ControlMsg::Cancel(resource_id) => {
self.resource_id_map.get(&resource_id).unwrap().send(CancelLoad).unwrap();
let _ = self.resource_id_map.get(&resource_id).map(|cancel_sender| {
cancel_sender.send(CancelLoad).unwrap()
});
},
ControlMsg::Exit => {
break
@@ -287,18 +289,15 @@ impl ResourceManager {
};
debug!("resource_task: loading url: {}", load_data.url.serialize());

let cancelation_receiver = match resource_sender {
Some(sender) => {
let resource_id = self.next_resource_id.increment();
let (cancel_sender, cancel_receiver) = channel();
let cancelation_receiver = resource_sender.map(|sender| {
let resource_id = self.next_resource_id.increment();
let (cancel_sender, cancel_receiver) = channel();

self.resource_id_map.insert(resource_id, cancel_sender);
sender.send(resource_id).unwrap();
self.resource_id_map.insert(resource_id, cancel_sender);
sender.send(resource_id).unwrap();

Some(cancel_receiver)
},
None => None
};
cancel_receiver
});

let cancelation_listener = CancelationListener::from_receiver(cancelation_receiver);

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.