diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs index 40d4ba41..0ed39ef6 100644 --- a/src/domain_fronter.rs +++ b/src/domain_fronter.rs @@ -1036,20 +1036,10 @@ where let mut out: Vec = Vec::new(); let mut tmp = [0u8; 16384]; loop { - while !buf.windows(2).any(|w| w == b"\r\n") { - let n = timeout(Duration::from_secs(20), stream.read(&mut tmp)).await - .map_err(|_| FronterError::Timeout)??; - if n == 0 { - return Ok(out); - } - buf.extend_from_slice(&tmp[..n]); - } - let idx = buf.windows(2).position(|w| w == b"\r\n").unwrap(); - let size_line_owned = std::str::from_utf8(&buf[..idx]) + let size_line_owned = std::str::from_utf8(&read_crlf_line(stream, &mut buf, &mut tmp).await?) .map_err(|_| FronterError::BadResponse("bad chunk size".into()))? .trim() .to_string(); - buf.drain(..idx + 2); if size_line_owned.is_empty() { continue; } @@ -1059,7 +1049,11 @@ where ) .map_err(|_| FronterError::BadResponse(format!("bad chunk size '{}'", size_line_owned)))?; if size == 0 { - break; + loop { + if read_crlf_line(stream, &mut buf, &mut tmp).await?.is_empty() { + return Ok(out); + } + } } while buf.len() < size + 2 { let n = timeout(Duration::from_secs(20), stream.read(&mut tmp)).await @@ -1073,7 +1067,31 @@ where out.extend_from_slice(&buf[..size]); buf.drain(..size + 2); } - Ok(out) +} + +async fn read_crlf_line( + stream: &mut S, + buf: &mut Vec, + tmp: &mut [u8], +) -> Result, FronterError> +where + S: tokio::io::AsyncRead + Unpin, +{ + loop { + if let Some(idx) = buf.windows(2).position(|w| w == b"\r\n") { + let line = buf[..idx].to_vec(); + buf.drain(..idx + 2); + return Ok(line); + } + let n = timeout(Duration::from_secs(20), stream.read(tmp)).await + .map_err(|_| FronterError::Timeout)??; + if n == 0 { + return Err(FronterError::BadResponse( + "connection closed mid-chunked response".into(), + )); + } + buf.extend_from_slice(&tmp[..n]); + } } fn decode_gzip(data: &[u8]) -> Result, std::io::Error> { @@ -1342,6 +1360,7 @@ impl ServerCertVerifier for NoVerify { #[cfg(test)] mod tests { use super::*; + use tokio::io::{duplex, AsyncWriteExt}; #[test] fn normalize_x_graphql_trims_after_variables() { @@ -1499,4 +1518,28 @@ mod tests { assert!(s.contains("Set-Cookie: a=1\r\n")); assert!(s.contains("Set-Cookie: b=2\r\n")); } + + #[tokio::test(flavor = "current_thread")] + async fn chunked_reader_consumes_final_crlf_and_trailers() { + let (mut client, mut server) = duplex(1024); + client + .write_all( + b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nHello\r\n0\r\nX-Test: 1\r\n\r\n", + ) + .await + .unwrap(); + + let (status, _headers, body) = read_http_response(&mut server).await.unwrap(); + assert_eq!(status, 200); + assert_eq!(body, b"Hello"); + + client + .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK") + .await + .unwrap(); + + let (status2, _headers2, body2) = read_http_response(&mut server).await.unwrap(); + assert_eq!(status2, 200); + assert_eq!(body2, b"OK"); + } }