Skip to content

Commit

Permalink
fix streaming request bodies, terminate fetch if the body stream errors
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Jun 16, 2020
1 parent 581ade5 commit 719b395
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 109 deletions.
8 changes: 6 additions & 2 deletions components/net/fetch/methods.rs
Expand Up @@ -23,7 +23,8 @@ use net_traits::request::{
is_cors_safelisted_method, is_cors_safelisted_request_header, Origin, ResponseTainting, Window,
};
use net_traits::request::{
BodyChunkRequest, CredentialsMode, Destination, Referrer, Request, RequestMode,
BodyChunkRequest, BodyChunkResponse, CredentialsMode, Destination, Referrer, Request,
RequestMode,
};
use net_traits::response::{Response, ResponseBody, ResponseType};
use net_traits::{FetchTaskTarget, NetworkError, ReferrerPolicy, ResourceFetchTiming};
Expand Down Expand Up @@ -641,7 +642,10 @@ fn scheme_fetch(
let (body_chan, body_port) = ipc::channel().unwrap();
let _ = stream.send(BodyChunkRequest::Connect(body_chan));
let _ = stream.send(BodyChunkRequest::Chunk);
body_port.recv().ok()
match body_port.recv().ok() {
Some(BodyChunkResponse::Chunk(bytes)) => Some(bytes),
_ => panic!("cert should be sent in a single chunk."),
}
});
let data = data.as_ref().and_then(|b| {
let idx = b.iter().position(|b| *b == b'&')?;
Expand Down
194 changes: 167 additions & 27 deletions components/net/http_loader.rs
Expand Up @@ -11,7 +11,7 @@ use crate::fetch::methods::{main_fetch, Data, DoneChannel, FetchContext, Target}
use crate::hsts::HstsList;
use crate::http_cache::{CacheKey, HttpCache};
use crate::resource_thread::AuthCache;
use crossbeam_channel::{unbounded, Sender};
use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::{
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
};
Expand All @@ -30,6 +30,7 @@ use http::header::{
CONTENT_TYPE,
};
use http::{HeaderMap, Request as HyperRequest};
use hyper::header::TRANSFER_ENCODING;
use hyper::{Body, Client, Method, Response as HyperResponse, StatusCode};
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcSender};
Expand All @@ -40,7 +41,8 @@ use net_traits::quality::{quality_to_value, Quality, QualityItem};
use net_traits::request::Origin::Origin as SpecificOrigin;
use net_traits::request::{is_cors_safelisted_method, is_cors_safelisted_request_header};
use net_traits::request::{
BodyChunkRequest, RedirectMode, Referrer, Request, RequestBuilder, RequestMode,
BodyChunkRequest, BodyChunkResponse, RedirectMode, Referrer, Request, RequestBuilder,
RequestMode,
};
use net_traits::request::{CacheMode, CredentialsMode, Destination, Origin};
use net_traits::request::{ResponseTainting, ServiceWorkersMode};
Expand All @@ -61,7 +63,7 @@ use std::time::{Duration, SystemTime};
use time::{self, Tm};
use tokio::prelude::{future, Future, Sink, Stream};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{channel, Receiver as TokioReceiver, Sender as TokioSender};

lazy_static! {
pub static ref HANDLE: Mutex<Option<Runtime>> = Mutex::new(Some(Runtime::new().unwrap()));
Expand Down Expand Up @@ -405,16 +407,89 @@ fn auth_from_cache(
}
}

/// Messages from the IPC route to the fetch worker,
/// used to fill the body with bytes coming-in over IPC.
enum BodyChunk {
/// A chunk of bytes.
Chunk(Vec<u8>),
/// Body is done.
Done,
}

/// The stream side of the body passed to hyper.
enum BodyStream {
/// A receiver that can be used in Body::wrap_stream,
/// for streaming the request over the network.
Chunked(TokioReceiver<Vec<u8>>),
/// A body whose bytes are buffered
/// and sent in one chunk over the network.
Buffered(Receiver<BodyChunk>),
}

/// The sink side of the body passed to hyper,
/// used to enqueue chunks.
enum BodySink {
/// A Tokio sender used to feed chunks to the network stream.
Chunked(TokioSender<Vec<u8>>),
/// A Crossbeam sender used to send chunks to the fetch worker,
/// where they will be buffered
/// in order to ensure they are not streamed them over the network.
Buffered(Sender<BodyChunk>),
}

impl BodySink {
pub fn transmit_bytes(&self, bytes: Vec<u8>) {
match self {
BodySink::Chunked(ref sender) => {
let sender = sender.clone();
HANDLE
.lock()
.unwrap()
.as_mut()
.unwrap()
.spawn(sender.send(bytes).map(|_| ()).map_err(|_| ()));
},
BodySink::Buffered(ref sender) => {
let _ = sender.send(BodyChunk::Chunk(bytes));
},
}
}

pub fn close(&self) {
match self {
BodySink::Chunked(ref sender) => {
let mut sender = sender.clone();
HANDLE
.lock()
.unwrap()
.as_mut()
.unwrap()
.spawn(future::lazy(move || {
if sender.close().is_err() {
warn!("Failed to close network request sink.");
}
Ok(())
}));
},
BodySink::Buffered(ref sender) => {
let _ = sender.send(BodyChunk::Done);
},
}
}
}

fn obtain_response(
client: &Client<Connector, Body>,
url: &ServoUrl,
method: &Method,
request_headers: &HeaderMap,
request_headers: &mut HeaderMap,
body: Option<IpcSender<BodyChunkRequest>>,
source_is_null: bool,
pipeline_id: &Option<PipelineId>,
request_id: Option<&str>,
is_xhr: bool,
context: &FetchContext,
fetch_terminated: Sender<bool>,
) -> Box<
dyn Future<
Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>),
Expand All @@ -435,12 +510,25 @@ fn obtain_response(
.replace("}", "%7D");

let request = if let Some(chunk_requester) = body {
// TODO: If body is a stream, append `Transfer-Encoding`/`chunked`,
// see step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
let (sink, stream) = if source_is_null {
// Step 4.2 of https://fetch.spec.whatwg.org/#concept-http-network-fetch
// TODO: this should not be set for HTTP/2(currently not supported?).
request_headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));

let (body_chan, body_port) = ipc::channel().unwrap();
let (sender, receiver) = channel(1);
(BodySink::Chunked(sender), BodyStream::Chunked(receiver))
} else {
// Note: Hyper seems to already buffer bytes when the request appears not stream-able,
// see https://github.com/hyperium/hyper/issues/2232#issuecomment-644322104
//
// However since this doesn't appear documented, and we're using an ancient version,
// for now we buffer manually to ensure we don't stream requests
// to servers that might not know how to handle them.
let (sender, receiver) = unbounded();
(BodySink::Buffered(sender), BodyStream::Buffered(receiver))
};

