Skip to content

Commit

Permalink
Auto merge of #19274 - Manishearth:xhr-cancel, r=<try>
Browse files Browse the repository at this point in the history
Fetch cancellation

This PR implements cancellation for fetch, and uses it for XHR. This means that fetch clients can now send a message to the fetch task asking for the network request to be aborted.

Previously, clients like XHR had abort functionality but would implement it by simply ignoring future messages from the network task; and would not actually cancel the network fetch.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/servo/servo/19274)
<!-- Reviewable:end -->
  • Loading branch information
bors-servo committed Nov 18, 2017
2 parents 7b886b4 + 5b2b68d commit a82585d
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 24 deletions.
4 changes: 2 additions & 2 deletions components/constellation/network_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ impl NetworkListener {
Some(ref res_init_) => CoreResourceMsg::FetchRedirect(
self.req_init.clone(),
res_init_.clone(),
ipc_sender),
ipc_sender, None),
None => {
set_default_accept(Destination::Document, &mut listener.req_init.headers);
set_default_accept_language(&mut listener.req_init.headers);

CoreResourceMsg::Fetch(
listener.req_init.clone(),
FetchChannels::ResponseMsg(ipc_sender))
FetchChannels::ResponseMsg(ipc_sender, None))
}
};

Expand Down
32 changes: 31 additions & 1 deletion components/net/fetch/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use hyper::header::{Header, HeaderFormat, HeaderView, Headers, Referer as Refere
use hyper::method::Method;
use hyper::mime::{Mime, SubLevel, TopLevel};
use hyper::status::StatusCode;
use ipc_channel::ipc::IpcReceiver;
use mime_guess::guess_mime_type;
use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy};
use net_traits::request::{CredentialsMode, Destination, Referrer, Request, RequestMode};
Expand All @@ -27,7 +28,7 @@ use std::fs::File;
use std::io::Read;
use std::mem;
use std::str;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, Receiver};
use subresource_integrity::is_response_integrity_valid;

Expand All @@ -43,8 +44,37 @@ pub struct FetchContext {
pub user_agent: Cow<'static, str>,
pub devtools_chan: Option<Sender<DevtoolsControlMsg>>,
pub filemanager: FileManager,
pub cancellation_listener: Arc<Mutex<CancellationListener>>,
}

pub struct CancellationListener {
cancel_chan: Option<IpcReceiver<()>>,
cancelled: bool,
}

impl CancellationListener {
pub fn new(cancel_chan: Option<IpcReceiver<()>>) -> Self {
Self {
cancel_chan: cancel_chan,
cancelled: false,
}
}

pub fn cancelled(&mut self) -> bool {
if let Some(ref cancel_chan) = self.cancel_chan {
if self.cancelled {
true
} else if let Ok(_) = cancel_chan.try_recv() {
self.cancelled = true;
true
} else {
false
}
} else {
false
}
}
}
pub type DoneChannel = Option<(Sender<Data>, Receiver<Data>)>;

