Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 316 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,90 @@ where
}
}

fn start_status_grpc_h2c_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind status grpc server");
listener.set_nonblocking(true).unwrap();
let url = format!("http://{}", listener.local_addr().unwrap());
thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async move {
let listener = tokio::net::TcpListener::from_std(listener).unwrap();
loop {
let Ok((stream, _)) = listener.accept().await else {
break;
};
tokio::spawn(async move {
serve_status_grpc_h2_connection(stream).await;
});
}
});
});
url
}

async fn serve_status_grpc_h2_connection<T>(stream: T)
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let Ok(mut connection) = h2::server::handshake(stream).await else {
return;
};
while let Some(result) = connection.accept().await {
let Ok((request, respond)) = result else {
break;
};
tokio::spawn(async move {
handle_status_grpc_h2_request(request, respond).await;
});
}
}

async fn handle_status_grpc_h2_request(
request: http::Request<h2::RecvStream>,
mut respond: h2::server::SendResponse<bytes::Bytes>,
) {
let path = request.uri().path().to_string();
let mut body = request.into_body();
while let Some(chunk) = body.data().await {
if chunk.is_err() {
return;
}
}

let (payload, status, message) = match path.as_str() {
"/test.Stream/Events" => {
let mut frames = grpc_frame(&proto_field_string(1, "first"));
frames.extend(grpc_frame(&proto_field_string(1, "second")));
(frames, "0", "")
}
"/test.Status/Denied" => (Vec::new(), "7", "permission%20denied"),
_ => (Vec::new(), "12", "unimplemented"),
};

let Ok(response) = http::Response::builder()
.status(200)
.header("content-type", "application/grpc+proto")
.body(())
else {
return;
};
let Ok(mut send) = respond.send_response(response, false) else {
return;
};
if !payload.is_empty() && send.send_data(bytes::Bytes::from(payload), false).is_err() {
return;
}
let mut trailers = http::HeaderMap::new();
trailers.insert("grpc-status", http::HeaderValue::from_static(status));
if !message.is_empty() {
trailers.insert("grpc-message", http::HeaderValue::from_static(message));
}
let _ = send.send_trailers(trailers);
}

async fn handle_reflection_h2_request(
request: http::Request<h2::RecvStream>,
mut respond: h2::server::SendResponse<bytes::Bytes>,
Expand Down Expand Up @@ -1981,6 +2065,60 @@ fn start_ws_multi_echo_server(messages: usize) -> String {
url
}

fn start_ws_push_server(
validate: impl Fn(&TestRequest) -> Result<(), String> + Send + Sync + 'static,
) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind websocket push server");
listener.set_nonblocking(true).unwrap();
let url = format!("ws://{}", listener.local_addr().unwrap());
let validate = Arc::new(validate);
thread::spawn(move || {
loop {
match listener.accept() {
Ok((mut stream, _)) => {
let _ = stream.set_nonblocking(false);
let validate = Arc::clone(&validate);
thread::spawn(move || {
let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
let mut reader = BufReader::new(stream.try_clone().unwrap());
let Some(req) = read_request(&mut reader) else {
return;
};
if let Err(err) = validate(&req) {
write_response(
&mut stream,
TestResponse::status(400, "Bad Request", err),
);
return;
}
let key = req.header("sec-websocket-key");
let mut sha = Sha1::new();
sha.update(key.as_bytes());
sha.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
let accept =
base64::engine::general_purpose::STANDARD.encode(sha.finalize());
let response = format!(
"HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {accept}\r\n\r\n"
);
if stream.write_all(response.as_bytes()).is_err() {
return;
}
let _ = stream.write_all(&ws_text_frame(br#"{"hello":"websocket"}"#));
let _ = stream.write_all(&ws_binary_frame(b"\x00\x01\x02\x03"));
let _ = stream.write_all(&ws_text_frame(b"plain text"));
let _ = stream.write_all(&ws_close_frame(b"done"));
});
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(5));
}
Err(_) => break,
}
}
});
url
}

fn read_ws_text(reader: &mut impl Read) -> String {
let mut header = [0_u8; 2];
if reader.read_exact(&mut header).is_err() {
Expand Down Expand Up @@ -2028,6 +2166,19 @@ fn ws_text_frame(payload: &[u8]) -> Vec<u8> {
out
}

fn ws_binary_frame(payload: &[u8]) -> Vec<u8> {
let mut out = Vec::with_capacity(payload.len() + 4);
out.push(0x82);
if payload.len() < 126 {
out.push(payload.len() as u8);
} else {
out.push(126);
out.extend_from_slice(&(payload.len() as u16).to_be_bytes());
}
out.extend_from_slice(payload);
out
}

fn ws_close_frame(reason: &[u8]) -> Vec<u8> {
let mut payload = 1000_u16.to_be_bytes().to_vec();
payload.extend_from_slice(reason);
Expand Down Expand Up @@ -3267,6 +3418,117 @@ fn formatted_sse_outputs_events_before_stream_ends() {
join.join().unwrap();
}

#[test]
fn chunked_ndjson_formats_records_split_across_chunks() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind chunked ndjson server");
let url = format!("http://{}", listener.local_addr().expect("local addr"));
let join = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept chunked ndjson request");
let reader_stream = stream.try_clone().expect("clone chunked ndjson stream");
let mut reader = BufReader::new(reader_stream);
let _ = read_request(&mut reader).expect("read chunked ndjson request");
write!(
stream,
"HTTP/1.1 200 OK\r\nContent-Type: application/x-ndjson\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n"
)
.unwrap();
for chunk in [
b"{\"a\":".as_slice(),
b"1}\n{\"b\"".as_slice(),
b":2}\n".as_slice(),
] {
write!(stream, "{:x}\r\n", chunk.len()).unwrap();
stream.write_all(chunk).unwrap();
stream.write_all(b"\r\n").unwrap();
stream.flush().unwrap();
}
stream.write_all(b"0\r\n\r\n").unwrap();
let _ = stream.shutdown(Shutdown::Both);
});

let res = run_fetch(&[&url, "--format", "on"]);

join.join().unwrap();
assert_exit(&res, 0);
assert_eq!(res.stdout, "{ \"a\": 1 }\n{ \"b\": 2 }\n");
}

#[cfg(unix)]
#[test]
fn raw_ndjson_outputs_chunks_before_stream_ends() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind streaming ndjson server");
let url = format!("http://{}", listener.local_addr().expect("local addr"));
let (close_tx, close_rx) = mpsc::channel();
let join = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept streaming ndjson request");
let reader_stream = stream.try_clone().expect("clone streaming ndjson stream");
let mut reader = BufReader::new(reader_stream);
let _ = read_request(&mut reader).expect("read streaming ndjson request");
write!(
stream,
"HTTP/1.1 200 OK\r\nContent-Type: application/x-ndjson\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n"
)
.unwrap();
let first = br#"{"event":"started"}
"#;
write!(stream, "{:x}\r\n", first.len()).unwrap();
stream.write_all(first).unwrap();
stream.write_all(b"\r\n").unwrap();
stream.flush().unwrap();

let _ = close_rx.recv_timeout(Duration::from_secs(5));
let second = br#"{"event":"finished"}
"#;
write!(stream, "{:x}\r\n", second.len()).unwrap();
stream.write_all(second).unwrap();
stream.write_all(b"\r\n0\r\n\r\n").unwrap();
let _ = stream.flush();
let _ = stream.shutdown(Shutdown::Both);
});

