From b3f84cde38e94aaf6972d15dd24b770b511ae9c2 Mon Sep 17 00:00:00 2001 From: Felician Nemeth Date: Wed, 8 May 2024 15:54:50 +0200 Subject: [PATCH] Modify example to demostrate bug-1850 Run as: cargo run --example server . cargo run --example client http://localhost:4433/Cargo.toml --- quinn/examples/client.rs | 41 +++++++++++++------------ quinn/examples/server.rs | 64 +++++++++++++++++++++++----------------- 2 files changed, 57 insertions(+), 48 deletions(-) diff --git a/quinn/examples/client.rs b/quinn/examples/client.rs index c037833..d560b1f 100644 --- a/quinn/examples/client.rs +++ b/quinn/examples/client.rs @@ -1,6 +1,9 @@ //! This example demonstrates an HTTP client that requests files from a server. //! //! Checkout the `README.md` for guidance. +#![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_variables)] use std::{ fs, @@ -117,35 +120,31 @@ async fn run(options: Opt) -> Result<()> { .await .map_err(|e| anyhow!("failed to connect: {}", e))?; eprintln!("connected at {:?}", start.elapsed()); - let (mut send, mut recv) = conn - .open_bi() - .await - .map_err(|e| anyhow!("failed to open stream: {}", e))?; if rebind { let socket = std::net::UdpSocket::bind("[::]:0").unwrap(); let addr = socket.local_addr().unwrap(); eprintln!("rebinding to {addr}"); endpoint.rebind(socket).expect("rebind failed"); } - - send.write_all(request.as_bytes()) - .await - .map_err(|e| anyhow!("failed to send request: {}", e))?; - send.finish().unwrap(); let response_start = Instant::now(); eprintln!("request sent at {:?}", response_start - start); - let resp = recv - .read_to_end(usize::max_value()) - .await - .map_err(|e| anyhow!("failed to read response: {}", e))?; - let duration = response_start.elapsed(); - eprintln!( - "response received in {:?} - {} KiB/s", - duration, - resp.len() as f32 / (duration_secs(&duration) * 1024.0) - ); - io::stdout().write_all(&resp).unwrap(); - io::stdout().flush().unwrap(); + loop { + eprint!("inside the loop: "); + match conn.read_datagram().await { + Ok(bytes) => { + eprintln!("Read {} bytes", bytes.len()); + if bytes[0] == 0 { + break; + } + } + Err(e) => { + eprintln!("Error: {:?}", e); + return Ok(()); + } + } + } + + conn.close(0u32.into(), b"done"); // Give the server a fair chance to receive the close packet diff --git a/quinn/examples/server.rs b/quinn/examples/server.rs index 5f9e6f3..7c86ab8 100644 --- a/quinn/examples/server.rs +++ b/quinn/examples/server.rs @@ -1,6 +1,9 @@ //! This example demonstrates an HTTP server that serves files from a directory. //! //! Checkout the `README.md` for guidance. +#![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_variables)] use std::{ ascii, fs, io, @@ -17,6 +20,10 @@ use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use tracing::{error, info, info_span}; use tracing_futures::Instrument as _; +use std::thread; +use std::time; +use bytes::Bytes; + mod common; #[derive(Parser, Debug)] @@ -176,36 +183,39 @@ async fn handle_connection(root: Arc, conn: quinn::Incoming) -> Result<()> .protocol .map_or_else(|| "".into(), |x| String::from_utf8_lossy(&x).into_owned()) ); - async { - info!("established"); - - // Each stream initiated by the client constitutes a new request. - loop { - let stream = connection.accept_bi().await; - let stream = match stream { - Err(quinn::ConnectionError::ApplicationClosed { .. }) => { - info!("connection closed"); - return Ok(()); - } - Err(e) => { - return Err(e); + eprintln!("handle_connection"); + let connection = Arc::new(connection); + let conn = connection.clone(); + tokio::spawn( + async move { + for _ in 0..100 { + let free_len = conn.max_datagram_size().unwrap_or(0); + let data = vec![1; free_len]; + if let Err(e) = conn.send_datagram(Bytes::copy_from_slice(&data)) { + eprintln!("ERROR: {e}"); + break; + } else { + eprintln!("Sent {} bytes", data.len()); } - Ok(s) => s, - }; - let fut = handle_request(root.clone(), stream); - tokio::spawn( - async move { - if let Err(e) = fut.await { - error!("failed: {reason}", reason = e.to_string()); - } - } - .instrument(info_span!("request")), - ); + thread::sleep(time::Duration::from_millis(100)); + } + let data = vec![0]; + let _ = conn.send_datagram(Bytes::copy_from_slice(&data)); } + ); + + // Keep `connection` until ApplicationClosed arrives. + let stream = connection.accept_bi().await; + match stream { + Err(quinn::ConnectionError::ApplicationClosed { .. }) => { + info!("connection closed"); + return Ok(()); + } + Err(e) => { + return Err(e.into()); + } + Ok(s) => Ok(()), } - .instrument(span) - .await?; - Ok(()) } async fn handle_request( -- 2.39.2