Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 57 additions & 29 deletions interop/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ struct Opt {
h3: bool,
#[structopt(long)]
hq: bool,
#[structopt(long)]
seq: bool,
/// Enable key logging
#[structopt(long = "keylog")]
keylog: bool,
Expand Down Expand Up @@ -57,6 +59,7 @@ struct Peer {
port: u16,
retry_port: u16,
alpn: Alpn,
sequential: bool,
}

impl Peer {
Expand All @@ -68,6 +71,7 @@ impl Peer {
port: 4433,
retry_port: 4434,
alpn: Alpn::HqH3,
sequential: false,
}
}

Expand Down Expand Up @@ -95,14 +99,23 @@ impl Peer {
self.alpn = Alpn::Hq;
self
}

fn uri(&self, path: &str) -> http::Uri {
http::Uri::builder()
.scheme("https")
.authority(self.host.as_str())
.path_and_query(path)
.build()
.expect("invalid uri")
}
}

lazy_static! {
static ref PEERS: Vec<Peer> = vec![
Peer::new("quant.eggert.org").name("quant").hq(),
Peer::new("nghttp2.org").name("nghttp2").h3(),
Peer::new("fb.mvfst.net").name("mvfst").h3(),
Peer::new("test.privateoctopus.com").name("picoquic"),
Peer::new("test.privateoctopus.com").name("picoquic").h3(),
Peer::new("quic.westus.cloudapp.azure.com")
.name("msquic")
.h3()
Expand Down Expand Up @@ -146,6 +159,7 @@ async fn main() {
host,
port: opt.port.unwrap_or(4433),
retry_port: opt.retry_port.unwrap_or(4434),
sequential: opt.seq,
alpn: match (opt.h3, opt.hq) {
(false, true) => Alpn::Hq,
(true, false) => Alpn::H3,
Expand All @@ -172,6 +186,7 @@ async fn main() {
if let Some(retry_port) = opt.retry_port {
x.retry_port = retry_port;
}
x.sequential = opt.seq;
});
peers
} else {
Expand Down Expand Up @@ -217,21 +232,41 @@ struct State {

impl State {
async fn run_hq(self) -> Result<InteropResult> {
let (core, key_update, rebind, retry) =
future::join4(self.core(), self.key_update(), self.rebind(), self.retry()).await;
Ok(build_result(core, key_update, rebind, retry, None))
if self.peer.sequential {
Ok(build_result(
self.core().await,
self.key_update().await,
self.rebind().await,
self.retry().await,
None,
))
} else {
let (core, key_update, rebind, retry) =
future::join4(self.core(), self.key_update(), self.rebind(), self.retry()).await;
Ok(build_result(core, key_update, rebind, retry, None))
}
}

async fn run_h3(self) -> Result<InteropResult> {
let (core, key_update, rebind, retry, h3) = future::join5(
self.core_h3(),
self.key_update_h3(),
self.rebind_h3(),
self.retry_h3(),
self.h3(),
)
.await;
Ok(build_result(core, key_update, rebind, retry, Some(h3)))
if self.peer.sequential {
Ok(build_result(
self.core_h3().await,
self.key_update_h3().await,
self.rebind_h3().await,
self.retry_h3().await,
Some(self.h3().await),
))
} else {
let (core, key_update, rebind, retry, h3) = future::join5(
self.core_h3(),
self.key_update_h3(),
self.rebind_h3(),
self.retry_h3(),
self.h3(),
)
.await;
Ok(build_result(core, key_update, rebind, retry, Some(h3)))
}
}

fn try_new(peer: &Peer, keylog: bool) -> Result<Self> {
Expand Down Expand Up @@ -442,7 +477,7 @@ impl State {
tokio::spawn(h3_driver.unwrap_or_else(|_| ()));
tokio::spawn(quic_driver.unwrap_or_else(|_| ()));

h3_get(&conn)
h3_get(&conn, &self.peer.uri("/"))
.await
.map_err(|e| anyhow!("h3 request failed: {}", e))?;
conn.close();
Expand All @@ -466,7 +501,7 @@ impl State {
}
});
result.handshake = true;
h3_get(&conn)
h3_get(&conn, &self.peer.uri("/"))
.await
.map_err(|e| anyhow!("simple request failed: {}", e))?;
result.stream_data = true;
Expand All @@ -492,7 +527,7 @@ impl State {
Ok((quic_driver, driver, conn, _)) => {
tokio::spawn(quic_driver.unwrap_or_else(|_| ()));
tokio::spawn(driver.unwrap_or_else(|_| ()));
h3_get(&conn)
h3_get(&conn, &self.peer.uri("/"))
.await
.map_err(|e| anyhow!("0-RTT request failed: {}", e))?;
result.zero_rtt = true;
Expand Down Expand Up @@ -526,11 +561,11 @@ impl State {
tokio::spawn(quic_driver.unwrap_or_else(|_| ()));
tokio::spawn(h3_driver.unwrap_or_else(|_| ()));
// Make sure some traffic has gone both ways before the key update
h3_get(&conn)
h3_get(&conn, &self.peer.uri("/"))
.await
.map_err(|e| anyhow!("request failed before key update: {}", e))?;
conn.force_key_update();
h3_get(&conn)
h3_get(&conn, &self.peer.uri("/"))
.await
.map_err(|e| anyhow!("request failed after key update: {}", e))?;
conn.close();
Expand All @@ -550,7 +585,7 @@ impl State {
.map_err(|e| anyhow!("h3 failed to connect: {}", e))?;
tokio::spawn(quic_driver.unwrap_or_else(|_| ()));
tokio::spawn(h3_driver.unwrap_or_else(|_| ()));
h3_get(&conn)
h3_get(&conn, &self.peer.uri("/"))
.await
.map_err(|e| anyhow!("request failed on retry port: {}", e))?;
conn.close();
Expand All @@ -571,7 +606,7 @@ impl State {
tokio::spawn(h3_driver.unwrap_or_else(|_| ()));
let socket = std::net::UdpSocket::bind("[::]:0").unwrap();
endpoint.rebind(socket)?;
h3_get(&conn)
h3_get(&conn, &self.peer.uri("/"))
.await
.map_err(|e| anyhow!("request failed on retry port: {}", e))?;
conn.close();
Expand Down Expand Up @@ -657,15 +692,8 @@ fn build_result(
result
}

async fn h3_get(conn: &quinn_h3::client::Connection) -> Result<()> {
let (response, _) = conn
.send_request(
http::Request::builder()
.method(http::Method::GET)
.uri("/")
.body(())?,
)
.await?;
async fn h3_get(conn: &quinn_h3::client::Connection, uri: &http::Uri) -> Result<()> {
let (response, _) = conn.send_request(http::Request::get(uri).body(())?).await?;

let (_, mut recv_body) = response.await?;

Expand Down
17 changes: 7 additions & 10 deletions quinn-h3/examples/h3_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use structopt::{self, StructOpt};

use anyhow::{anyhow, Result};
use futures::AsyncReadExt;
use http::{method::Method, Request};
use http::{Request, Uri};
use tracing::{error, info};
use url::Url;

use quinn_h3::{
self,
Expand All @@ -16,7 +15,7 @@ use quinn_h3::{
#[structopt(name = "h3_client")]
struct Opt {
#[structopt(default_value = "http://localhost:4433/Cargo.toml")]
url: Url,
uri: Uri,

/// Custom certificate authority to trust, in DER format
#[structopt(parse(from_os_str), long = "ca")]
Expand Down Expand Up @@ -58,21 +57,21 @@ async fn main() -> Result<()> {
}
});

match request(client, &options.url).await {
match request(client, &options.uri).await {
Ok(_) => println!("client finished"),
Err(e) => println!("client failed: {:?}", e),
}

Ok(())
}

async fn request(client: Client, url: &Url) -> Result<()> {
let remote = (url.host_str().unwrap(), url.port().unwrap_or(4433))
async fn request(client: Client, uri: &Uri) -> Result<()> {
let remote = (uri.host().unwrap(), uri.port_u16().unwrap_or(4433))
.to_socket_addrs()?
.next()
.ok_or_else(|| anyhow!("couldn't resolve to an address"))?;
let (quic_driver, h3_driver, conn) = client
.connect(&remote, url.host_str().unwrap_or("localhost"))?
.connect(&remote, uri.host().unwrap_or("localhost"))?
.await
.map_err(|e| anyhow!("failed ot connect: {:?}", e))?;

Expand All @@ -88,9 +87,7 @@ async fn request(client: Client, url: &Url) -> Result<()> {
}
});

let request = Request::builder()
.method(Method::GET)
.uri(url.path())
let request = Request::get(uri)
.header("client", "quinn-h3:0.0.1")
.body(())
.expect("failed to build request");
Expand Down
23 changes: 13 additions & 10 deletions quinn-h3/examples/h3_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::{anyhow, Result};
use futures::{AsyncReadExt, StreamExt};
use http::{Response, StatusCode};
use structopt::{self, StructOpt};
use tracing::error;

use quinn::ConnectionDriver as QuicDriver;
use quinn_h3::{
Expand Down Expand Up @@ -60,14 +61,16 @@ async fn main() -> Result<()> {
println!("server listening");
while let Some(connecting) = incoming.next().await {
println!("server received connection");
let connection = connecting
.await
.map_err(|e| anyhow!("accept failed: {:?}", e))
.expect("server failed");

handle_connection(connection)
.await
.expect("handling connection failed")
match connecting.await {
Err(e) => error!("accept failed: {:?}", e),
Ok(connection) => {
let _ = tokio::spawn(async move {
if let Err(e) = handle_connection(connection).await {
error!("handling connection failed: {:?}", e);
}
});
}
}
}

Ok(())
Expand All @@ -78,7 +81,7 @@ async fn handle_connection(conn: (QuicDriver, ConnectionDriver, IncomingRequest)

tokio::spawn(async move {
if let Err(e) = h3_driver.await {
eprintln!("quic connection driver error: {}", e)
eprintln!("h3 connection driver error: {}", e)
}
});

Expand All @@ -100,7 +103,7 @@ async fn handle_connection(conn: (QuicDriver, ConnectionDriver, IncomingRequest)
}

async fn handle_request(recv_request: RecvRequest) -> Result<()> {
let (request, mut recv_body, sender) = recv_request.await.expect("recv request failed");
let (request, mut recv_body, sender) = recv_request.await?;
println!("received request: {:?}", request);

let mut body = Vec::with_capacity(1024);
Expand Down
2 changes: 1 addition & 1 deletion quinn-h3/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl AsyncRead for BodyReader {
if size > 0 {
Poll::Ready(Ok(size))
} else {
Poll::Ready(Err(io::Error::new(ErrorKind::WouldBlock, "stream blocked")))
Poll::Pending
}
}
Poll::Ready(Some(Err(e))) => {
Expand Down
5 changes: 4 additions & 1 deletion quinn-h3/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,10 @@ impl Future for RecvResponse {
)))
}
RecvResponseState::Receiving(ref mut recv) => {
match ready!(Pin::new(recv).poll_next(cx)) {
let frame = ready!(Pin::new(recv).poll_next(cx));

trace!("client got {:?}", frame);
match frame {
None => return Poll::Ready(Err(Error::peer("received an empty response"))),
Some(Err(e)) => return Poll::Ready(Err(e.into())),
Some(Ok(f)) => match f {
Expand Down
11 changes: 6 additions & 5 deletions quinn-h3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,18 +235,19 @@ impl ConnectionInner {
let msg = format!("{:?}", e);
return Err(DriverError::new(e, ErrorCode::STREAM_CREATION_ERROR, msg));
}
Ok(n) => self.on_uni_resolved(n)?,
Ok(n) => self.on_uni_resolved(cx, n)?,
}
}
Ok(())
}

fn on_uni_resolved(&mut self, new_stream: NewUni) -> Result<(), DriverError> {
fn on_uni_resolved(&mut self, cx: &mut Context, new_stream: NewUni) -> Result<(), DriverError> {
match new_stream {
NewUni::Control(stream) => match self.recv_control {
None => {
trace!("Got control stream");
trace!("Got Control stream");
self.recv_control = Some(stream);
self.poll_recv_control(cx)?;
Ok(())
}
Some(_) => Err(DriverError::peer(
Expand All @@ -256,7 +257,7 @@ impl ConnectionInner {
},
NewUni::Decoder(s) => match self.recv_decoder {
None => {
trace!("Got decoder stream");
trace!("Got Decoder stream");
self.recv_decoder =
Some((s, BytesMut::with_capacity(RECV_DECODER_INITIAL_CAPACITY)));
Ok(())
Expand All @@ -268,7 +269,7 @@ impl ConnectionInner {
},
NewUni::Encoder(s) => match self.recv_encoder {
None => {
trace!("Got encoder stream");
trace!("Got Encoder stream");
self.recv_encoder =
Some((s, BytesMut::with_capacity(RECV_ENCODER_INITIAL_CAPACITY)));
Ok(())
Expand Down
18 changes: 17 additions & 1 deletion quinn-h3/src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,17 @@ impl Connection {
.set_max_blocked(settings.qpack_blocked_streams as usize)?;
self.encoder_table
.set_max_size(settings.qpack_max_table_capacity as usize)?;

if settings.qpack_max_table_capacity > 0 {
qpack::set_dynamic_table_size(
&mut self.encoder_table,
&mut self.pending_streams[PendingStreamType::Encoder as usize],
settings.qpack_max_table_capacity as usize,
)?;
};

self.remote_settings = Some(settings);

Ok(())
}

Expand Down Expand Up @@ -319,7 +329,13 @@ mod tests {
fn decode_header() {
let mut header_map = HeaderMap::new();
header_map.append("hello", HeaderValue::from_static("text/html"));
let header = Header::request(Method::GET, Uri::default(), header_map);
let header = Header::request(
Method::GET,
// NOTE: H3 adds a trailing `/`, so the one in following the url is important
// only to make the `Header` comparison succeed at the end of this test.
"https://example.com/".parse().expect("uri"),
header_map,
);

let mut client = Connection::default();
let encoded = client
Expand Down
Loading