/// [Fetch](https://fetch.spec.whatwg.org#concept-fetch)
Expand Down
6 changes: 6 additions & 0 deletions components/net/http_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,7 @@ fn http_network_fetch(request: &Request,
let devtools_sender = context.devtools_chan.clone();
let meta_status = meta.status.clone();
let meta_headers = meta.headers.clone();
let cancellation_listener = context.cancellation_listener.clone();
thread::Builder::new().name(format!("fetch worker thread")).spawn(move || {
match StreamedResponse::from_http_response(res) {
Ok(mut res) => {
Expand All @@ -1109,6 +1110,11 @@ fn http_network_fetch(request: &Request,
}

loop {
if cancellation_listener.lock().unwrap().cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Done);
return;
}
match read_block(&mut res) {
Ok(Data::Payload(chunk)) => {
if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() {
Expand Down
16 changes: 9 additions & 7 deletions components/net/resource_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use cookie_rs;
use cookie_storage::CookieStorage;
use devtools_traits::DevtoolsControlMsg;
use fetch::cors_cache::CorsCache;
use fetch::methods::{FetchContext, fetch};
use fetch::methods::{CancellationListener, FetchContext, fetch};
use filemanager_thread::{FileManager, TFDProvider};
use hsts::HstsList;
use http_loader::{HttpState, http_redirect_fetch};
Expand All @@ -35,7 +35,7 @@ use std::fs::File;
use std::io::prelude::*;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::Sender;
use std::thread;
use storage_thread::StorageThreadFactory;
Expand Down Expand Up @@ -157,14 +157,14 @@ impl ResourceChannelManager {
match msg {
CoreResourceMsg::Fetch(req_init, channels) => {
match channels {
FetchChannels::ResponseMsg(sender) =>
self.resource_manager.fetch(req_init, None, sender, http_state),
FetchChannels::ResponseMsg(sender, cancel_chan) =>
self.resource_manager.fetch(req_init, None, sender, http_state, cancel_chan),
FetchChannels::WebSocket { event_sender, action_receiver } =>
self.resource_manager.websocket_connect(req_init, event_sender, action_receiver, http_state),
}
}
CoreResourceMsg::FetchRedirect(req_init, res_init, sender) =>
self.resource_manager.fetch(req_init, Some(res_init), sender, http_state),
CoreResourceMsg::FetchRedirect(req_init, res_init, sender, cancel_chan) =>
self.resource_manager.fetch(req_init, Some(res_init), sender, http_state, cancel_chan),
CoreResourceMsg::SetCookieForUrl(request, cookie, source) =>
self.resource_manager.set_cookie_for_url(&request, cookie.into_inner(), source, http_state),
CoreResourceMsg::SetCookiesForUrl(request, cookies, source) => {
Expand Down Expand Up @@ -329,7 +329,8 @@ impl CoreResourceManager {
req_init: RequestInit,
res_init_: Option<ResponseInit>,
mut sender: IpcSender<FetchResponseMsg>,
http_state: &Arc<HttpState>) {
http_state: &Arc<HttpState>,
cancel_chan: Option<IpcReceiver<()>>) {
let http_state = http_state.clone();
let ua = self.user_agent.clone();
let dc = self.devtools_chan.clone();
Expand All @@ -346,6 +347,7 @@ impl CoreResourceManager {
user_agent: ua,
devtools_chan: dc,
filemanager: filemanager,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(cancel_chan))),
};

match res_init_ {
Expand Down
8 changes: 4 additions & 4 deletions components/net_traits/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub enum WebSocketNetworkEvent {
#[derive(Deserialize, Serialize)]
/// IPC channels to communicate with the script thread about network or DOM events.
pub enum FetchChannels {
ResponseMsg(IpcSender<FetchResponseMsg>),
ResponseMsg(IpcSender<FetchResponseMsg>, /* cancel_chan */ Option<IpcReceiver<()>>),
WebSocket {
event_sender: IpcSender<WebSocketNetworkEvent>,
action_receiver: IpcReceiver<WebSocketDomAction>,
Expand All @@ -353,7 +353,7 @@ pub enum FetchChannels {
pub enum CoreResourceMsg {
Fetch(RequestInit, FetchChannels),
/// Initiate a fetch in response to processing a redirection
FetchRedirect(RequestInit, ResponseInit, IpcSender<FetchResponseMsg>),
FetchRedirect(RequestInit, ResponseInit, IpcSender<FetchResponseMsg>, /* cancel_chan */ Option<IpcReceiver<()>>),
/// Store a cookie for a given originating URL
SetCookieForUrl(ServoUrl, Serde<Cookie<'static>>, CookieSource),
/// Store a set of cookies for a given originating URL
Expand Down Expand Up @@ -383,7 +383,7 @@ pub fn fetch_async<F>(request: RequestInit, core_resource_thread: &CoreResourceT
ROUTER.add_route(action_receiver.to_opaque(),
Box::new(move |message| f(message.to().unwrap())));
core_resource_thread.send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender, None))).unwrap();
}

#[derive(Clone, Deserialize, MallocSizeOf, Serialize)]
Expand Down Expand Up @@ -478,7 +478,7 @@ pub fn load_whole_resource(request: RequestInit,
-> Result<(Metadata, Vec<u8>), NetworkError> {
let (action_sender, action_receiver) = ipc::channel().unwrap();
core_resource_thread.send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender, None))).unwrap();

let mut buf = vec![];
let mut metadata = None;
Expand Down
2 changes: 1 addition & 1 deletion components/script/document_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl DocumentLoader {
request: RequestInit,
fetch_target: IpcSender<FetchResponseMsg>) {
self.resource_threads.sender().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(fetch_target))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(fetch_target, None))).unwrap();
}

/// Mark an in-progress network request complete.
Expand Down
4 changes: 2 additions & 2 deletions components/script/dom/eventsource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ impl EventSource {
listener.notify_fetch(message.to().unwrap());
}));
global.core_resource_thread().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender, None))).unwrap();
// Step 13
Ok(ev)
}
Expand Down Expand Up @@ -555,6 +555,6 @@ impl EventSourceTimeoutCallback {
}
// Step 5.4
global.core_resource_thread().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(self.action_sender))).unwrap();
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(self.action_sender, None))).unwrap();
}
}
20 changes: 17 additions & 3 deletions components/script/dom/xmlhttprequest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ pub struct XMLHttpRequest {
response_status: Cell<Result<(), ()>>,
referrer_url: Option<ServoUrl>,
referrer_policy: Option<ReferrerPolicy>,
#[ignore_malloc_size_of = "channels are hard"]
cancellation_chan: DomRefCell<Option<ipc::IpcSender<()>>>,
}

