Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ metadata/update/DNS/TLS inspection modes, and executes requests via `src/http`.
- The top-level app future in `src/app.rs` is heap-pinned before the shutdown-signal `tokio::select!`; do not move it back to `tokio::pin!` on the main stack because the combined async request/WebSocket/inspection state can overflow Windows' smaller main-thread stack even for metadata commands.
- Rust response body paging is controlled by `--pager auto|on|off` or `pager = ...`; `auto` routes terminal stdout through `less -FIRX`, `on` forces the pager, and `off` disables it. Image responses and output-file writes bypass the pager.
- Timeout duration parsing, Go-style duration formatting, elapsed request budgets, connect/DNS timeout caps, and shared `request timed out after ...` errors live in `src/duration.rs`; HTTP, WebSocket, DNS inspection, and TLS inspection paths should reuse `TimeoutBudget` instead of recomputing remaining time locally.
- HTTP retries keep the original `--timeout` operation deadline across attempts, cap retry sleeps to the remaining budget, refresh per-request timeouts before each send, and use best-effort byte/time-bounded drains for retry cleanup responses.
- Custom/pre-resolved DNS observes timeout budgets before the reqwest client is built: `--connect-timeout` bounds DNS resolution when set, otherwise DNS uses the remaining `--timeout` budget, and DoH lookup clients receive the same budget.
- Custom/pre-resolved DNS is scoped to the request URL; manual redirects that change scheme, host, or port rebuild the reqwest client and resolve the redirect target so `--dns-server`, `-vvv`, and `--timing` stay aligned with the actual target.
- Custom/pre-resolved DNS runs A and AAAA lookups concurrently for both UDP and DoH, preserving any successful records when the other family fails.
Expand All @@ -121,7 +122,7 @@ metadata/update/DNS/TLS inspection modes, and executes requests via `src/http`.
- Response compression negotiation is controlled by `--compress auto|br|gzip|zstd|off` or `compress = ...`; `brotli` is accepted as an alias for `br`, `auto` requests and decodes gzip/brotli/zstd, single-algorithm modes only request/decode that algorithm, and `off` leaves compressed bodies untouched.
- Formatted SSE responses stream incrementally to stdout with terminal color when enabled, rendering events as `event:`/`data:` blocks while formatting JSON data. Auto-compressed SSE responses are retried without `Accept-Encoding` so intermediaries do not buffer events; request timeouts from flags, curl commands, or config remain enforced.
- Formatted NDJSON responses stream incrementally to stdout when formatting is enabled, splitting decoded bytes on newlines, formatting each record with the JSON-line formatter, and flushing after each record.
- Digest authentication retries drain oversized 401 challenge bodies with a fixed bound. On Windows, challenge responses that may be abandoned before EOF are retried before the first response is dropped so the local TCP abort from abandoning that response cannot poison the follow-up request. Unsupported or malformed Digest challenges surface an explicit diagnostic before the body replay check.
- Digest authentication retries use bounded cleanup for 401 challenge bodies. Challenge responses that may be abandoned before EOF are retried through a fresh client before the first response is dropped so a local TCP abort from abandoning that response cannot poison the follow-up request. Unsupported or malformed Digest challenges surface an explicit diagnostic before the body replay check.
- `--sort-headers` or `sort-headers = true` sorts displayed request/response headers alphabetically by name in verbose output without changing the actual request header order.
- Default HTTP requests send `Accept: application/json, */*;q=0.5`, preferring JSON while allowing any other response type as a lower-priority fallback.
- `--basic` and `--digest` credentials preserve exact bytes around the first colon; leading/trailing spaces in usernames or passwords are significant and are not trimmed after CLI or `--from-curl` parsing.
Expand Down
118 changes: 93 additions & 25 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::auth::aws_sigv4;
use crate::auth::digest;
use crate::cli::{Cli, CompressionMode, HttpVersion};
use crate::core;
use crate::duration::{duration_from_seconds, request_timeout_message};
use crate::duration::{TimeoutBudget, duration_from_seconds, request_timeout_message};
use crate::error::{FetchError, write_error_with_color, write_warning_with_color};
use crate::format::content_type::{self, ContentType};
use crate::format::css;
Expand Down Expand Up @@ -111,6 +111,7 @@ impl RequestBodyPayload {

const MAX_BUFFERED_RESPONSE_BYTES: usize = 16 * 1024 * 1024;
const MAX_DISCARDED_RESPONSE_BYTES: usize = 1024 * 1024;
const MAX_RESPONSE_DRAIN_DURATION: Duration = Duration::from_millis(250);
const BINARY_RESPONSE_WARNING: &str =
"the response body appears to be binary\n\nTo output to the terminal anyway, use '--output -'";

Expand Down Expand Up @@ -284,6 +285,7 @@ pub async fn execute(cli: &Cli) -> Result<i32, FetchError> {
RequestAuthorization::None
},
)?;
let req = apply_request_timeout(req, request_timeout, request_start)?;
timing.set_dns(
request_client
.dns_resolution
Expand Down Expand Up @@ -341,6 +343,7 @@ pub async fn execute(cli: &Cli) -> Result<i32, FetchError> {
response,
DigestRetryContext {
client: &request_client.client,
client_build: &client_build,
method: request_method,
headers: headers.clone(),
body: request_body.clone(),
Expand All @@ -363,27 +366,29 @@ pub async fn execute(cli: &Cli) -> Result<i32, FetchError> {
}
if attempt < retry_count && should_retry_status(status) {
ensure_request_body_replayable(&request_body, "retry")?;
let delay =
let requested_delay =
compute_delay(retry_delay, attempt, parse_retry_after(response.headers()));
drain_response_body_bounded(response).await;
let delay = retry_delay_within_timeout(
requested_delay,
request_timeout,
request_start,
)?;
print_retry(
cli,
attempt + 2,
total_attempts,
delay,
&retry_reason(status),
);
drain_response_body_bounded(response).await;
tokio::time::sleep(delay).await;
let retry_client_build = client::ClientBuildContext {
mode: client_build.mode,
ensure_retry_delay_completed(
requested_delay,
delay,
request_timeout,
connect_timeout,
request_start: Instant::now(),
session: session.as_ref(),
connect_timing: Some(&connect_timing),
};
initial_client =
client::build_client_for_url(cli, &url, &retry_client_build).await?;
request_start,
)?;
initial_client = client::build_client_for_url(cli, &url, &client_build).await?;
attempt += 1;
continue;
}
Expand All @@ -402,9 +407,21 @@ pub async fn execute(cli: &Cli) -> Result<i32, FetchError> {
}
if attempt < retry_count && is_retryable_error(&err) {
ensure_request_body_replayable(&request_body, "retry")?;
let delay = compute_delay(retry_delay, attempt, Duration::ZERO);
let requested_delay = compute_delay(retry_delay, attempt, Duration::ZERO);
let delay = retry_delay_within_timeout(
requested_delay,
request_timeout,
request_start,
)?;
print_retry(cli, attempt + 2, total_attempts, delay, &err.to_string());
tokio::time::sleep(delay).await;
ensure_retry_delay_completed(
requested_delay,
delay,
request_timeout,
request_start,
)?;
initial_client = client::build_client_for_url(cli, &url, &client_build).await?;
attempt += 1;
continue;
}
Expand Down Expand Up @@ -1096,12 +1113,16 @@ async fn drain_response_body_bounded(mut response: Response) {

async fn drain_response_body_bounded_mut(response: &mut Response) {
let mut discarded = 0usize;
let drain_deadline = tokio::time::Instant::now() + MAX_RESPONSE_DRAIN_DURATION;
while discarded < MAX_DISCARDED_RESPONSE_BYTES {
match response.chunk().await {
Ok(Some(chunk)) => {
if tokio::time::Instant::now() >= drain_deadline {
break;
}
match tokio::time::timeout_at(drain_deadline, response.chunk()).await {
Ok(Ok(Some(chunk))) => {
discarded = discarded.saturating_add(chunk.len());
}
Ok(None) | Err(_) => break,
Ok(Ok(None)) | Ok(Err(_)) | Err(_) => break,
}
}
}
Expand Down Expand Up @@ -1711,6 +1732,17 @@ fn build_request(
Ok(req)
}

fn apply_request_timeout(
mut req: RequestBuilder,
request_timeout: Option<Duration>,
request_start: Instant,
) -> Result<RequestBuilder, FetchError> {
if let Some(timeout) = TimeoutBudget::started_at(request_timeout, request_start).remaining()? {
req = req.timeout(timeout);
}
Ok(req)
}

enum RequestAuthorization<'a> {
Cli,
Digest(&'a str),
Expand Down Expand Up @@ -1752,6 +1784,7 @@ fn reqwest_request_version_for_cli(cli: &Cli) -> Result<Option<reqwest::Version>

struct DigestRetryContext<'a> {
client: &'a Client,
client_build: &'a client::ClientBuildContext<'a>,
method: Method,
headers: HeaderMap,
body: RequestBody,
Expand All @@ -1762,7 +1795,7 @@ struct DigestRetryContext<'a> {
}

async fn apply_digest_challenge(
mut response: Response,
response: Response,
context: DigestRetryContext<'_>,
credentials: Option<&(String, String)>,
) -> Result<Response, FetchError> {
Expand Down Expand Up @@ -1807,23 +1840,32 @@ async fn apply_digest_challenge(
strip_entity_headers_for_bodyless_redirect(&mut challenged_headers);
}

let retry_before_drain = digest_retry_before_drain(&response);
let retry_client;
let client = if retry_before_drain {
retry_client =
client::build_client_for_url(context.cli, &challenged_url, context.client_build)
.await?;
&retry_client.client
} else {
context.client
};
let retry_request = build_request(
context.client,
client,
challenged_method,
challenged_url,
challenged_headers,
challenged_body,
context.cli,
RequestAuthorization::Digest(&auth),
)?;
let retry_request = apply_request_timeout(
retry_request,
context.client_build.request_timeout,
context.client_build.request_start,
)?;

if response_body_exceeds_discard_bound(&response) {
drain_response_body_bounded_mut(&mut response).await;
let retry_response: Result<Response, FetchError> =
retry_request.send().await.map_err(Into::into);
drop(response);
retry_response
} else if digest_retry_before_drain(&response) {
if retry_before_drain {
let retry_response: Result<Response, FetchError> =
retry_request.send().await.map_err(Into::into);
drop(response);
Expand Down Expand Up @@ -2820,6 +2862,32 @@ fn compute_delay(initial_delay: Duration, attempt: usize, retry_after: Duration)
delay.max(retry_after)
}

fn retry_delay_within_timeout(
delay: Duration,
request_timeout: Option<Duration>,
request_start: Instant,
) -> Result<Duration, FetchError> {
let Some(remaining) = TimeoutBudget::started_at(request_timeout, request_start).remaining()?
else {
return Ok(delay);
};
Ok(delay.min(remaining))
}

fn ensure_retry_delay_completed(
requested_delay: Duration,
actual_delay: Duration,
request_timeout: Option<Duration>,
request_start: Instant,
) -> Result<(), FetchError> {
if actual_delay < requested_delay {
return Err(TimeoutBudget::started_at(request_timeout, request_start).timeout_error());
}
TimeoutBudget::started_at(request_timeout, request_start)
.remaining()
.map(|_| ())
}

fn print_retry(
cli: &Cli,
next_attempt: usize,
Expand Down
73 changes: 73 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3681,6 +3681,79 @@ fn retry_statuses_and_request_body_replay() {
assert_exit(&res, 0);
}

#[test]
fn retry_status_delay_obeys_request_timeout_budget() {
let attempts = Arc::new(AtomicUsize::new(0));
let attempts_for_handler = Arc::clone(&attempts);
let server = TestServer::start(move |_| {
if attempts_for_handler.fetch_add(1, Ordering::SeqCst) == 0 {
TestResponse::status(503, "Service Unavailable", "retry")
.header("Connection", "keep-alive")
} else {
TestResponse::ok("done").header("Connection", "keep-alive")
}
});

let start = Instant::now();
let res = run_fetch(&[
&server.url,
"--retry",
"1",
"--retry-delay",
"3",
"--timeout",
"0.25",
]);
let elapsed = start.elapsed();

assert_exit(&res, 1);
assert!(
res.stderr.contains("request timed out after 250ms"),
"stderr:\n{}",
res.stderr
);
assert!(
elapsed < Duration::from_millis(1500),
"retry delay was not capped; elapsed: {elapsed:?}\nstdout:\n{}\nstderr:\n{}",
res.stdout,
res.stderr
);
assert_eq!(attempts.load(Ordering::SeqCst), 1);
}

#[test]
fn retry_transport_error_delay_obeys_request_timeout_budget() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind unused port");
let addr = listener.local_addr().expect("unused port local addr");
drop(listener);
let url = format!("http://{addr}");

let start = Instant::now();
let res = run_fetch(&[
&url,
"--retry",
"1",
"--retry-delay",
"3",
"--timeout",
"0.25",
]);
let elapsed = start.elapsed();

assert_exit(&res, 1);
assert!(
res.stderr.contains("request timed out after 250ms"),
"stderr:\n{}",
res.stderr
);
assert!(
elapsed < Duration::from_millis(1500),
"retry delay was not capped; elapsed: {elapsed:?}\nstdout:\n{}\nstderr:\n{}",
res.stdout,
res.stderr
);
}

#[test]
fn retry_status_rejects_stdin_body_replay() {
let server = TestServer::start(|_| {
Expand Down
Loading