let (sender, receiver) = channel(1);
let (body_chan, body_port) = ipc::channel().unwrap();

let _ = chunk_requester.send(BodyChunkRequest::Connect(body_chan));

Expand All @@ -453,32 +541,58 @@ fn obtain_response(
ROUTER.add_route(
body_port.to_opaque(),
Box::new(move |message| {
let bytes: Vec<u8> = message.to().unwrap();
let chunk_requester = chunk_requester.clone();
let sender = sender.clone();
let bytes: Vec<u8> = match message.to().unwrap() {
BodyChunkResponse::Chunk(bytes) => bytes,
BodyChunkResponse::Done => {
// Step 3, abort these parallel steps.
let _ = fetch_terminated.send(false);
sink.close();
return;
},
BodyChunkResponse::Error => {
// Step 4 and/or 5.
// TODO: differentiate between the two steps,
// where step 5 requires setting an `aborted` flag on the fetch.
let _ = fetch_terminated.send(true);
sink.close();
return;
},
};

devtools_bytes.lock().unwrap().append(&mut bytes.clone());

HANDLE.lock().unwrap().as_mut().unwrap().spawn(
// Step 5.1.2.2
// Transmit a chunk over the network(and blocking until this is done).
sender
.send(bytes)
.map(move |_| {
// Step 5.1.2.3
// Request the next chunk.
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
()
})
.map_err(|_| ()),
);
// Step 5.1.2.2, transmit chunk over the network,
// currently implemented by sending the bytes to the fetch worker.
sink.transmit_bytes(bytes);

// Step 5.1.2.3
// Request the next chunk.
let _ = chunk_requester.send(BodyChunkRequest::Chunk);
}),
);

let body = match stream {
BodyStream::Chunked(receiver) => Body::wrap_stream(receiver),
BodyStream::Buffered(receiver) => {
// Accumulate bytes received over IPC into a vector.
let mut body = vec![];
loop {
match receiver.recv() {
Ok(BodyChunk::Chunk(mut bytes)) => {
body.append(&mut bytes);
},
Ok(BodyChunk::Done) => break,
Err(_) => warn!("Failed to read all chunks from request body."),
}
}
body.into()
},
};

HyperRequest::builder()
.method(method)
.uri(encoded_url)
.body(Body::wrap_stream(receiver))
.body(body)
} else {
HyperRequest::builder()
.method(method)
Expand Down Expand Up @@ -1566,16 +1680,26 @@ fn http_network_fetch(
// do not. Once we support other kinds of fetches we'll need to be more fine grained here
// since things like image fetches are classified differently by devtools
let is_xhr = request.destination == Destination::None;

// The receiver will receive true if there has been an error streaming the request body.
let (fetch_terminated_sender, fetch_terminated_receiver) = unbounded();

let response_future = obtain_response(
&context.state.client,
&url,
&request.method,
&request.headers,
request.body.as_mut().map(|body| body.take_stream()),
&mut request.headers,
request.body.as_ref().map(|body| body.take_stream()),
request
.body
.as_ref()
.map(|body| body.source_is_null())
.unwrap_or(false),
&request.pipeline_id,
request_id.as_ref().map(Deref::deref),
is_xhr,
context,
fetch_terminated_sender,
);

let pipeline_id = request.pipeline_id;
Expand All @@ -1585,6 +1709,22 @@ fn http_network_fetch(
Err(error) => return Response::network_error(error),
};

// Check if there was an error while streaming the request body.
//
// It's ok to block on the receiver,
// since we're already blocking on the response future above,
// so we can be sure that the request has already been processed,
// and a message is in the channel(or soon will be).
match fetch_terminated_receiver.recv() {
Ok(true) => {
return Response::network_error(NetworkError::Internal(
"Request body streaming failed.".into(),
));
},
Ok(false) => {},
Err(_) => warn!("Failed to receive confirmation request was streamed without error."),
}

if log_enabled!(log::Level::Info) {
info!("{:?} response for {}", res.version(), url);
for header in res.headers().iter() {
Expand Down
7 changes: 4 additions & 3 deletions components/net/tests/http_loader.rs
Expand Up @@ -33,8 +33,8 @@ use net::http_loader::determine_request_referrer;
use net::resource_thread::AuthCacheEntry;
use net::test::replace_host_table;
use net_traits::request::{
BodyChunkRequest, BodySource, CredentialsMode, Destination, RequestBody, RequestBuilder,
RequestMode,
BodyChunkRequest, BodyChunkResponse, BodySource, CredentialsMode, Destination, RequestBody,
RequestBuilder, RequestMode,
};
use net_traits::response::{HttpsState, ResponseBody};
use net_traits::{CookieSource, NetworkError, ReferrerPolicy};
Expand Down Expand Up @@ -108,7 +108,8 @@ fn create_request_body_with_content(content: Vec<u8>) -> RequestBody {
Box::new(move |message| {
let request = message.to().unwrap();
if let BodyChunkRequest::Connect(sender) = request {
let _ = sender.send(content.clone());
let _ = sender.send(BodyChunkResponse::Chunk(content.clone()));
let _ = sender.send(BodyChunkResponse::Done);
}
}),
);
Expand Down
23 changes: 20 additions & 3 deletions components/net_traits/request.rs
Expand Up @@ -124,16 +124,33 @@ pub enum BodySource {
}

/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
/// which are sent from script to net.
#[derive(Debug, Deserialize, Serialize)]
pub enum BodyChunkResponse {
/// A chunk of bytes.
Chunk(Vec<u8>),
/// The body is done.
Done,
/// There was an error streaming the body,
/// terminate fetch.
Error,
}

/// Messages used to implement <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
/// which are sent from net to script
/// (with the exception of Done, which is sent from script to script).
#[derive(Debug, Deserialize, Serialize)]
pub enum BodyChunkRequest {
/// Connect a fetch in `net`, with a stream of bytes from `script`.
Connect(IpcSender<Vec<u8>>),
Connect(IpcSender<BodyChunkResponse>),
/// Re-extract a new stream from the source, following a redirect.
Extract(IpcReceiver<BodyChunkRequest>),
/// Ask for another chunk.
Chunk,
/// Signal the stream is done.
/// Signal the stream is done(sent from script to script).
Done,
/// Signal the stream has errored(sent from script to script).
Error,
}

/// The net component's view into <https://fetch.spec.whatwg.org/#bodies>
Expand Down Expand Up @@ -173,7 +190,7 @@ impl RequestBody {
}
}

pub fn take_stream(&mut self) -> IpcSender<BodyChunkRequest> {
pub fn take_stream(&self) -> IpcSender<BodyChunkRequest> {
self.chan.clone()
}

Expand Down

0 comments on commit 719b395

Please sign in to comment.