From 882cfb351b1a3919d2d81415709d601897578f41 Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Tue, 26 May 2026 16:22:04 -0700 Subject: [PATCH] Stop flushing streamed output on every chunk --- src/http/mod.rs | 69 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/src/http/mod.rs b/src/http/mod.rs index 359f532..cb5b0fc 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -1314,7 +1314,7 @@ async fn copy_async_reader_to_writer( where W: AsyncWrite + Unpin, { - let mut buf = vec![0; 16 * 1024]; + let mut buf = vec![0; 64 * 1024]; let mut written = 0i64; loop { let n = reader.read(&mut buf).await?; @@ -1326,7 +1326,6 @@ where capture.push(&buf[..n]); } writer.write_all(&buf[..n]).await?; - writer.flush().await?; written = written.saturating_add(i64::try_from(n).unwrap_or(i64::MAX)); } } @@ -1346,7 +1345,6 @@ where capture.push(prefix); } writer.write_all(prefix).await?; - writer.flush().await?; written = i64::try_from(prefix.len()).unwrap_or(i64::MAX); } Ok(written.saturating_add(copy_async_reader_to_writer(reader, writer, capture).await?)) @@ -3210,6 +3208,37 @@ mod tests { }; use std::io::Write; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + use std::pin::Pin; + use std::task::{Context, Poll}; + + #[derive(Default)] + struct RecordingAsyncWriter { + bytes: Vec, + flushes: usize, + } + + impl AsyncWrite for RecordingAsyncWriter { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.bytes.extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + self.flushes += 1; + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } #[test] fn default_scheme_loopback_is_http() { @@ -3580,6 +3609,40 @@ mod tests { )); } + #[tokio::test] + async fn async_copy_flushes_once_after_streaming_body() { + let input = vec![b'a'; (64 * 1024) + 17]; + let mut reader: AsyncReadBox = Box::pin(std::io::Cursor::new(input.clone())); + let mut writer = RecordingAsyncWriter::default(); + + let written = copy_async_reader_to_writer(&mut reader, &mut writer, None) + .await + .unwrap(); + + assert_eq!(written, i64::try_from(input.len()).unwrap()); + assert_eq!(writer.bytes, input); + assert_eq!(writer.flushes, 1); + } + + #[tokio::test] + async fn async_copy_with_prefix_flushes_once_after_streaming_body() { + let prefix = b"first chunk"; + let body = vec![b'b'; (64 * 1024) + 17]; + let mut reader: AsyncReadBox = Box::pin(std::io::Cursor::new(body.clone())); + let mut writer = RecordingAsyncWriter::default(); + + let written = + copy_async_reader_to_writer_with_prefix(&mut reader, &mut writer, prefix, None) + .await + .unwrap(); + + let mut expected = prefix.to_vec(); + expected.extend_from_slice(&body); + assert_eq!(written, i64::try_from(expected.len()).unwrap()); + assert_eq!(writer.bytes, expected); + assert_eq!(writer.flushes, 1); + } + #[test] fn formatted_sse_uses_dedicated_streaming_path() { let mut headers = HeaderMap::new();