Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch cancellation #19274

Merged
merged 6 commits into from Nov 21, 2017
@@ -57,14 +57,14 @@ impl NetworkListener {
Some(ref res_init_) => CoreResourceMsg::FetchRedirect(
self.req_init.clone(),
res_init_.clone(),
ipc_sender),
ipc_sender, None),
None => {
set_default_accept(Destination::Document, &mut listener.req_init.headers);
set_default_accept_language(&mut listener.req_init.headers);

CoreResourceMsg::Fetch(
listener.req_init.clone(),
FetchChannels::ResponseMsg(ipc_sender))
FetchChannels::ResponseMsg(ipc_sender, None))
}
};

@@ -15,6 +15,7 @@ use hyper::header::{Header, HeaderFormat, HeaderView, Headers, Referer as Refere
use hyper::method::Method;
use hyper::mime::{Mime, SubLevel, TopLevel};
use hyper::status::StatusCode;
use ipc_channel::ipc::IpcReceiver;
use mime_guess::guess_mime_type;
use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy};
use net_traits::request::{CredentialsMode, Destination, Referrer, Request, RequestMode};
@@ -27,7 +28,7 @@ use std::fs::File;
use std::io::Read;
use std::mem;
use std::str;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, Receiver};
use subresource_integrity::is_response_integrity_valid;

@@ -36,15 +37,45 @@ pub type Target<'a> = &'a mut (FetchTaskTarget + Send);
pub enum Data {
Payload(Vec<u8>),
Done,
Cancelled,
}

pub struct FetchContext {
pub state: Arc<HttpState>,
pub user_agent: Cow<'static, str>,
pub devtools_chan: Option<Sender<DevtoolsControlMsg>>,
pub filemanager: FileManager,
pub cancellation_listener: Arc<Mutex<CancellationListener>>,
}

pub struct CancellationListener {
cancel_chan: Option<IpcReceiver<()>>,
cancelled: bool,
}

impl CancellationListener {
pub fn new(cancel_chan: Option<IpcReceiver<()>>) -> Self {
Self {
cancel_chan: cancel_chan,
cancelled: false,
}
}

pub fn cancelled(&mut self) -> bool {
if let Some(ref cancel_chan) = self.cancel_chan {
if self.cancelled {
true
} else if cancel_chan.try_recv().is_ok() {
self.cancelled = true;
true
} else {
false
}
} else {
false
}
}
}
pub type DoneChannel = Option<(Sender<Data>, Receiver<Data>)>;

/// [Fetch](https://fetch.spec.whatwg.org#concept-fetch)
@@ -317,17 +348,17 @@ pub fn main_fetch(request: &mut Request,
};

