diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index c25ab10cbe6e..1aabb545354f 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -23,7 +23,8 @@ use net_traits::request::{ is_cors_safelisted_method, is_cors_safelisted_request_header, Origin, ResponseTainting, Window, }; use net_traits::request::{ - BodyChunkRequest, CredentialsMode, Destination, Referrer, Request, RequestMode, + BodyChunkRequest, BodyChunkResponse, CredentialsMode, Destination, Referrer, Request, + RequestMode, }; use net_traits::response::{Response, ResponseBody, ResponseType}; use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy, ResourceFetchTiming}; @@ -641,7 +642,10 @@ fn scheme_fetch( let (body_chan, body_port) = ipc::channel().unwrap(); let _ = stream.send(BodyChunkRequest::Connect(body_chan)); let _ = stream.send(BodyChunkRequest::Chunk); - body_port.recv().ok() + match body_port.recv().ok() { + Some(BodyChunkResponse::Chunk(bytes)) => Some(bytes), + _ => panic!("cert should be sent in a single chunk."), + } }); let data = data.as_ref().and_then(|b| { let idx = b.iter().position(|b| *b == b'&')?; diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs index a8dfaecc4221..33a6e4415e35 100644 --- a/components/net/http_loader.rs +++ b/components/net/http_loader.rs @@ -11,7 +11,7 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target} use crate::hsts::HstsList; use crate::http_cache::{CacheKey, HttpCache}; use crate::resource_thread::AuthCache; -use crossbeam_channel::{unbounded, Sender}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use devtools_traits::{ ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest, }; @@ -30,6 +30,7 @@ use http::header::{ CONTENT_TYPE, }; use http::{HeaderMap, Request as HyperRequest}; +use hyper::header::TRANSFER_ENCODING; use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode}; use hyper_serde::Serde; use ipc_channel::ipc::{self, IpcSender}; @@ -40,7 +41,8 @@ use net_traits::quality::{quality_to_value, Quality, QualityItem}; use net_traits::request::Origin::Origin as SpecificOrigin; use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header}; use net_traits::request::{ - BodyChunkRequest, RedirectMode, Referrer, Request, RequestBuilder, RequestMode, + BodyChunkRequest, BodyChunkResponse, RedirectMode, Referrer, Request, RequestBuilder, + RequestMode, }; use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin}; use net_traits::request::{ResponseTainting, ServiceWorkersMode}; @@ -61,7 +63,7 @@ use std::time::{Duration, SystemTime}; use time::{self, Tm}; use tokio::prelude::{future, Future, Sink, Stream}; use tokio::runtime::Runtime; -use tokio::sync::mpsc::channel; +use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender}; lazy_static! { pub static ref HANDLE: Mutex> = Mutex::new(Some(Runtime::new().unwrap())); @@ -405,16 +407,89 @@ fn auth_from_cache( } } +/// Messages from the IPC route to the fetch worker, +/// used to fill the body with bytes coming-in over IPC. +enum BodyChunk { + /// A chunk of bytes. + Chunk(Vec), + /// Body is done. + Done, +} + +/// The stream side of the body passed to hyper. +enum BodyStream { + /// A receiver that can be used in Body::wrap_stream, + /// for streaming the request over the network. + Chunked(TokioReceiver>), + /// A body whose bytes are buffered + /// and sent in one chunk over the network. + Buffered(Receiver), +} + +/// The sink side of the body passed to hyper, +/// used to enqueue chunks. +enum BodySink { + /// A Tokio sender used to feed chunks to the network stream. + Chunked(TokioSender>), + /// A Crossbeam sender used to send chunks to the fetch worker, + /// where they will be buffered + /// in order to ensure they are not streamed them over the network. + Buffered(Sender), +} + +impl BodySink { + pub fn transmit_bytes(&self, bytes: Vec) { + match self { + BodySink::Chunked(ref sender) => { + let sender = sender.clone(); + HANDLE + .lock() + .unwrap() + .as_mut() + .unwrap() + .spawn(sender.send(bytes).map(|_| ()).map_err(|_| ())); + }, + BodySink::Buffered(ref sender) => { + let _ = sender.send(BodyChunk::Chunk(bytes)); + }, + } + } + + pub fn close(&self) { + match self { + BodySink::Chunked(ref sender) => { + let mut sender = sender.clone(); + HANDLE + .lock() + .unwrap() + .as_mut() + .unwrap() + .spawn(future::lazy(move || { + if sender.close().is_err() { + warn!("Failed to close network request sink."); + } + Ok(()) + })); + }, + BodySink::Buffered(ref sender) => { + let _ = sender.send(BodyChunk::Done); + }, + } + } +} + fn obtain_response( client: &Client, url: &ServoUrl, method: &Method, - request_headers: &HeaderMap, + request_headers: &mut HeaderMap, body: Option>, + source_is_null: bool, pipeline_id: &Option, request_id: Option<&str>, is_xhr: bool, context: &FetchContext, + fetch_terminated: Sender, ) -> Box< dyn Future< Item = (HyperResponse, Option), @@ -435,12 +510,25 @@ fn obtain_response( .replace("}", "%7D"); let request = if let Some(chunk_requester) = body { - // TODO: If body is a stream, append `Transfer-Encoding`/`chunked`, - // see step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch + let (sink, stream) = if source_is_null { + // Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch + // TODO: this should not be set for HTTP/2(currently not supported?). + request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - let (body_chan, body_port) = ipc::channel().unwrap(); + let (sender, receiver) = channel(1); + (BodySink::Chunked(sender), BodyStream::Chunked(receiver)) + } else { + // Note: Hyper seems to already buffer bytes when the request appears not stream-able, + // see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104 + // + // However since this doesn't appear documented, and we're using an ancient version, + // for now we buffer manually to ensure we don't stream requests + // to servers that might not know how to handle them. + let (sender, receiver) = unbounded(); + (BodySink::Buffered(sender), BodyStream::Buffered(receiver)) + }; - let (sender, receiver) = channel(1); + let (body_chan, body_port) = ipc::channel().unwrap(); let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan)); @@ -453,32 +541,58 @@ fn obtain_response( ROUTER.add_route( body_port.to_opaque(), Box::new(move |message| { - let bytes: Vec = message.to().unwrap(); - let chunk_requester = chunk_requester.clone(); - let sender = sender.clone(); + let bytes: Vec = match message.to().unwrap() { + BodyChunkResponse::Chunk(bytes) => bytes, + BodyChunkResponse::Done => { + // Step 3, abort these parallel steps. + let _ = fetch_terminated.send(false); + sink.close(); + return; + }, + BodyChunkResponse::Error => { + // Step 4 and/or 5. + // TODO: differentiate between the two steps, + // where step 5 requires setting an `aborted` flag on the fetch. + let _ = fetch_terminated.send(true); + sink.close(); + return; + }, + }; devtools_bytes.lock().unwrap().append(&mut bytes.clone()); - HANDLE.lock().unwrap().as_mut().unwrap().spawn( - // Step 5.1.2.2 - // Transmit a chunk over the network(and blocking until this is done). - sender - .send(bytes) - .map(move |_| { - // Step 5.1.2.3 - // Request the next chunk. - let _ = chunk_requester.send(BodyChunkRequest::Chunk); - () - }) - .map_err(|_| ()), - ); + // Step 5.1.2.2, transmit chunk over the network, + // currently implemented by sending the bytes to the fetch worker. + sink.transmit_bytes(bytes); + + // Step 5.1.2.3 + // Request the next chunk. + let _ = chunk_requester.send(BodyChunkRequest::Chunk); }), ); + let body = match stream { + BodyStream::Chunked(receiver) => Body::wrap_stream(receiver), + BodyStream::Buffered(receiver) => { + // Accumulate bytes received over IPC into a vector. + let mut body = vec![]; + loop { + match receiver.recv() { + Ok(BodyChunk::Chunk(mut bytes)) => { + body.append(&mut bytes); + }, + Ok(BodyChunk::Done) => break, + Err(_) => warn!("Failed to read all chunks from request body."), + } + } + body.into() + }, + }; + HyperRequest::builder() .method(method) .uri(encoded_url) - .body(Body::wrap_stream(receiver)) + .body(body) } else { HyperRequest::builder() .method(method) @@ -1566,16 +1680,26 @@ fn http_network_fetch( // do not. Once we support other kinds of fetches we'll need to be more fine grained here // since things like image fetches are classified differently by devtools let is_xhr = request.destination == Destination::None; + + // The receiver will receive true if there has been an error streaming the request body. + let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded(); + let response_future = obtain_response( &context.state.client, &url, &request.method, - &request.headers, - request.body.as_mut().map(|body| body.take_stream()), + &mut request.headers, + request.body.as_ref().map(|body| body.take_stream()), + request + .body + .as_ref() + .map(|body| body.source_is_null()) + .unwrap_or(false), &request.pipeline_id, request_id.as_ref().map(Deref::deref), is_xhr, context, + fetch_terminated_sender, ); let pipeline_id = request.pipeline_id; @@ -1585,6 +1709,22 @@ fn http_network_fetch( Err(error) => return Response::network_error(error), }; + // Check if there was an error while streaming the request body. + // + // It's ok to block on the receiver, + // since we're already blocking on the response future above, + // so we can be sure that the request has already been processed, + // and a message is in the channel(or soon will be). + match fetch_terminated_receiver.recv() { + Ok(true) => { + return Response::network_error(NetworkError::Internal( + "Request body streaming failed.".into(), + )); + }, + Ok(false) => {}, + Err(_) => warn!("Failed to receive confirmation request was streamed without error."), + } + if log_enabled!(log::Level::Info) { info!("{:?} response for {}", res.version(), url); for header in res.headers().iter() { diff --git a/components/net/tests/http_loader.rs b/components/net/tests/http_loader.rs index 0e4c401d3dc3..48f18560d545 100644 --- a/components/net/tests/http_loader.rs +++ b/components/net/tests/http_loader.rs @@ -33,8 +33,8 @@ use net::http_loader::determine_request_referrer; use net::resource_thread::AuthCacheEntry; use net::test::replace_host_table; use net_traits::request::{ - BodyChunkRequest, BodySource, CredentialsMode, Destination, RequestBody, RequestBuilder, - RequestMode, + BodyChunkRequest, BodyChunkResponse, BodySource, CredentialsMode, Destination, RequestBody, + RequestBuilder, RequestMode, }; use net_traits::response::{HttpsState, ResponseBody}; use net_traits::{CookieSource, NetworkError, ReferrerPolicy}; @@ -108,7 +108,8 @@ fn create_request_body_with_content(content: Vec) -> RequestBody { Box::new(move |message| { let request = message.to().unwrap(); if let BodyChunkRequest::Connect(sender) = request { - let _ = sender.send(content.clone()); + let _ = sender.send(BodyChunkResponse::Chunk(content.clone())); + let _ = sender.send(BodyChunkResponse::Done); } }), ); diff --git a/components/net_traits/request.rs b/components/net_traits/request.rs index f80b82b0fca8..c15063c7a705 100644 --- a/components/net_traits/request.rs +++ b/components/net_traits/request.rs @@ -124,16 +124,33 @@ pub enum BodySource { } /// Messages used to implement +/// which are sent from script to net. +#[derive(Debug, Deserialize, Serialize)] +pub enum BodyChunkResponse { + /// A chunk of bytes. + Chunk(Vec), + /// The body is done. + Done, + /// There was an error streaming the body, + /// terminate fetch. + Error, +} + +/// Messages used to implement +/// which are sent from net to script +/// (with the exception of Done, which is sent from script to script). #[derive(Debug, Deserialize, Serialize)] pub enum BodyChunkRequest { /// Connect a fetch in `net`, with a stream of bytes from `script`. - Connect(IpcSender>), + Connect(IpcSender), /// Re-extract a new stream from the source, following a redirect. Extract(IpcReceiver), /// Ask for another chunk. Chunk, - /// Signal the stream is done. + /// Signal the stream is done(sent from script to script). Done, + /// Signal the stream has errored(sent from script to script). + Error, } /// The net component's view into @@ -173,7 +190,7 @@ impl RequestBody { } } - pub fn take_stream(&mut self) -> IpcSender { + pub fn take_stream(&self) -> IpcSender { self.chan.clone() } diff --git a/components/script/body.rs b/components/script/body.rs index ce7113f98076..b7436c3e5e07 100644 --- a/components/script/body.rs +++ b/components/script/body.rs @@ -40,7 +40,9 @@ use js::rust::wrappers::JS_ParseJSON; use js::rust::HandleValue; use js::typedarray::{ArrayBuffer, CreateWith}; use mime::{self, Mime}; -use net_traits::request::{BodyChunkRequest, BodySource as NetBodySource, RequestBody}; +use net_traits::request::{ + BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody, +}; use script_traits::serializable::BlobImpl; use std::ptr; use std::rc::Rc; @@ -49,7 +51,7 @@ use url::form_urlencoded; /// The Dom object, or ReadableStream, that is the source of a body. /// -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum BodySource { /// A ReadableStream comes with a null-source. Null, @@ -59,6 +61,14 @@ pub enum BodySource { Object, } +/// The reason to stop reading from the body. +enum StopReading { + /// The stream has errored. + Error, + /// The stream is done. + Done, +} + /// The IPC route handler /// for . /// This route runs in the script process, @@ -69,7 +79,7 @@ struct TransmitBodyConnectHandler { stream: Trusted, task_source: NetworkingTaskSource, canceller: TaskCanceller, - bytes_sender: Option>>, + bytes_sender: Option>, control_sender: IpcSender, in_memory: Option>, in_memory_done: bool, @@ -123,7 +133,14 @@ impl TransmitBodyConnectHandler { BodyChunkRequest::Chunk => body_handler.transmit_source(), // Note: this is actually sent from this process // by the TransmitBodyPromiseHandler when reading stops. - BodyChunkRequest::Done => body_handler.stop_reading(), + BodyChunkRequest::Done => { + body_handler.stop_reading(StopReading::Done); + }, + // Note: this is actually sent from this process + // by the TransmitBodyPromiseHandler when the stream errors. + BodyChunkRequest::Error => { + body_handler.stop_reading(StopReading::Error); + }, } }), ); @@ -138,7 +155,7 @@ impl TransmitBodyConnectHandler { fn transmit_source(&mut self) { if self.in_memory_done { // Step 5.1.3 - self.stop_reading(); + self.stop_reading(StopReading::Done); return; } @@ -153,29 +170,58 @@ impl TransmitBodyConnectHandler { .bytes_sender .as_ref() .expect("No bytes sender to transmit source.") - .send(bytes.clone()); + .send(BodyChunkResponse::Chunk(bytes.clone())); return; } warn!("Re-directs for file-based Blobs not supported yet."); } /// Take the IPC sender sent by `net`, so we can send body chunks with it. - fn start_reading(&mut self, sender: IpcSender>) { + /// Also the entry point to + fn start_reading(&mut self, sender: IpcSender) { self.bytes_sender = Some(sender); + + // If we're using an actual ReadableStream, acquire a reader for it. + if self.source == BodySource::Null { + let stream = self.stream.clone(); + let _ = self.task_source.queue_with_canceller( + task!(start_reading_request_body_stream: move || { + // Step 1, Let body be request’s body. + let rooted_stream = stream.root(); + + // TODO: Step 2, If body is null. + + // Step 3, get a reader for stream. + rooted_stream.start_reading().expect("Couldn't acquire a reader for the body stream."); + + // Note: this algorithm continues when the first chunk is requested by `net`. + }), + &self.canceller, + ); + } } /// Drop the IPC sender sent by `net` - fn stop_reading(&mut self) { - // Note: this should close the corresponding receiver, - // and terminate the request stream in `net`. - self.bytes_sender = None; + fn stop_reading(&mut self, reason: StopReading) { + let bytes_sender = self + .bytes_sender + .take() + .expect("Stop reading called multiple times on TransmitBodyConnectHandler."); + match reason { + StopReading::Error => { + let _ = bytes_sender.send(BodyChunkResponse::Error); + }, + StopReading::Done => { + let _ = bytes_sender.send(BodyChunkResponse::Done); + }, + } } - /// The entry point to + /// Step 4 and following of fn transmit_body_chunk(&mut self) { if self.in_memory_done { // Step 5.1.3 - self.stop_reading(); + self.stop_reading(StopReading::Done); return; } @@ -188,7 +234,7 @@ impl TransmitBodyConnectHandler { // In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey. if let Some(bytes) = self.in_memory.clone() { - let _ = bytes_sender.send(bytes); + let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes)); // Mark this body as `done` so that we can stop reading in the next tick, // matching the behavior of the promise-based flow self.in_memory_done = true; @@ -197,27 +243,9 @@ impl TransmitBodyConnectHandler { let _ = self.task_source.queue_with_canceller( task!(setup_native_body_promise_handler: move || { - // Step 1, Let body be request’s body. - // - // TODO: We need the handle the body null case, - // here assuming body is something and we have the corresponding stream. let rooted_stream = stream.root(); let global = rooted_stream.global(); - // TODO: Step 2, If body is null, - // then queue a fetch task on request to process request end-of-body - // for request and abort these steps. - - // TODO: queuing those "process request ..." tasks means we also need a handle on Request here. - - // Step 3, get a reader for stream. - if rooted_stream.start_reading().is_err() { - // Note: this can happen if script starts consuming request body - // before fetch starts transmitting it. - // Not in the spec. - return; - } - // Step 4, the result of reading a chunk from body’s stream with reader. let promise = rooted_stream.read_a_chunk(); @@ -225,14 +253,19 @@ impl TransmitBodyConnectHandler { // are a combination of the promise native handler here, // and the corresponding IPC route in `component::net::http_loader`. let promise_handler = Box::new(TransmitBodyPromiseHandler { - bytes_sender, + bytes_sender: bytes_sender.clone(), stream: rooted_stream.clone(), - control_sender, + control_sender: control_sender.clone(), }); - let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {stream: rooted_stream}); + let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler { + bytes_sender, + stream: rooted_stream, + control_sender, + }); - let handler = PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler)); + let handler = + PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler)); let realm = enter_realm(&*global); let comp = InRealm::Entered(&realm); @@ -248,7 +281,7 @@ impl TransmitBodyConnectHandler { #[derive(Clone, JSTraceable, MallocSizeOf)] struct TransmitBodyPromiseHandler { #[ignore_malloc_size_of = "Channels are hard"] - bytes_sender: IpcSender>, + bytes_sender: IpcSender, stream: DomRoot, #[ignore_malloc_size_of = "Channels are hard"] control_sender: IpcSender, @@ -278,8 +311,7 @@ impl Callback for TransmitBodyPromiseHandler { Ok(chunk) => chunk, Err(_) => { // Step 5.5, the "otherwise" steps. - // TODO: terminate fetch. - let _ = self.control_sender.send(BodyChunkRequest::Done); + let _ = self.control_sender.send(BodyChunkRequest::Error); return self.stream.stop_reading(); }, }; @@ -287,7 +319,7 @@ impl Callback for TransmitBodyPromiseHandler { // Step 5.1 and 5.2, transmit chunk. // Send the chunk to the body transmitter in net::http_loader::obtain_response. // TODO: queue a fetch task on request to process request body for request. - let _ = self.bytes_sender.send(chunk); + let _ = self.bytes_sender.send(BodyChunkResponse::Chunk(chunk)); } } @@ -295,14 +327,18 @@ impl Callback for TransmitBodyPromiseHandler { /// . #[derive(Clone, JSTraceable, MallocSizeOf)] struct TransmitBodyPromiseRejectionHandler { + #[ignore_malloc_size_of = "Channels are hard"] + bytes_sender: IpcSender, stream: DomRoot, + #[ignore_malloc_size_of = "Channels are hard"] + control_sender: IpcSender, } impl Callback for TransmitBodyPromiseRejectionHandler { /// fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) { // Step 5.4, the "rejection" steps. - // TODO: terminate fetch. + let _ = self.control_sender.send(BodyChunkRequest::Error); return self.stream.stop_reading(); } } @@ -376,7 +412,14 @@ impl ExtractedBody { BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(), // Note: this is actually sent from this process // by the TransmitBodyPromiseHandler when reading stops. - BodyChunkRequest::Done => body_handler.stop_reading(), + BodyChunkRequest::Done => { + body_handler.stop_reading(StopReading::Done); + }, + // Note: this is actually sent from this process + // by the TransmitBodyPromiseHandler when the stream errors. + BodyChunkRequest::Error => { + body_handler.stop_reading(StopReading::Error); + }, } }), ); diff --git a/tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini b/tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini index 40a2537c13c3..209ec09de2bc 100644 --- a/tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini +++ b/tests/wpt/metadata/fetch/api/basic/request-upload.any.js.ini @@ -1,41 +1,9 @@ [request-upload.any.html] - type: testharness [Fetch with POST with ReadableStream] expected: FAIL - [Fetch with POST with ReadableStream containing String] - expected: FAIL - - [Fetch with POST with ReadableStream containing null] - expected: FAIL - - [Fetch with POST with ReadableStream containing number] - expected: FAIL - - [Fetch with POST with ReadableStream containing ArrayBuffer] - expected: FAIL - - [Fetch with POST with ReadableStream containing Blob] - expected: FAIL - [request-upload.any.worker.html] - type: testharness [Fetch with POST with ReadableStream] expected: FAIL - [Fetch with POST with ReadableStream containing String] - expected: FAIL - - [Fetch with POST with ReadableStream containing null] - expected: FAIL - - [Fetch with POST with ReadableStream containing number] - expected: FAIL - - [Fetch with POST with ReadableStream containing ArrayBuffer] - expected: FAIL - - [Fetch with POST with ReadableStream containing Blob] - expected: FAIL -