Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ metadata/update/DNS/TLS inspection modes, and executes requests via `src/http`.
- Image rendering defaults (`auto`) use built-in Rust decoders only; external adapters (`vips`, `magick`, `ffmpeg`) require `--image external` or `image = external` and run with bounded stdin/stdout/stderr and timeout handling.
- Response compression negotiation is controlled by `--compress auto|br|gzip|zstd|off` or `compress = ...`; `brotli` is accepted as an alias for `br`, `auto` requests and decodes gzip/brotli/zstd, single-algorithm modes only request/decode that algorithm, and `off` leaves compressed bodies untouched.
- Formatted SSE responses stream incrementally to stdout with terminal color when enabled, rendering events as `event:`/`data:` blocks while formatting JSON data. Auto-compressed SSE responses are retried without `Accept-Encoding` so intermediaries do not buffer events; request timeouts from flags, curl commands, or config remain enforced.
- Formatted NDJSON responses stream incrementally to stdout when formatting is enabled, splitting decoded bytes on newlines, formatting each record with the JSON-line formatter, and flushing after each record.
- Digest authentication retries drain oversized 401 challenge bodies with a fixed bound; on Windows, and for responses whose advertised body exceeds the discard bound, the authenticated retry is sent before the partially drained challenge response is dropped so the local TCP abort from abandoning the first response cannot poison the follow-up request.
- `--sort-headers` or `sort-headers = true` sorts displayed request/response headers alphabetically by name in verbose output without changing the actual request header order.
- Default HTTP requests send `Accept: application/json, */*;q=0.5`, preferring JSON while allowing any other response type as a lower-priority fallback.
Expand Down
122 changes: 122 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,29 @@ async fn finish_response(
code,
));
}
if should_stream_formatted_ndjson_stdout(cli, &response_headers, stdout_is_terminal) {
let use_color = core::color_enabled(cli.color.as_deref(), stdout_is_terminal);
let streamed = stream_response_to_formatted_ndjson_stdout(
response,
response_headers.clone(),
compression,
cli.copy,
use_color,
)
.await?;
handle_optional_clipboard_outcome(cli, streamed.clipboard);
let body_duration =
body_duration_from_len(method_is_head, streamed.bytes_written, body_start);
print_timing(cli, response_timing, body_duration);

let code = exit_code(status.as_u16(), cli.ignore_status);
return Ok(check_grpc_status(
cli,
&response_headers,
&streamed.trailers,
code,
));
}
if should_stream_formatted_grpc_stdout(cli, &response_headers, stdout_is_terminal) {
let streamed = stream_response_to_formatted_grpc_stdout(
response,
Expand Down Expand Up @@ -1019,6 +1042,15 @@ fn should_stream_formatted_sse_stdout(
&& format_enabled(cli.format.as_deref(), stdout_is_terminal)
}

fn should_stream_formatted_ndjson_stdout(
cli: &Cli,
headers: &HeaderMap,
stdout_is_terminal: bool,
) -> bool {
response_header_content_type(headers) == ContentType::Ndjson
&& format_enabled(cli.format.as_deref(), stdout_is_terminal)
}

fn should_stream_formatted_grpc_stdout(
cli: &Cli,
headers: &HeaderMap,
Expand Down Expand Up @@ -1191,6 +1223,75 @@ async fn stream_response_to_formatted_sse_stdout(
}
}

async fn stream_response_to_formatted_ndjson_stdout(
response: Response,
response_headers: HeaderMap,
compression: CompressionMode,
copy: bool,
use_color: bool,
) -> Result<StreamedOutput, FetchError> {
let (reader, trailers) = async_response_reader(response);
let mut reader = decoded_async_response_reader(reader, compression, &response_headers)?;
let mut stdout = tokio::io::stdout();
let mut capture = copy.then(clipboard::Capture::default);
let mut pending = Vec::new();
let mut buf = vec![0; 16 * 1024];
let mut bytes_read = 0i64;

loop {
let n = reader.read(&mut buf).await?;
if n == 0 {
if !pending.is_empty() {
write_formatted_ndjson_record(&mut stdout, &pending, use_color, false).await?;
}
stdout.flush().await?;
let clipboard = capture.map(clipboard::Capture::copy);
let trailers = trailers
.lock()
.map(|trailers| trailers.clone())
.unwrap_or_default();
return Ok(StreamedOutput {
trailers,
bytes_written: bytes_read,
clipboard,
});
}

if let Some(capture) = capture.as_mut() {
capture.push(&buf[..n]);
}
bytes_read = bytes_read.saturating_add(i64::try_from(n).unwrap_or(i64::MAX));
pending.extend_from_slice(&buf[..n]);
while let Some(newline) = pending.iter().position(|byte| *byte == b'\n') {
let mut record = pending.drain(..=newline).collect::<Vec<_>>();
record.pop();
write_formatted_ndjson_record(&mut stdout, &record, use_color, true).await?;
stdout.flush().await?;
}
}
}

async fn write_formatted_ndjson_record(
stdout: &mut tokio::io::Stdout,
record: &[u8],
use_color: bool,
terminated: bool,
) -> Result<(), FetchError> {
if record.iter().all(u8::is_ascii_whitespace) {
return Ok(());
}
match json::format_json_line(record, use_color) {
Ok(formatted) => stdout.write_all(&formatted).await?,
Err(_) => {
stdout.write_all(record).await?;
if terminated {
stdout.write_all(b"\n").await?;
}
}
}
Ok(())
}

async fn stream_response_to_formatted_grpc_stdout(
response: Response,
response_headers: HeaderMap,
Expand Down Expand Up @@ -3867,6 +3968,27 @@ mod tests {
assert!(!should_stream_formatted_sse_stdout(&cli, &headers, true));
}

#[test]
fn formatted_ndjson_uses_dedicated_streaming_path() {
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
HeaderValue::from_static("application/x-ndjson"),
);

let cli = Cli::try_parse_from(["fetch", "https://example.com"]).unwrap();
assert!(!should_stream_formatted_ndjson_stdout(
&cli, &headers, false
));
assert!(should_stream_formatted_ndjson_stdout(&cli, &headers, true));

let cli = Cli::try_parse_from(["fetch", "--format", "on", "https://example.com"]).unwrap();
assert!(should_stream_formatted_ndjson_stdout(&cli, &headers, false));

let cli = Cli::try_parse_from(["fetch", "--format", "off", "https://example.com"]).unwrap();
assert!(!should_stream_formatted_ndjson_stdout(&cli, &headers, true));
}

#[test]
fn charset_decoder_matches_go_noop_and_known_charset_policy() {
for charset in [
Expand Down
84 changes: 84 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3818,6 +3818,90 @@ fn chunked_ndjson_formats_records_split_across_chunks() {
assert_eq!(res.stdout, "{ \"a\": 1 }\n{ \"b\": 2 }\n");
}

#[cfg(unix)]
#[test]
fn formatted_ndjson_outputs_records_before_stream_ends() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind streaming ndjson server");
let url = format!("http://{}", listener.local_addr().expect("local addr"));
let (close_tx, close_rx) = mpsc::channel();
let join = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept streaming ndjson request");
let reader_stream = stream.try_clone().expect("clone streaming ndjson stream");
let mut reader = BufReader::new(reader_stream);
let _ = read_request(&mut reader).expect("read streaming ndjson request");
write!(
stream,
"HTTP/1.1 200 OK\r\nContent-Type: application/x-ndjson\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n"
)
.unwrap();
let first = br#"{"event":"started","n":1}
"#;
write!(stream, "{:x}\r\n", first.len()).unwrap();
stream.write_all(first).unwrap();
stream.write_all(b"\r\n").unwrap();
stream.flush().unwrap();

let _ = close_rx.recv_timeout(Duration::from_secs(5));
let second = br#"{"event":"finished","n":2}
"#;
write!(stream, "{:x}\r\n", second.len()).unwrap();
stream.write_all(second).unwrap();
stream.write_all(b"\r\n0\r\n\r\n").unwrap();
let _ = stream.flush();
let _ = stream.shutdown(Shutdown::Both);
});

let pty = open_pty(24, 80, 800, 480);
let mut cmd = Command::new(fetch_bin());
cmd.args([
url.as_str(),
"--format",
"on",
"--color",
"off",
"--pager",
"off",
]);
cmd.env("TERM", "xterm-256color");
cmd.env("HTTP_PROXY", "");
cmd.env("HTTPS_PROXY", "");
cmd.env("ALL_PROXY", "");
cmd.env("NO_PROXY", "*");
configure_pty_child(&mut cmd, &pty.slave);
let mut child = cmd.spawn().expect("spawn streaming ndjson fetch under PTY");
drop(pty.slave);
let capture = start_pty_capture(&pty.master);

capture.wait_for(r#"{ "event": "started", "n": 1 }"#, Duration::from_secs(5));
assert!(
wait_child(&mut child, Duration::from_millis(100)).is_none(),
"fetch exited before the NDJSON stream closed; PTY output:\n{}",
capture.output()
);
close_tx.send(()).unwrap();

let status = wait_child(&mut child, Duration::from_secs(5))
.unwrap_or_else(|| {
let _ = child.kill();
panic!(
"fetch did not exit after NDJSON stream closed; PTY output:\n{}",
capture.output()
)
})
.expect("wait streaming ndjson fetch");
assert!(
status.success(),
"fetch exited with {status}; PTY output:\n{}",
capture.output()
);
let output = capture.output();
assert!(output.contains(r#"{ "event": "started", "n": 1 }"#));
assert!(output.contains(r#"{ "event": "finished", "n": 2 }"#));
drop(pty.master);
capture.close();
join.join().unwrap();
}

#[cfg(unix)]
#[test]
fn raw_ndjson_outputs_chunks_before_stream_ends() {
Expand Down
Loading