let pty = open_pty(24, 80, 800, 480);
let mut cmd = Command::new(fetch_bin());
cmd.args([url.as_str(), "--format", "off", "--pager", "off"]);
cmd.env("TERM", "xterm-256color");
cmd.env("HTTP_PROXY", "");
cmd.env("HTTPS_PROXY", "");
cmd.env("ALL_PROXY", "");
cmd.env("NO_PROXY", "*");
configure_pty_child(&mut cmd, &pty.slave);
let mut child = cmd.spawn().expect("spawn streaming ndjson fetch under PTY");
drop(pty.slave);
let capture = start_pty_capture(&pty.master);

capture.wait_for("\"started\"", Duration::from_secs(5));
assert!(
wait_child(&mut child, Duration::from_millis(100)).is_none(),
"fetch exited before the NDJSON stream closed; PTY output:\n{}",
capture.output()
);
close_tx.send(()).unwrap();

let status = wait_child(&mut child, Duration::from_secs(5))
.unwrap_or_else(|| {
let _ = child.kill();
panic!(
"fetch did not exit after NDJSON stream closed; PTY output:\n{}",
capture.output()
)
})
.expect("wait streaming ndjson fetch");
assert!(
status.success(),
"fetch exited with {status}; PTY output:\n{}",
capture.output()
);
let output = capture.output();
assert!(output.contains("\"started\""));
assert!(output.contains("\"finished\""));
drop(pty.master);
capture.close();
join.join().unwrap();
}

#[test]
fn sse_explicit_request_timeout_aborts_stream_body() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind explicit sse timeout server");
Expand Down Expand Up @@ -5524,6 +5786,35 @@ fn websocket_noninteractive_go_cases() {
assert!(res.stderr.contains("GET") || res.stderr.contains("method"));
}

#[test]
fn websocket_receives_json_text_binary_and_query_handshake() {
let ws_url = start_ws_push_server(|req| {
if req.path != "/stream?topic=builds" {
return Err(format!("unexpected path: {}", req.path));
}
if req.header("x-test") != "yes" {
return Err("missing test header".to_string());
}
Ok(())
});
let res = run_fetch(&[
&format!("{ws_url}/stream"),
"-q",
"topic=builds",
"-H",
"X-Test: yes",
"--format",
"on",
"--ws-interactive",
"off",
]);

assert_exit(&res, 0);
assert!(res.stdout.contains("\"hello\": \"websocket\""));
assert!(res.stdout.contains("plain text"));
assert!(res.stderr.contains("[binary 4 bytes]"));
}

#[test]
fn websocket_wss_trusts_custom_ca_go_case() {
let (wss, seen) = start_wss_echo_server(|req| {
Expand Down Expand Up @@ -5846,6 +6137,31 @@ service StreamService {
assert!(res.stderr.contains("protoc") || res.stderr.contains("exist"));
}

#[test]
fn grpc_h2c_stream_frames_and_status_trailers_are_handled() {
let server_url = start_status_grpc_h2c_server();
let res = run_fetch(&[
&format!("{server_url}/test.Stream/Events"),
"--grpc",
"--format",
"on",
]);
assert_exit(&res, 0);
assert!(res.stdout.contains("first"), "stdout:\n{}", res.stdout);
assert!(res.stdout.contains("second"), "stdout:\n{}", res.stdout);

let res = run_fetch(&[
&format!("{server_url}/test.Status/Denied"),
"--grpc",
"--format",
"off",
]);
assert_exit(&res, 1);
assert!(res.stdout.is_empty(), "stdout:\n{}", res.stdout);
assert!(res.stderr.contains("PERMISSION_DENIED"), "{}", res.stderr);
assert!(res.stderr.contains("permission denied"), "{}", res.stderr);
}

#[test]
fn grpc_reflection_h2c_go_cases() {
let server = start_reflection_grpc_h2c_server(true);
Expand Down
Loading