diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 9a0bdf2a..4c387b42 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -1,10 +1,15 @@ +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; use futures::{ future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt, TryStreamExt, }; -use futures::{stream, Stream}; +use futures::{stream, FutureExt, Stream}; use quickcheck::{Arbitrary, Gen}; -use std::io; +use std::future::Future; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt, io, mem}; use tokio::net::{TcpListener, TcpStream}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use yamux::ConnectionError; @@ -91,6 +96,136 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io: Ok(()) } +pub struct EchoServer { + connection: Connection, + worker_streams: FuturesUnordered>>, + streams_processed: usize, + connection_closed: bool, +} + +impl EchoServer { + pub fn new(connection: Connection) -> Self { + Self { + connection, + worker_streams: FuturesUnordered::default(), + streams_processed: 0, + connection_closed: false, + } + } +} + +impl Future for EchoServer +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = yamux::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + match this.worker_streams.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(()))) => { + this.streams_processed += 1; + continue; + } + Poll::Ready(Some(Err(e))) => { + eprintln!("A stream failed: {}", e); + continue; + } + Poll::Ready(None) => { + if this.connection_closed { + return Poll::Ready(Ok(this.streams_processed)); + } + } + Poll::Pending => {} + } + + match this.connection.poll_next_inbound(cx) { + Poll::Ready(Some(Ok(mut stream))) => { + this.worker_streams.push( + async move { + { + let (mut r, mut w) = AsyncReadExt::split(&mut stream); + futures::io::copy(&mut r, &mut w).await?; + } + stream.close().await?; + Ok(()) + } + .boxed(), + ); + continue; + } + Poll::Ready(None) | Poll::Ready(Some(Err(_))) => { + this.connection_closed = true; + continue; + } + Poll::Pending => {} + } + + return Poll::Pending; + } + } +} + +#[derive(Debug)] +pub struct OpenStreamsClient { + connection: Option>, + streams: Vec, + to_open: usize, +} + +impl OpenStreamsClient { + pub fn new(connection: Connection, to_open: usize) -> Self { + Self { + connection: Some(connection), + streams: vec![], + to_open, + } + } +} + +impl Future for OpenStreamsClient +where + T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, +{ + type Output = yamux::Result<(Connection, Vec)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + let connection = this.connection.as_mut().unwrap(); + + loop { + // Drive connection to make progress. + match connection.poll_next_inbound(cx)? { + Poll::Ready(_stream) => { + panic!("Unexpected inbound stream"); + } + Poll::Pending => {} + } + + if this.streams.len() < this.to_open { + match connection.poll_new_outbound(cx)? { + Poll::Ready(stream) => { + this.streams.push(stream); + continue; + } + Poll::Pending => {} + } + } + + if this.streams.len() == this.to_open { + return Poll::Ready(Ok(( + this.connection.take().unwrap(), + mem::take(&mut this.streams), + ))); + } + + return Poll::Pending; + } + } +} + #[derive(Clone, Debug)] pub struct Msg(pub Vec); diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 0cbf3a9f..a3ec5759 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,6 +1,6 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; use std::future::Future; use std::pin::Pin; @@ -8,8 +8,9 @@ use std::task::{Context, Poll}; use test_harness::*; use tokio::net::TcpStream; use tokio::runtime::Runtime; +use tokio::task; use tokio_util::compat::TokioAsyncReadCompatExt; -use yamux::{Connection, Mode}; +use yamux::{Config, Connection, ConnectionError, Mode}; #[test] fn prop_config_send_recv_multi() { @@ -46,76 +47,28 @@ fn prop_config_send_recv_multi() { QuickCheck::new().quickcheck(prop as fn(_, _, _) -> _) } -struct EchoServer { - connection: Connection, - worker_streams: FuturesUnordered>>, - streams_processed: usize, - connection_closed: bool, -} +#[test] +fn prop_max_streams() { + fn prop(n: usize) -> Result { + let max_streams = n % 100; + let mut cfg = Config::default(); + cfg.set_max_num_streams(max_streams); -impl EchoServer { - fn new(connection: Connection) -> Self { - Self { - connection, - worker_streams: FuturesUnordered::default(), - streams_processed: 0, - connection_closed: false, - } - } -} + Runtime::new().unwrap().block_on(async move { + let (server, client) = connected_peers(cfg.clone(), cfg).await?; -impl Future for EchoServer -where - T: AsyncRead + AsyncWrite + Unpin, -{ - type Output = yamux::Result; + task::spawn(EchoServer::new(server)); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let client = OpenStreamsClient::new(client, max_streams); - loop { - match this.worker_streams.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(()))) => { - this.streams_processed += 1; - continue; - } - Poll::Ready(Some(Err(e))) => { - eprintln!("A stream failed: {}", e); - continue; - } - Poll::Ready(None) => { - if this.connection_closed { - return Poll::Ready(Ok(this.streams_processed)); - } - } - Poll::Pending => {} - } - - match this.connection.poll_next_inbound(cx) { - Poll::Ready(Some(Ok(mut stream))) => { - this.worker_streams.push( - async move { - { - let (mut r, mut w) = AsyncReadExt::split(&mut stream); - futures::io::copy(&mut r, &mut w).await?; - } - stream.close().await?; - Ok(()) - } - .boxed(), - ); - continue; - } - Poll::Ready(None) | Poll::Ready(Some(Err(_))) => { - this.connection_closed = true; - continue; - } - Poll::Pending => {} - } + let (client, streams) = client.await?; + assert_eq!(streams.len(), max_streams); - return Poll::Pending; - } + let open_result = OpenStreamsClient::new(client, 1).await; + Ok(matches!(open_result, Err(ConnectionError::TooManyStreams))) + }) } + QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) } struct MessageSender { diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 9add5b69..5a4be48f 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -54,37 +54,6 @@ fn prop_config_send_recv_single() { .quickcheck(prop as fn(_, _, _) -> _) } -#[test] -fn prop_config_send_recv_multi() { - fn prop( - mut msgs: Vec, - TestConfig(cfg1): TestConfig, - TestConfig(cfg2): TestConfig, - ) -> Result<(), ConnectionError> { - msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); - - Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(cfg1, cfg2).await?; - - let server = echo_server(server); - let client = async { - let (control, client) = Control::new(client); - task::spawn(noop_server(client)); - send_on_separate_streams(control, msgs).await?; - - Ok(()) - }; - - futures::future::try_join(server, client).await?; - - Ok(()) - }) - } - QuickCheck::new() - .tests(10) - .quickcheck(prop as fn(_, _, _) -> _) -} - #[test] fn prop_send_recv() { fn prop(msgs: Vec) -> Result { @@ -112,34 +81,6 @@ fn prop_send_recv() { QuickCheck::new().tests(1).quickcheck(prop as fn(_) -> _) } -#[test] -fn prop_max_streams() { - fn prop(n: usize) -> Result { - let max_streams = n % 100; - let mut cfg = Config::default(); - cfg.set_max_num_streams(max_streams); - - Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(cfg.clone(), cfg).await?; - - task::spawn(echo_server(server)); - - let (mut control, client) = Control::new(client); - task::spawn(noop_server(client)); - - let mut v = Vec::new(); - for _ in 0..max_streams { - v.push(control.open_stream().await?) - } - - let open_result = control.open_stream().await; - - Ok(matches!(open_result, Err(ConnectionError::TooManyStreams))) - }) - } - QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) -} - #[test] fn prop_send_recv_half_closed() { fn prop(msg: Msg) -> Result<(), ConnectionError> {