impl XMLHttpRequest {
Expand Down Expand Up @@ -198,6 +200,7 @@ impl XMLHttpRequest {
response_status: Cell::new(Ok(())),
referrer_url: referrer_url,
referrer_policy: referrer_policy,
cancellation_chan: DomRefCell::new(None),
}
}
pub fn new(global: &GlobalScope) -> DomRoot<XMLHttpRequest> {
Expand All @@ -218,7 +221,8 @@ impl XMLHttpRequest {
fn initiate_async_xhr(context: Arc<Mutex<XHRContext>>,
task_source: NetworkingTaskSource,
global: &GlobalScope,
init: RequestInit) {
init: RequestInit,
cancellation_chan: ipc::IpcReceiver<()>) {
impl FetchResponseListener for XHRContext {
fn process_request_body(&mut self) {
// todo
Expand Down Expand Up @@ -255,6 +259,7 @@ impl XMLHttpRequest {
}

let (action_sender, action_receiver) = ipc::channel().unwrap();

let listener = NetworkListener {
context: context,
task_source: task_source,
Expand All @@ -264,7 +269,7 @@ impl XMLHttpRequest {
listener.notify_fetch(message.to().unwrap());
}));
global.core_resource_thread().send(
Fetch(init, FetchChannels::ResponseMsg(action_sender))).unwrap();
Fetch(init, FetchChannels::ResponseMsg(action_sender, Some(cancellation_chan)))).unwrap();
}
}

Expand Down Expand Up @@ -1018,6 +1023,12 @@ impl XMLHttpRequest {
}

fn terminate_ongoing_fetch(&self) {
if let Some(ref cancel_chan) = *self.cancellation_chan.borrow() {
// The receiver will be destroyed if the request has already completed;
// so we throw away the error. Cancellation is a courtesy call,
// we don't actually care if the other side heard.
let _ = cancel_chan.send(());
}
let GenerationId(prev_id) = self.generation_id.get();
self.generation_id.set(GenerationId(prev_id + 1));
self.response_status.set(Ok(()));
Expand Down Expand Up @@ -1311,8 +1322,11 @@ impl XMLHttpRequest {
(global.networking_task_source(), None)
};

let (cancel_sender, cancel_receiver) = ipc::channel().unwrap();
*self.cancellation_chan.borrow_mut() = Some(cancel_sender);

XMLHttpRequest::initiate_async_xhr(context.clone(), task_source,
global, init);
global, init, cancel_receiver);

if let Some(script_port) = script_port {
loop {
Expand Down
2 changes: 1 addition & 1 deletion components/script/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn Fetch(global: &GlobalScope, input: RequestInfo, init: RootedTraceableBox<
listener.notify_fetch(message.to().unwrap());
}));
core_resource_thread.send(
NetTraitsFetch(request_init, FetchChannels::ResponseMsg(action_sender))).unwrap();
NetTraitsFetch(request_init, FetchChannels::ResponseMsg(action_sender, None))).unwrap();

promise
}
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/net/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use hyper_openssl;
use msg::constellation_msg::TEST_PIPELINE_ID;
use net::connector::create_ssl_client;
use net::fetch::cors_cache::CorsCache;
use net::fetch::methods::FetchContext;
use net::fetch::methods::{CancellationListener, FetchContext};
use net::filemanager_thread::FileManager;
use net::hsts::HstsEntry;
use net::test::HttpState;
Expand Down Expand Up @@ -538,6 +538,7 @@ fn test_fetch_with_hsts() {
user_agent: DEFAULT_USER_AGENT.into(),
devtools_chan: None,
filemanager: FileManager::new(),
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
};

{
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/net/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ use devtools_traits::DevtoolsControlMsg;
use hyper::server::{Handler, Listening, Server};
use net::connector::create_ssl_client;
use net::fetch::cors_cache::CorsCache;
use net::fetch::methods::{self, FetchContext};
use net::fetch::methods::{self, CancellationListener, FetchContext};
use net::filemanager_thread::FileManager;
use net::test::HttpState;
use net_traits::FetchTaskTarget;
use net_traits::request::Request;
use net_traits::response::Response;
use servo_config::resource_files::resources_dir_path;
use servo_url::ServoUrl;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, channel};

const DEFAULT_USER_AGENT: &'static str = "Such Browser. Very Layout. Wow.";
Expand All @@ -61,6 +61,7 @@ fn new_fetch_context(dc: Option<Sender<DevtoolsControlMsg>>) -> FetchContext {
user_agent: DEFAULT_USER_AGENT.into(),
devtools_chan: dc,
filemanager: FileManager::new(),
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
}
}
impl FetchTaskTarget for FetchResponseCollector {
Expand Down

0 comments on commit a82585d

Please sign in to comment.