diff --git a/AGENTS.md b/AGENTS.md index 73cb03e..8a3337b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -106,6 +106,8 @@ metadata/update/DNS/TLS inspection modes, and executes requests via `src/http`. - Output-file downloads keep `*.download` temp files behind a drop guard so cancellation paths such as Ctrl-C clean up partial files; Unix atomic installs also sync the parent directory after rename/link updates for stronger crash durability. - Image rendering defaults (`auto`) use built-in Rust decoders only; external adapters (`vips`, `magick`, `ffmpeg`) require `--image external` or `image = external` and run with bounded stdin/stdout/stderr and timeout handling. - Response compression negotiation is controlled by `--compress auto|br|gzip|zstd|off` or `compress = ...`; `brotli` is accepted as an alias for `br`, `auto` requests and decodes gzip/brotli/zstd, single-algorithm modes only request/decode that algorithm, and `off` leaves compressed bodies untouched. +- Formatted SSE responses stream incrementally to stdout with terminal color when enabled, rendering events as `event:`/`data:` blocks while formatting JSON data. Auto-compressed SSE responses are retried without `Accept-Encoding` so intermediaries do not buffer events; request timeouts from flags, curl commands, or config remain enforced. +- Digest authentication retries drain oversized 401 challenge bodies with a fixed bound; on Windows, the authenticated retry is sent before the partially drained challenge response is dropped so the local TCP abort from abandoning the first response cannot poison the follow-up request. - `--sort-headers` or `sort-headers = true` sorts displayed request/response headers alphabetically by name in verbose output without changing the actual request header order. - Default HTTP requests send `Accept: application/json, */*;q=0.5`, preferring JSON while allowing any other response type as a lower-priority fallback. - `--basic` and `--digest` credentials preserve exact bytes around the first colon; leading/trailing spaces in usernames or passwords are significant and are not trimmed after CLI or `--from-curl` parsing. diff --git a/docs/advanced-features.md b/docs/advanced-features.md index ef9bdff..0bf0e95 100644 --- a/docs/advanced-features.md +++ b/docs/advanced-features.md @@ -304,6 +304,10 @@ Compression modes: - `zstd` requests and decompresses zstd only - `off` sends no automatic `Accept-Encoding` header and leaves compressed response bodies untouched +For SSE (`text/event-stream`) responses in `auto` mode, `fetch` retries without +`Accept-Encoding` when the server replies with compressed content. This avoids +common buffering behavior that prevents events from appearing as they arrive. + Using `off` is useful when: - Testing compression behavior @@ -385,6 +389,11 @@ The timeout covers: - Connection establishment - TLS handshake - Request/response transfer +- Streamed response bodies such as SSE, NDJSON, and gRPC streams + +Timeouts from CLI flags, `--from-curl`, and configuration files are enforced for +streaming responses. Omit `--timeout` or use a larger value for long-lived event +streams. ### `--connect-timeout SECONDS` diff --git a/docs/cli-reference.md b/docs/cli-reference.md index db359c5..c3755cd 100644 --- a/docs/cli-reference.md +++ b/docs/cli-reference.md @@ -307,7 +307,9 @@ fetch --connect-timeout 5 --timeout 30 example.com ### `-t, --timeout SECONDS` -Request timeout in seconds. Accepts decimal values. +Request timeout in seconds. Accepts decimal values. The timeout covers the full +request, including response body streaming; it is also enforced for SSE, NDJSON, +and gRPC streams. ```sh fetch --timeout 30 example.com @@ -465,6 +467,10 @@ Control response compression negotiation. Values: `auto`, `br`/`brotli`, `gzip`, - `zstd` - request zstd compression only - `off` - disable automatic compression negotiation and decompression +In `auto` mode, compressed SSE (`text/event-stream`) responses are retried +without `Accept-Encoding` so streaming events can be delivered promptly instead +of being buffered by compression. + ```sh fetch --compress br example.com fetch --compress gzip example.com diff --git a/docs/configuration.md b/docs/configuration.md index 4d19338..a065c5c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -293,7 +293,9 @@ connect-timeout = 2.5 **Type**: Number (seconds) **Default**: None (no timeout) -Set a timeout for HTTP requests. Accepts decimal values. +Set a timeout for HTTP requests. Accepts decimal values. This timeout covers the +full request, including streamed response bodies such as SSE, NDJSON, and gRPC +streams. ```ini # 30 second timeout @@ -491,6 +493,9 @@ compress = zstd compress = off ``` +When `compress = auto`, compressed SSE (`text/event-stream`) responses are +retried without `Accept-Encoding` so events can be displayed as they arrive. + ### Session Options #### `session` diff --git a/docs/getting-started.md b/docs/getting-started.md index 75f3ea2..a7854e6 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -109,7 +109,7 @@ fetch httpbin.org/json | jq '.slideshow.title' - **YAML** - Syntax highlighted - **Images** - Rendered directly in supported terminals - **Protobuf / msgpack** - Decoded and displayed as JSON -- **SSE / NDJSON** - Streamed line-by-line +- **SSE / NDJSON** - Streamed as events or lines arrive See [Output Formatting](output-formatting.md) for details. diff --git a/docs/output-formatting.md b/docs/output-formatting.md index 71bd83d..16dff01 100644 --- a/docs/output-formatting.md +++ b/docs/output-formatting.md @@ -236,7 +236,9 @@ See [gRPC documentation](grpc.md) for schema-aware formatting. Features: - Streaming output as events arrive -- Event type and data parsing +- SSE-shaped `event:` and `data:` output +- JSON `data:` payloads are formatted and syntax-highlighted +- Request timeouts still apply to long-running event streams ```sh fetch example.com/events @@ -246,12 +248,17 @@ Output: ``` event: message -data: {"user": "john", "text": "Hello!"} +data: { "text": "Hello!", "user": "john" } event: message -data: {"user": "jane", "text": "Hi there!"} +data: { "text": "Hi there!", "user": "jane" } ``` +When color is enabled, `event` and `data` labels are highlighted, and JSON values +inside `data:` use the same syntax highlighting as JSON responses. In automatic +compression mode, compressed SSE responses are retried without `Accept-Encoding` +so proxies and servers are less likely to buffer events before delivery. + ### NDJSON / JSON Lines **Content-Types**: `application/x-ndjson`, `application/ndjson`, `application/x-jsonl`, `application/jsonl`, `application/x-jsonlines` diff --git a/src/format/sse.rs b/src/format/sse.rs index 8b65cda..35011dc 100644 --- a/src/format/sse.rs +++ b/src/format/sse.rs @@ -1,35 +1,137 @@ use serde_json::Value; -#[derive(Debug, Clone, PartialEq, Eq)] -struct Event { +use crate::core::{Sequence, write_styled_to_string}; + +pub fn format_event_stream(bytes: &[u8], color: bool) -> Result { + let input = std::str::from_utf8(bytes)?; + let mut formatter = EventStreamFormatter::new(color); + let mut out = String::new(); + formatter.push_str(input, &mut out); + formatter.finish(&mut out); + Ok(out) +} + +#[derive(Debug, Default)] +pub struct EventStreamFormatter { event_type: String, data: String, + line: String, + seen_first_line: bool, + pending_cr: bool, + color: bool, } -pub fn format_event_stream(bytes: &[u8]) -> Result { - let input = std::str::from_utf8(bytes)?; - let mut out = String::new(); - for (idx, event) in stream_events(input).into_iter().enumerate() { - if idx > 0 { +impl EventStreamFormatter { + pub fn new(color: bool) -> Self { + Self { + color, + ..Self::default() + } + } + + pub fn push_str(&mut self, input: &str, out: &mut String) { + for ch in input.chars() { + if self.pending_cr { + self.pending_cr = false; + if ch == '\n' { + continue; + } + } + + match ch { + '\n' => self.process_line(out), + '\r' => { + self.process_line(out); + self.pending_cr = true; + } + _ => self.line.push(ch), + } + } + } + + pub fn finish(&mut self, out: &mut String) { + if !self.line.is_empty() { + self.process_line(out); + } + if !self.data.is_empty() { + self.dispatch_event(out); + } + } + + fn process_line(&mut self, out: &mut String) { + if !self.seen_first_line { + if self.line.starts_with('\u{feff}') { + self.line.drain(..'\u{feff}'.len_utf8()); + } + self.seen_first_line = true; + } + + if self.line.is_empty() { + self.dispatch_event(out); + return; + } + + let (name, value) = self.line.split_once(':').unwrap_or((&self.line, "")); + if !name.is_empty() { + let value = value.strip_prefix(' ').unwrap_or(value); + match name { + "event" => { + self.event_type.clear(); + self.event_type.push_str(value); + } + "data" => { + self.data.push_str(value); + self.data.push('\n'); + } + "id" => {} + _ => {} + } + } + self.line.clear(); + } + + fn dispatch_event(&mut self, out: &mut String) { + let event_data = self.data.trim_end_matches('\n'); + if event_data.is_empty() { + self.event_type.clear(); + self.data.clear(); + self.line.clear(); + return; + } + + let event_type = if self.event_type.is_empty() { + "message" + } else { + &self.event_type + }; + write_sse_field(out, "event", event_type, self.color); + out.push('\n'); + let formatted_data = format_event_data(event_data, self.color); + for line in formatted_data.split('\n') { + write_sse_field(out, "data", line, self.color); out.push('\n'); } - out.push('['); - out.push_str(&event.event_type); - out.push_str("]\n"); - out.push_str(&format_event_data(&event.data)); out.push('\n'); + self.event_type.clear(); + self.data.clear(); + self.line.clear(); } - Ok(out) } -fn format_event_data(data: &str) -> String { +fn write_sse_field(out: &mut String, name: &str, value: &str, color: bool) { + write_styled_to_string(out, name, &[Sequence::Bold, Sequence::Cyan], color); + out.push_str(": "); + out.push_str(value); +} + +fn format_event_data(data: &str, color: bool) -> String { match serde_json::from_str::(data) { - Ok(value) => format_json_inline(&value), + Ok(value) => format_json_inline(&value, color), Err(_) => data.to_string(), } } -fn format_json_inline(value: &Value) -> String { +fn format_json_inline(value: &Value, color: bool) -> String { match value { Value::Array(values) => { if values.is_empty() { @@ -37,7 +139,7 @@ fn format_json_inline(value: &Value) -> String { } let values = values .iter() - .map(format_json_inline) + .map(|value| format_json_inline(value, color)) .collect::>() .join(", "); format!("[ {values} ]") @@ -49,121 +151,27 @@ fn format_json_inline(value: &Value) -> String { let values = map .iter() .map(|(key, value)| { - format!( - "{}: {}", - serde_json::to_string(key).expect("string key serializes"), - format_json_inline(value) - ) + let key = serde_json::to_string(key).expect("string key serializes"); + let mut styled_key = String::new(); + write_styled_to_string( + &mut styled_key, + &key, + &[Sequence::Blue, Sequence::Bold], + color, + ); + format!("{}: {}", styled_key, format_json_inline(value, color)) }) .collect::>() .join(", "); format!("{{ {values} }}") } - _ => serde_json::to_string(value).expect("JSON value serializes"), - } -} - -fn stream_events(input: &str) -> Vec { - let mut events = Vec::new(); - let mut event_type = String::new(); - let mut data = String::new(); - let mut seen_first_line = false; - - for mut line in Lines::new(input) { - if !seen_first_line { - line = line.strip_prefix('\u{feff}').unwrap_or(line); - seen_first_line = true; + Value::String(_) => { + let value = serde_json::to_string(value).expect("JSON value serializes"); + let mut out = String::new(); + write_styled_to_string(&mut out, &value, &[Sequence::Green], color); + out } - - if line.is_empty() { - dispatch_event(&mut events, &mut event_type, &mut data); - continue; - } - - let (name, value) = line.split_once(':').unwrap_or((line, "")); - if name.is_empty() { - continue; - } - let value = value.strip_prefix(' ').unwrap_or(value); - match name { - "event" => event_type = value.to_string(), - "data" => { - data.push_str(value); - data.push('\n'); - } - "id" => {} - _ => {} - } - } - - if !data.is_empty() { - dispatch_event(&mut events, &mut event_type, &mut data); - } - - events -} - -fn dispatch_event(events: &mut Vec, event_type: &mut String, data: &mut String) { - let event_data = data.trim_end_matches('\n').to_string(); - let event_name = std::mem::take(event_type); - data.clear(); - - if event_data.is_empty() { - return; - } - - events.push(Event { - event_type: if event_name.is_empty() { - "message".to_string() - } else { - event_name - }, - data: event_data, - }); -} - -struct Lines<'a> { - input: &'a str, -} - -impl<'a> Lines<'a> { - fn new(input: &'a str) -> Self { - Self { input } - } -} - -impl<'a> Iterator for Lines<'a> { - type Item = &'a str; - - fn next(&mut self) -> Option { - if self.input.is_empty() { - return None; - } - - for (idx, ch) in self.input.char_indices() { - match ch { - '\n' => { - let line = if idx > 0 && self.input.as_bytes()[idx - 1] == b'\r' { - &self.input[..idx - 1] - } else { - &self.input[..idx] - }; - self.input = &self.input[idx + ch.len_utf8()..]; - return Some(line); - } - '\r' => { - let line = &self.input[..idx]; - let rest = &self.input[idx + ch.len_utf8()..]; - self.input = rest.strip_prefix('\n').unwrap_or(rest); - return Some(line); - } - _ => {} - } - } - - let line = self.input; - self.input = ""; - Some(line) + _ => serde_json::to_string(value).expect("JSON value serializes"), } } @@ -173,35 +181,27 @@ mod tests { #[test] fn test_stream_events_eof_dispatches_final_event_without_blank_line() { - let got = stream_events("data: final\n"); - let want = vec![Event { - event_type: "message".to_string(), - data: "final".to_string(), - }]; + let got = format_event_stream(b"data: final\n", false).unwrap(); - assert_eq!(got, want); + assert_eq!(got, "event: message\ndata: final\n\n"); } #[test] fn test_stream_events_eof_does_not_duplicate_final_event_with_blank_line() { - let got = stream_events("data: final\n\n"); - let want = vec![Event { - event_type: "message".to_string(), - data: "final".to_string(), - }]; + let got = format_event_stream(b"data: final\n\n", false).unwrap(); - assert_eq!(got, want); + assert_eq!(got, "event: message\ndata: final\n\n"); } #[test] fn formats_event_stream_like_go_integration() { let input = b":comment\n\ndata:{\"key\":\"val\"}\n\nevent:ev1\ndata: this is my data\n\n"; - let got = format_event_stream(input).unwrap(); + let got = format_event_stream(input, false).unwrap(); assert_eq!( got, - "[message]\n{ \"key\": \"val\" }\n\n[ev1]\nthis is my data\n" + "event: message\ndata: { \"key\": \"val\" }\n\nevent: ev1\ndata: this is my data\n\n" ); } @@ -209,8 +209,36 @@ mod tests { fn supports_crlf_and_bom() { let input = "\u{feff}event: greeting\r\ndata: hello\r\n\r\n"; - let got = format_event_stream(input.as_bytes()).unwrap(); + let got = format_event_stream(input.as_bytes(), false).unwrap(); + + assert_eq!(got, "event: greeting\ndata: hello\n\n"); + } + + #[test] + fn formatter_streams_events_across_chunks() { + let mut formatter = EventStreamFormatter::new(false); + let mut got = String::new(); + + formatter.push_str("data: {\"a\"", &mut got); + assert!(got.is_empty()); + formatter.push_str(":1}\r\n", &mut got); + assert!(got.is_empty()); + formatter.push_str("\nevent: done\ndata: two\n\n", &mut got); + formatter.finish(&mut got); + + assert_eq!( + got, + "event: message\ndata: { \"a\": 1 }\n\nevent: done\ndata: two\n\n" + ); + } + + #[test] + fn colors_sse_fields_and_json_data() { + let got = format_event_stream(b"event: ev1\ndata: {\"key\":\"val\"}\n\n", true).unwrap(); - assert_eq!(got, "[greeting]\nhello\n"); + assert_eq!( + got, + "\x1b[1m\x1b[36mevent\x1b[0m: ev1\n\x1b[1m\x1b[36mdata\x1b[0m: { \x1b[34m\x1b[1m\"key\"\x1b[0m: \x1b[32m\"val\"\x1b[0m }\n\n" + ); } } diff --git a/src/http/mod.rs b/src/http/mod.rs index 3b76e56..f0b3c78 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -155,7 +155,7 @@ pub async fn execute(cli: &Cli) -> Result { apply_headers(&mut headers, &cli.headers)?; } apply_ranges(&mut headers, &cli.ranges); - let compression = apply_accept_encoding(&mut headers, cli, &method); + let mut compression = apply_accept_encoding(&mut headers, cli, &method); let mut body = request_body(cli)?; apply_body_content_type(&mut headers, &body); if cli.edit { @@ -315,6 +315,14 @@ pub async fn execute(cli: &Cli) -> Result { ) .await?; let status = response.status(); + let retry_sse_uncompressed = + should_retry_sse_without_compression(&response, compression); + if retry_sse_uncompressed { + ensure_request_body_replayable(&request_body, "retry SSE without compression")?; + headers.remove(ACCEPT_ENCODING); + compression = CompressionMode::Off; + continue; + } if attempt < retry_count && should_retry_status(status) { ensure_request_body_replayable(&request_body, "retry")?; let delay = @@ -761,6 +769,29 @@ async fn finish_response( } else { let body_start = Instant::now(); let stdout_is_terminal = std::io::stdout().is_terminal(); + if should_stream_formatted_sse_stdout(cli, &response_headers, stdout_is_terminal) { + let use_color = core::color_enabled(cli.color.as_deref(), stdout_is_terminal); + let streamed = stream_response_to_formatted_sse_stdout( + response, + response_headers.clone(), + compression, + cli.copy, + use_color, + ) + .await?; + handle_optional_clipboard_outcome(cli, streamed.clipboard); + let body_duration = + body_duration_from_len(method_is_head, streamed.bytes_written, body_start); + print_timing(cli, response_timing, body_duration); + + let code = exit_code(status.as_u16(), cli.ignore_status); + return Ok(check_grpc_status( + cli, + &response_headers, + &streamed.trailers, + code, + )); + } if let Some(target) = stdout_stream_target(cli, &response_headers, stdout_is_terminal) { let streamed = stream_response_to_stdout( response, @@ -905,6 +936,25 @@ fn response_header_content_type(headers: &HeaderMap) -> ContentType { content_type::get_content_type(content_type).0 } +fn should_stream_formatted_sse_stdout( + cli: &Cli, + headers: &HeaderMap, + stdout_is_terminal: bool, +) -> bool { + response_header_content_type(headers) == ContentType::Sse + && format_enabled(cli.format.as_deref(), stdout_is_terminal) +} + +fn should_retry_sse_without_compression(response: &Response, compression: CompressionMode) -> bool { + compression == CompressionMode::Auto + && response_header_content_type(response.headers()) == ContentType::Sse + && content_encoding_decoders(response.headers(), compression).is_some_and(|decoders| { + decoders + .iter() + .any(|encoding| encoding.as_str() != "aws-chunked") + }) +} + async fn read_decoded_response_body_limited( response: Response, response_headers: HeaderMap, @@ -1002,6 +1052,94 @@ async fn stream_response_to_stdout( }) } +async fn stream_response_to_formatted_sse_stdout( + response: Response, + response_headers: HeaderMap, + compression: CompressionMode, + copy: bool, + use_color: bool, +) -> Result { + let (reader, trailers) = async_response_reader(response); + let mut reader = decoded_async_response_reader(reader, compression, &response_headers)?; + let mut stdout = tokio::io::stdout(); + let mut capture = copy.then(clipboard::Capture::default); + let mut formatter = sse::EventStreamFormatter::new(use_color); + let mut pending = Vec::new(); + let mut buf = vec![0; 16 * 1024]; + let mut bytes_read = 0i64; + + loop { + let n = reader.read(&mut buf).await?; + if n == 0 { + let formatted = + finish_sse_stream_formatter(&mut pending, &mut formatter).map_err(|err| { + FetchError::Message(format!("invalid UTF-8 in event stream: {err}")) + })?; + if !formatted.is_empty() { + stdout.write_all(formatted.as_bytes()).await?; + } + stdout.flush().await?; + let clipboard = capture.map(clipboard::Capture::copy); + let trailers = trailers + .lock() + .map(|trailers| trailers.clone()) + .unwrap_or_default(); + return Ok(StreamedOutput { + trailers, + bytes_written: bytes_read, + clipboard, + }); + } + + if let Some(capture) = capture.as_mut() { + capture.push(&buf[..n]); + } + bytes_read = bytes_read.saturating_add(i64::try_from(n).unwrap_or(i64::MAX)); + pending.extend_from_slice(&buf[..n]); + let formatted = push_sse_stream_bytes(&mut pending, &mut formatter) + .map_err(|err| FetchError::Message(format!("invalid UTF-8 in event stream: {err}")))?; + if !formatted.is_empty() { + stdout.write_all(formatted.as_bytes()).await?; + stdout.flush().await?; + } + } +} + +fn push_sse_stream_bytes( + pending: &mut Vec, + formatter: &mut sse::EventStreamFormatter, +) -> Result { + let mut out = String::new(); + loop { + match std::str::from_utf8(pending) { + Ok(input) => { + formatter.push_str(input, &mut out); + pending.clear(); + return Ok(out); + } + Err(err) if err.error_len().is_none() => { + let valid_up_to = err.valid_up_to(); + if valid_up_to == 0 { + return Ok(out); + } + let input = std::str::from_utf8(&pending[..valid_up_to])?.to_string(); + formatter.push_str(&input, &mut out); + pending.drain(..valid_up_to); + } + Err(err) => return Err(err), + } + } +} + +fn finish_sse_stream_formatter( + pending: &mut Vec, + formatter: &mut sse::EventStreamFormatter, +) -> Result { + let mut out = push_sse_stream_bytes(pending, formatter)?; + formatter.finish(&mut out); + Ok(out) +} + async fn stream_response_to_output( response: Response, response_headers: HeaderMap, @@ -1071,7 +1209,9 @@ fn async_response_reader(response: Response) -> (AsyncReadBox, Arc, std::io::Error>(None); }; - let frame = frame.map_err(|err| std::io::Error::other(err.to_string()))?; + let frame = frame.map_err(|err| { + std::io::Error::other(reqwest_response_body_error_message(&err)) + })?; match frame.into_data() { Ok(data) => { if data.is_empty() { @@ -1113,6 +1253,7 @@ where capture.push(&buf[..n]); } writer.write_all(&buf[..n]).await?; + writer.flush().await?; written = written.saturating_add(i64::try_from(n).unwrap_or(i64::MAX)); } } @@ -1336,8 +1477,7 @@ async fn apply_digest_challenge( Err(_) => return Ok(response), }; - drain_response_body_bounded(response).await; - build_request( + let retry_request = build_request( context.client, challenged_method, challenged_url, @@ -1345,10 +1485,21 @@ async fn apply_digest_challenge( challenged_body, context.cli, RequestAuthorization::Digest(&auth), - )? - .send() - .await - .map_err(Into::into) + )?; + + #[cfg(windows)] + { + let retry_response: Result = + retry_request.send().await.map_err(Into::into); + drain_response_body_bounded(response).await; + retry_response + } + + #[cfg(not(windows))] + { + drain_response_body_bounded(response).await; + retry_request.send().await.map_err(Into::into) + } } fn digest_challenged_request( @@ -1824,7 +1975,7 @@ fn format_stdout_bytes_with_terminal( .map_err(|err| FetchError::Message(err.to_string())) } } - ContentType::Sse => sse::format_event_stream(&bytes) + ContentType::Sse => sse::format_event_stream(&bytes, use_color) .map(|formatted| formatted.into_bytes()) .map_err(|err| FetchError::Message(err.to_string())), _ => Ok(bytes.to_vec()), @@ -2104,6 +2255,32 @@ fn reqwest_request_error_message(err: &reqwest::Error) -> String { message } +fn reqwest_response_body_error_message(err: &reqwest::Error) -> String { + let mut details = Vec::new(); + let mut source = err.source(); + while let Some(err) = source { + let source_message = go_style_reqwest_source_message(&err.to_string()); + if !source_message.is_empty() + && source_message != "request or response body error" + && !details.contains(&source_message) + { + details.push(source_message); + } + source = err.source(); + } + + let reqwest_message = err.to_string(); + if details.is_empty() && reqwest_message != "request or response body error" { + details.push(reqwest_message); + } + + if details.is_empty() { + "response body error".to_string() + } else { + format!("response body error: {}", details.join(": ")) + } +} + fn go_style_reqwest_source_message(message: &str) -> String { let lower = message.to_ascii_lowercase(); if !lower.contains("tls") @@ -3223,6 +3400,22 @@ mod tests { )); } + #[test] + fn formatted_sse_uses_dedicated_streaming_path() { + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("text/event-stream")); + + let cli = Cli::try_parse_from(["fetch", "https://example.com"]).unwrap(); + assert!(!should_stream_formatted_sse_stdout(&cli, &headers, false)); + assert!(should_stream_formatted_sse_stdout(&cli, &headers, true)); + + let cli = Cli::try_parse_from(["fetch", "--format", "on", "https://example.com"]).unwrap(); + assert!(should_stream_formatted_sse_stdout(&cli, &headers, false)); + + let cli = Cli::try_parse_from(["fetch", "--format", "off", "https://example.com"]).unwrap(); + assert!(!should_stream_formatted_sse_stdout(&cli, &headers, true)); + } + #[test] fn charset_decoder_matches_go_noop_and_known_charset_policy() { for charset in [ diff --git a/tests/integration.rs b/tests/integration.rs index 0572f19..0116d6b 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -343,7 +343,22 @@ impl PartialBodyReplayServer { let _ = stream.write_all(&body); let _ = stream.flush(); thread::spawn(move || { - thread::sleep(Duration::from_secs(5)); + let deadline = Instant::now() + Duration::from_secs(5); + let _ = stream.set_read_timeout(Some(Duration::from_millis(100))); + let mut buf = [0_u8; 1024]; + while Instant::now() < deadline { + match stream.read(&mut buf) { + Ok(0) => break, + Ok(_) => {} + Err(err) + if matches!( + err.kind(), + std::io::ErrorKind::WouldBlock + | std::io::ErrorKind::TimedOut + ) => {} + Err(_) => break, + } + } let _ = stream.shutdown(Shutdown::Both); }); } else { @@ -3035,6 +3050,8 @@ fn response_formatting_json_ndjson_xml_yaml_html_csv_css_and_sniffing() { .header("Content-Type", "text/html"), "/csv" => TestResponse::ok("name,value\none,1\n").header("Content-Type", "text/csv"), "/css" => TestResponse::ok("body{color:red}").header("Content-Type", "text/css"), + "/sse" => TestResponse::ok("data: {\"one\":1}\n\nevent: done\ndata: two\n\n") + .header("Content-Type", "text/event-stream"), "/sniff-json" => TestResponse::ok(r#"{"sniff":true}"#), "/sniff-xml" => TestResponse::ok(""), "/plain" => TestResponse::ok("just text"), @@ -3052,6 +3069,10 @@ fn response_formatting_json_ndjson_xml_yaml_html_csv_css_and_sniffing() { ), ("/csv", "name value\none 1\n"), ("/css", "body {\n color: red;\n}\n"), + ( + "/sse", + "event: message\ndata: { \"one\": 1 }\n\nevent: done\ndata: two\n\n", + ), ("/sniff-json", "{\n \"sniff\": true\n}\n"), ("/sniff-xml", "\n"), ("/plain", "just text"), @@ -3064,6 +3085,180 @@ fn response_formatting_json_ndjson_xml_yaml_html_csv_css_and_sniffing() { } } +#[cfg(unix)] +#[test] +fn formatted_sse_outputs_events_before_stream_ends() { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind streaming sse server"); + let url = format!("http://{}", listener.local_addr().expect("local addr")); + let (close_tx, close_rx) = mpsc::channel(); + let join = thread::spawn(move || { + let (mut stream, _) = listener.accept().expect("accept streaming sse request"); + let reader_stream = stream.try_clone().expect("clone streaming sse stream"); + let mut reader = BufReader::new(reader_stream); + let _ = read_request(&mut reader).expect("read streaming sse request"); + + write!( + stream, + "HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n" + ) + .unwrap(); + let first = b"data: {\"one\":1}\n\n"; + write!(stream, "{:x}\r\n", first.len()).unwrap(); + stream.write_all(first).unwrap(); + stream.write_all(b"\r\n").unwrap(); + stream.flush().unwrap(); + + let _ = close_rx.recv_timeout(Duration::from_secs(5)); + let second = b"event: done\ndata: two\n\n"; + write!(stream, "{:x}\r\n", second.len()).unwrap(); + stream.write_all(second).unwrap(); + stream.write_all(b"\r\n0\r\n\r\n").unwrap(); + let _ = stream.flush(); + let _ = stream.shutdown(Shutdown::Both); + }); + + let pty = open_pty(24, 80, 800, 480); + let mut cmd = Command::new(fetch_bin()); + cmd.args([url.as_str(), "--format", "on", "--pager", "off"]); + cmd.env("TERM", "xterm-256color"); + cmd.env("HTTP_PROXY", ""); + cmd.env("HTTPS_PROXY", ""); + cmd.env("ALL_PROXY", ""); + cmd.env("NO_PROXY", "*"); + configure_pty_child(&mut cmd, &pty.slave); + let mut child = cmd.spawn().expect("spawn streaming sse fetch under PTY"); + drop(pty.slave); + let capture = start_pty_capture(&pty.master); + + capture.wait_for("\"one\"", Duration::from_secs(5)); + assert!( + wait_child(&mut child, Duration::from_millis(100)).is_none(), + "fetch exited before the SSE stream closed; PTY output:\n{}", + capture.output() + ); + close_tx.send(()).unwrap(); + + let status = wait_child(&mut child, Duration::from_secs(5)) + .unwrap_or_else(|| { + let _ = child.kill(); + panic!( + "fetch did not exit after SSE stream closed; PTY output:\n{}", + capture.output() + ) + }) + .expect("wait streaming sse fetch"); + assert!( + status.success(), + "fetch exited with {status}; PTY output:\n{}", + capture.output() + ); + let output = capture.output(); + assert!(output.contains("\x1b[1m\x1b[36mevent\x1b[0m: done")); + assert!(output.contains("two")); + drop(pty.master); + capture.close(); + join.join().unwrap(); +} + +#[test] +fn sse_explicit_request_timeout_aborts_stream_body() { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind explicit sse timeout server"); + let url = format!("http://{}", listener.local_addr().expect("local addr")); + let join = thread::spawn(move || { + let (mut stream, _) = listener.accept().expect("accept explicit sse timeout"); + let reader_stream = stream + .try_clone() + .expect("clone explicit sse timeout stream"); + let mut reader = BufReader::new(reader_stream); + let _ = read_request(&mut reader).expect("read explicit sse timeout request"); + write!( + stream, + "HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n" + ) + .unwrap(); + stream.flush().unwrap(); + thread::sleep(Duration::from_millis(500)); + let _ = stream.shutdown(Shutdown::Both); + }); + + let res = run_fetch(&[&url, "--format", "on", "--timeout", "0.1"]); + + join.join().unwrap(); + assert_exit(&res, 1); + assert!(res.stdout.is_empty(), "stdout:\n{}", res.stdout); + assert!( + res.stderr + .contains("response body error: operation timed out"), + "stderr:\n{}", + res.stderr + ); +} + +#[test] +fn sse_config_request_timeout_aborts_stream_body() { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind config sse timeout server"); + let url = format!("http://{}", listener.local_addr().expect("local addr")); + let join = thread::spawn(move || { + let (mut stream, _) = listener.accept().expect("accept config sse timeout"); + let reader_stream = stream.try_clone().expect("clone config sse timeout stream"); + let mut reader = BufReader::new(reader_stream); + let _ = read_request(&mut reader).expect("read config sse timeout request"); + write!( + stream, + "HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n" + ) + .unwrap(); + stream.flush().unwrap(); + thread::sleep(Duration::from_millis(500)); + let _ = stream.shutdown(Shutdown::Both); + }); + + let dir = TempDir::new().unwrap(); + let config = dir.path().join("config"); + fs::write(&config, "timeout = 0.1\n").unwrap(); + let res = run_fetch(&["--config", config.to_str().unwrap(), &url, "--format", "on"]); + + join.join().unwrap(); + assert_exit(&res, 1); + assert!(res.stdout.is_empty(), "stdout:\n{}", res.stdout); + assert!( + res.stderr + .contains("response body error: operation timed out"), + "stderr:\n{}", + res.stderr + ); +} + +#[test] +fn response_body_errors_include_response_context() { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind partial response server"); + let url = format!("http://{}", listener.local_addr().expect("local addr")); + let join = thread::spawn(move || { + let (mut stream, _) = listener.accept().expect("accept partial response request"); + let reader_stream = stream.try_clone().expect("clone partial response stream"); + let mut reader = BufReader::new(reader_stream); + let _ = read_request(&mut reader).expect("read partial response request"); + write!( + stream, + "HTTP/1.1 200 OK\r\nContent-Length: 64\r\nConnection: close\r\n\r\nshort" + ) + .unwrap(); + let _ = stream.flush(); + let _ = stream.shutdown(Shutdown::Both); + }); + + let res = run_fetch(&[&url]); + + join.join().unwrap(); + assert_exit(&res, 1); + assert!(res.stderr.contains("response body error"), "{}", res.stderr); + assert!( + !res.stderr.contains("request or response body error"), + "{}", + res.stderr + ); +} + #[test] fn output_file_modes_match_go_harness() { let server = TestServer::start(|req| match req.path.as_str() { @@ -3746,8 +3941,20 @@ fn additional_formatting_sse_charset_image_and_compression_cases() { assert_exit(&res, 0); assert_eq!( res.stdout, - "[message]\n{ \"key\": \"val\" }\n\n[ev1]\nthis is my data\n" + "event: message\ndata: { \"key\": \"val\" }\n\nevent: ev1\ndata: this is my data\n\n" ); + let res = run_fetch(&[ + &format!("{}/sse", server.url), + "--format", + "on", + "--color", + "on", + ]); + assert_exit(&res, 0); + assert!(res.stdout.contains("\x1b[1m\x1b[36mevent\x1b[0m: message")); + assert!(res.stdout.contains("\x1b[1m\x1b[36mdata\x1b[0m: {")); + assert!(res.stdout.contains("\x1b[34m\x1b[1m\"key\"\x1b[0m")); + assert!(res.stdout.contains("\x1b[32m\"val\"\x1b[0m")); let res = run_fetch(&[&format!("{}/charset-json", server.url), "--format", "on"]); assert_exit(&res, 0); @@ -3780,6 +3987,9 @@ fn additional_formatting_sse_charset_image_and_compression_cases() { let mut gzip = GzEncoder::new(Vec::new(), Compression::default()); gzip.write_all(b"this is the test data").unwrap(); let gzip_body = gzip.finish().unwrap(); + let mut gzip_sse = GzEncoder::new(Vec::new(), Compression::default()); + gzip_sse.write_all(b"data: compressed\n\n").unwrap(); + let gzip_sse_body = gzip_sse.finish().unwrap(); let mut huge_gzip = GzEncoder::new(Vec::new(), Compression::default()); huge_gzip .write_all(&vec![b' '; 16 * 1024 * 1024 + 1]) @@ -3801,11 +4011,17 @@ fn additional_formatting_sse_charset_image_and_compression_cases() { "gzip, br, zstd" if req.path == "/too-large" => { TestResponse::ok(huge_gzip_body.clone()).header("Content-Encoding", "gzip") } + "gzip, br, zstd" if req.path == "/sse" => TestResponse::ok(gzip_sse_body.clone()) + .header("Content-Type", "text/event-stream") + .header("Content-Encoding", "gzip"), "gzip, br, zstd" | "gzip" => { TestResponse::ok(gzip_body.clone()).header("Content-Encoding", "gzip") } "br" => TestResponse::ok(brotli_body.clone()).header("Content-Encoding", "br"), "zstd" => TestResponse::ok(zstd_body.clone()).header("Content-Encoding", "zstd"), + "" if req.path == "/sse" => { + TestResponse::ok("data: uncompressed\n\n").header("Content-Type", "text/event-stream") + } "" => TestResponse::ok("this is the test data"), other => TestResponse::status( 400, @@ -3817,6 +4033,12 @@ fn additional_formatting_sse_charset_image_and_compression_cases() { assert_exit(&res, 0); assert_eq!(res.stdout, "this is the test data"); assert!(res.stderr.contains("gzip")); + let res = run_fetch(&[&format!("{}/sse", compressed.url), "--format", "on"]); + assert_exit(&res, 0); + assert_eq!(res.stdout, "event: message\ndata: uncompressed\n\n"); + let requests = wait_for_requests(&compressed, 3); + assert_eq!(requests[1].header("accept-encoding"), "gzip, br, zstd"); + assert_eq!(requests[2].header("accept-encoding"), ""); let res = run_fetch(&[&compressed.url, "--discard"]); assert_exit(&res, 0); assert!(res.stdout.is_empty());