// Execute deferred rebinding of response.
let response = if let Some(error) = internal_error {
let mut response = if let Some(error) = internal_error {
Response::network_error(error)
} else {
response
};

// Step 19.
let mut response_loaded = false;
let response = if !response.is_network_error() && !request.integrity_metadata.is_empty() {
let mut response = if !response.is_network_error() && !request.integrity_metadata.is_empty() {
// Step 19.1.
wait_for_response(&response, target, done_chan);
wait_for_response(&mut response, target, done_chan);
response_loaded = true;

// Step 19.2.
@@ -346,9 +377,9 @@ pub fn main_fetch(request: &mut Request,
if request.synchronous {
// process_response is not supposed to be used
// by sync fetch, but we overload it here for simplicity
target.process_response(&response);
target.process_response(&mut response);
if !response_loaded {
wait_for_response(&response, target, done_chan);
wait_for_response(&mut response, target, done_chan);
}
// overloaded similarly to process_response
target.process_response_eof(&response);
@@ -370,7 +401,7 @@ pub fn main_fetch(request: &mut Request,

// Step 23.
if !response_loaded {
wait_for_response(&response, target, done_chan);
wait_for_response(&mut response, target, done_chan);
}

// Step 24.
@@ -381,7 +412,7 @@ pub fn main_fetch(request: &mut Request,
response
}

fn wait_for_response(response: &Response, target: Target, done_chan: &mut DoneChannel) {
fn wait_for_response(response: &mut Response, target: Target, done_chan: &mut DoneChannel) {
if let Some(ref ch) = *done_chan {
loop {
match ch.1.recv()
@@ -390,6 +421,10 @@ fn wait_for_response(response: &Response, target: Target, done_chan: &mut DoneCh
target.process_response_chunk(vec);
},
Data::Done => break,
Data::Cancelled => {
response.aborted = true;
break;
}
}
}
} else {
@@ -1087,6 +1087,10 @@ fn http_network_fetch(request: &Request,
let devtools_sender = context.devtools_chan.clone();
let meta_status = meta.status.clone();
let meta_headers = meta.headers.clone();
let cancellation_listener = context.cancellation_listener.clone();
if cancellation_listener.lock().unwrap().cancelled() {
return Response::network_error(NetworkError::Internal("Fetch aborted".into()))
}
thread::Builder::new().name(format!("fetch worker thread")).spawn(move || {
match StreamedResponse::from_http_response(res) {
Ok(mut res) => {
@@ -1109,6 +1113,11 @@ fn http_network_fetch(request: &Request,
}

loop {
if cancellation_listener.lock().unwrap().cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);

This comment has been minimized.

Copy link
@gterzian

gterzian Nov 18, 2017

Member

In the context of #18676 I think that at this point the response would have been cached already, and such a cancellation would result in any subsequent request for the same url getting the empty cancelled response from the cache.

One way to solve this would be for the cache to disregard any cached responses that would have been cancelled. Checking for an empty response body might be one way to do so, perhaps a clearer way would be to use ResponseBody::Empty here instead, or even introduce a new ResponseBody::Cancelled?

This comment has been minimized.

Copy link
@gterzian

gterzian Nov 18, 2017

Member

Having had a look at the spec, the following can be considered:

  1. At step 5 above, if at that point the fetch has already been aborted/terminated, you could return a network error.
  2. If the fetch is terminated/aborted while the fetch worker is already performing step 12, in addition to what is done here and instead of what I suggested earlier, you could set a new aborted flag on the response. The cache could then also ignore any such aborted responses that would be held in it, when asked to construct a response. The spec mentions such a flag, see https://fetch.spec.whatwg.org/#concept-response-aborted, and it doesn't look like it has been implemented yet.

This comment has been minimized.

Copy link
@jdm

jdm Nov 20, 2017

Member

Let's add the flag to the response and set that here, and use the network error for prior to this point like gterzian suggested.

let _ = done_sender.send(Data::Cancelled);
return;
}
match read_block(&mut res) {
Ok(Data::Payload(chunk)) => {
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
@@ -1128,6 +1137,7 @@ fn http_network_fetch(request: &Request,
let _ = done_sender.send(Data::Done);
break;
}
Ok(Data::Cancelled) => unreachable!() // read_block doesn't return Data::Cancelled
}
}
}
@@ -9,7 +9,7 @@ use cookie_rs;
use cookie_storage::CookieStorage;
use devtools_traits::DevtoolsControlMsg;
use fetch::cors_cache::CorsCache;
use fetch::methods::{FetchContext, fetch};
use fetch::methods::{CancellationListener, FetchContext, fetch};
use filemanager_thread::{FileManager, TFDProvider};
use hsts::HstsList;
use http_loader::{HttpState, http_redirect_fetch};
@@ -35,7 +35,7 @@ use std::fs::File;
use std::io::prelude::*;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::Sender;
use std::thread;
use storage_thread::StorageThreadFactory;
@@ -157,14 +157,14 @@ impl ResourceChannelManager {
match msg {
CoreResourceMsg::Fetch(req_init, channels) => {
match channels {
FetchChannels::ResponseMsg(sender) =>
self.resource_manager.fetch(req_init, None, sender, http_state),
FetchChannels::ResponseMsg(sender, cancel_chan) =>
self.resource_manager.fetch(req_init, None, sender, http_state, cancel_chan),
FetchChannels::WebSocket { event_sender, action_receiver } =>
self.resource_manager.websocket_connect(req_init, event_sender, action_receiver, http_state),
}
}
CoreResourceMsg::FetchRedirect(req_init, res_init, sender) =>
self.resource_manager.fetch(req_init, Some(res_init), sender, http_state),
CoreResourceMsg::FetchRedirect(req_init, res_init, sender, cancel_chan) =>
self.resource_manager.fetch(req_init, Some(res_init), sender, http_state, cancel_chan),
CoreResourceMsg::SetCookieForUrl(request, cookie, source) =>
self.resource_manager.set_cookie_for_url(&request, cookie.into_inner(), source, http_state),
CoreResourceMsg::SetCookiesForUrl(request, cookies, source) => {
@@ -329,7 +329,8 @@ impl CoreResourceManager {
req_init: RequestInit,
res_init_: Option<ResponseInit>,
mut sender: IpcSender<FetchResponseMsg>,
http_state: &Arc<HttpState>) {
http_state: &Arc<HttpState>,
cancel_chan: Option<IpcReceiver<()>>) {
let http_state = http_state.clone();
let ua = self.user_agent.clone();
let dc = self.devtools_chan.clone();
@@ -346,6 +347,7 @@ impl CoreResourceManager {
user_agent: ua,
devtools_chan: dc,
filemanager: filemanager,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(cancel_chan))),
};

match res_init_ {
@@ -342,7 +342,7 @@ pub enum WebSocketNetworkEvent {
#[derive(Deserialize, Serialize)]
/// IPC channels to communicate with the script thread about network or DOM events.
pub enum FetchChannels {
ResponseMsg(IpcSender<FetchResponseMsg>),
ResponseMsg(IpcSender<FetchResponseMsg>, /* cancel_chan */ Option<IpcReceiver<()>>),
WebSocket {
event_sender: IpcSender<WebSocketNetworkEvent>,
action_receiver: IpcReceiver<WebSocketDomAction>,
@@ -353,7 +353,7 @@ pub enum FetchChannels {
pub enum CoreResourceMsg {
Fetch(RequestInit, FetchChannels),
/// Initiate a fetch in response to processing a redirection
FetchRedirect(RequestInit, ResponseInit, IpcSender<FetchResponseMsg>),
FetchRedirect(RequestInit, ResponseInit, IpcSender<FetchResponseMsg>, /* cancel_chan */ Option<IpcReceiver<()>>),
/// Store a cookie for a given originating URL
SetCookieForUrl(ServoUrl, Serde<Cookie<'static>>, CookieSource),
/// Store a set of cookies for a given originating URL
@@ -383,7 +383,7 @@ pub fn fetch_async<F>(request: RequestInit, core_resource_thread: &CoreResourceT
ROUTER.add_route(action_receiver.to_opaque(),
Box::new(move |message| f(message.to().unwrap())));
core_resource_thread.send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender, None))).unwrap();
}

#[derive(Clone, Deserialize, MallocSizeOf, Serialize)]
@@ -478,7 +478,7 @@ pub fn load_whole_resource(request: RequestInit,
-> Result<(Metadata, Vec<u8>), NetworkError> {
let (action_sender, action_receiver) = ipc::channel().unwrap();
core_resource_thread.send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender, None))).unwrap();

let mut buf = vec![];
let mut metadata = None;
@@ -112,6 +112,8 @@ pub struct Response {
pub internal_response: Option<Box<Response>>,
/// whether or not to try to return the internal_response when asked for actual_response
pub return_internal: bool,
/// https://fetch.spec.whatwg.org/#concept-response-aborted
pub aborted: bool,
}

impl Response {
@@ -133,6 +135,7 @@ impl Response {
location_url: None,
internal_response: None,
return_internal: true,
aborted: false,
}
}

@@ -162,6 +165,7 @@ impl Response {
location_url: None,
internal_response: None,
return_internal: true,
aborted: false,
}
}

@@ -126,7 +126,7 @@ impl DocumentLoader {
request: RequestInit,
fetch_target: IpcSender<FetchResponseMsg>) {
self.resource_threads.sender().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(fetch_target))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(fetch_target, None))).unwrap();
}

/// Mark an in-progress network request complete.
@@ -491,7 +491,7 @@ impl EventSource {
listener.notify_fetch(message.to().unwrap());
}));
global.core_resource_thread().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender, None))).unwrap();
// Step 13
Ok(ev)
}
@@ -555,6 +555,6 @@ impl EventSourceTimeoutCallback {
}
// Step 5.4
global.core_resource_thread().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(self.action_sender))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(self.action_sender, None))).unwrap();
}
}
@@ -154,6 +154,8 @@ pub struct XMLHttpRequest {
response_status: Cell<Result<(), ()>>,
referrer_url: Option<ServoUrl>,
referrer_policy: Option<ReferrerPolicy>,
#[ignore_malloc_size_of = "channels are hard"]
cancellation_chan: DomRefCell<Option<ipc::IpcSender<()>>>,
}

impl XMLHttpRequest {
@@ -198,6 +200,7 @@ impl XMLHttpRequest {
response_status: Cell::new(Ok(())),
referrer_url: referrer_url,
referrer_policy: referrer_policy,
cancellation_chan: DomRefCell::new(None),
}
}
pub fn new(global: &GlobalScope) -> DomRoot<XMLHttpRequest> {
@@ -218,7 +221,8 @@ impl XMLHttpRequest {
fn initiate_async_xhr(context: Arc<Mutex<XHRContext>>,
task_source: NetworkingTaskSource,
global: &GlobalScope,
init: RequestInit) {
init: RequestInit,
cancellation_chan: ipc::IpcReceiver<()>) {
impl FetchResponseListener for XHRContext {
fn process_request_body(&mut self) {
// todo
@@ -255,6 +259,7 @@ impl XMLHttpRequest {
}

let (action_sender, action_receiver) = ipc::channel().unwrap();

let listener = NetworkListener {
context: context,
task_source: task_source,
@@ -264,7 +269,7 @@ impl XMLHttpRequest {
listener.notify_fetch(message.to().unwrap());
}));
global.core_resource_thread().send(
Fetch(init, FetchChannels::ResponseMsg(action_sender))).unwrap();
Fetch(init, FetchChannels::ResponseMsg(action_sender, Some(cancellation_chan)))).unwrap();
}
}

@@ -1018,6 +1023,12 @@ impl XMLHttpRequest {
}

fn terminate_ongoing_fetch(&self) {
if let Some(ref cancel_chan) = *self.cancellation_chan.borrow() {
// The receiver will be destroyed if the request has already completed;
// so we throw away the error. Cancellation is a courtesy call,
// we don't actually care if the other side heard.
let _ = cancel_chan.send(());
}
let GenerationId(prev_id) = self.generation_id.get();
self.generation_id.set(GenerationId(prev_id + 1));
self.response_status.set(Ok(()));
@@ -1311,8 +1322,11 @@ impl XMLHttpRequest {
(global.networking_task_source(), None)
};

let (cancel_sender, cancel_receiver) = ipc::channel().unwrap();
*self.cancellation_chan.borrow_mut() = Some(cancel_sender);

XMLHttpRequest::initiate_async_xhr(context.clone(), task_source,
global, init);
global, init, cancel_receiver);

if let Some(script_port) = script_port {
loop {
@@ -108,7 +108,7 @@ pub fn Fetch(global: &GlobalScope, input: RequestInfo, init: RootedTraceableBox<
listener.notify_fetch(message.to().unwrap());
}));
core_resource_thread.send(
NetTraitsFetch(request_init, FetchChannels::ResponseMsg(action_sender))).unwrap();
NetTraitsFetch(request_init, FetchChannels::ResponseMsg(action_sender, None))).unwrap();

promise
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.