diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index ab74b52217e..cf7681997ba 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -40,8 +40,13 @@ rustc-args = ["--cfg", "docsrs"] [dev-dependencies] async-std = { version = "1.12.0", features = ["attributes"] } -libp2p = { path = "../..", features = ["request-response", "tcp", "yamux", "noise", "async-std"] } env_logger = "0.9.0" -rand = "0.8.4" -tokio = { version = "1.21.1", features = ["macros", "rt-multi-thread"] } +libp2p = { path = "../..", features = ["tcp", "yamux", "noise", "async-std"] } +libp2p-muxer-test-harness = { path = "../../muxers/test-harness" } quickcheck = "1" +rand = "0.8.4" +tokio = { version = "1.21.1", features = ["macros", "rt-multi-thread", "time"] } + +[[test]] +name = "stream_compliance" +required-features = ["async-std"] diff --git a/transports/quic/src/connection/substream.rs b/transports/quic/src/connection/substream.rs index a4fe9d7c3b0..2ad0feb9d69 100644 --- a/transports/quic/src/connection/substream.rs +++ b/transports/quic/src/connection/substream.rs @@ -191,7 +191,10 @@ impl AsyncWrite for Substream { Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, err))) } Err(quinn_proto::FinishError::UnknownStream) => { - Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) + // We never make up IDs so the stream must have existed at some point if we get to here. + // `UnknownStream` is also emitted in case the stream is already finished, hence just + // return `Ok(())` here. + Poll::Ready(Ok(())) } } } diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index a67a0757674..25f410b5d38 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -1,60 +1,23 @@ #![cfg(any(feature = "async-std", feature = "tokio"))] -use async_trait::async_trait; -use futures::channel::oneshot; -use futures::future::{join, FutureExt}; -use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; -use futures::select; +use futures::channel::mpsc; +use futures::future::Either; use futures::stream::StreamExt; -use futures::task::Spawn; +use futures::{future, AsyncReadExt, AsyncWriteExt, SinkExt}; use libp2p::core::multiaddr::Protocol; -use libp2p::core::muxing::StreamMuxerBox; -use libp2p::core::{upgrade, ConnectedPoint, Transport}; -use libp2p::request_response::{ - ProtocolName, ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, -}; -use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; -use libp2p::swarm::{ConnectionError, DialError, Swarm, SwarmEvent}; +use libp2p::core::Transport; use libp2p::{noise, tcp, yamux, Multiaddr}; use libp2p_core::either::EitherOutput; -use libp2p_core::transport::OrTransport; +use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; +use libp2p_core::transport::{Boxed, OrTransport, TransportEvent}; +use libp2p_core::{upgrade, PeerId}; use libp2p_quic as quic; use quic::Provider; use rand::RngCore; +use std::future::Future; +use std::io; use std::num::NonZeroU8; use std::time::Duration; -use std::{io, iter}; - -fn generate_tls_keypair() -> libp2p::identity::Keypair { - libp2p::identity::Keypair::generate_ed25519() -} - -async fn create_swarm() -> Swarm> { - let keypair = generate_tls_keypair(); - let peer_id = keypair.public().to_peer_id(); - let mut config = quic::Config::new(&keypair); - config.handshake_timeout = Duration::from_secs(1); - let transport = quic::GenTransport::

::new(config); - - let transport = Transport::map(transport, |(peer, connection), _| { - (peer, StreamMuxerBox::new(connection)) - }) - .boxed(); - - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - let behaviour = RequestResponse::new(PingCodec(), protocols, cfg); - Swarm::new(transport, behaviour, peer_id) -} - -async fn start_listening(swarm: &mut Swarm>, addr: &str) -> Multiaddr { - swarm.listen_on(addr.parse().unwrap()).unwrap(); - match swarm.next().await { - Some(SwarmEvent::NewListenAddr { address, .. }) => address, - e => panic!("{:?}", e), - } -} #[cfg(feature = "tokio")] #[tokio::test] @@ -68,614 +31,409 @@ async fn async_std_smoke() { smoke::().await } -async fn smoke() { +#[cfg(feature = "async-std")] +#[async_std::test] +async fn dial_failure() { let _ = env_logger::try_init(); - let mut rng = rand::thread_rng(); - - let mut a = create_swarm::

().await; - let mut b = create_swarm::

().await; + let mut a = create_transport::().1; + let mut b = create_transport::().1; let addr = start_listening(&mut a, "/ip4/127.0.0.1/udp/0/quic").await; + drop(a); // stop a so b can never reach it - let mut data = vec![0; 4096 * 10]; - rng.fill_bytes(&mut data); + match dial(&mut b, addr).await { + Ok(_) => panic!("Expected dial to fail"), + Err(error) => { + assert_eq!("Handshake with the remote timed out.", error.to_string()) + } + }; +} - b.behaviour_mut().add_address(a.local_peer_id(), addr); - b.behaviour_mut() - .send_request(a.local_peer_id(), Ping(data.clone())); +#[cfg(feature = "tokio")] +#[tokio::test] +async fn endpoint_reuse() { + let _ = env_logger::try_init(); + let (_, mut a_transport) = create_transport::(); + let (_, mut b_transport) = create_transport::(); - let b_id = *b.local_peer_id(); + let a_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await; + let ((_, b_send_back_addr, _), _) = + connect(&mut a_transport, &mut b_transport, a_addr.clone()).await; - let (sync_tx, sync_rx) = oneshot::channel(); + // Expect the dial to fail since b is not listening on an address. + match dial(&mut a_transport, b_send_back_addr).await { + Ok(_) => panic!("Expected dial to fail"), + Err(error) => { + assert_eq!("Handshake with the remote timed out.", error.to_string()) + } + }; - let fut_a = async move { - match a.next().await { - Some(SwarmEvent::IncomingConnection { .. }) => {} - e => panic!("{:?}", e), - }; + let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic").await; + let ((_, a_send_back_addr, _), _) = connect(&mut b_transport, &mut a_transport, b_addr).await; - match a.next().await { - Some(SwarmEvent::ConnectionEstablished { .. }) => {} - e => panic!("{:?}", e), - }; + assert_eq!(a_send_back_addr, a_addr); +} - match a.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request: Ping(ping), - channel, - .. - }, - .. - })) => { - a.behaviour_mut() - .send_response(channel, Pong(ping)) - .unwrap(); - } - e => panic!("{:?}", e), - } +#[cfg(feature = "async-std")] +#[async_std::test] +async fn ipv4_dial_ipv6() { + let _ = env_logger::try_init(); + let (a_peer_id, mut a_transport) = create_transport::(); + let (b_peer_id, mut b_transport) = create_transport::(); - match a.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {} - e => panic!("{:?}", e), - } + let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic").await; + let ((a_connected, _, _), (b_connected, _)) = + connect(&mut a_transport, &mut b_transport, a_addr).await; - a.behaviour_mut() - .send_request(&b_id, Ping(b"another substream".to_vec())); - - assert!(a.next().now_or_never().is_none()); - - match a.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Response { - response: Pong(data), - .. - }, - .. - })) => assert_eq!(data, b"another substream".to_vec()), - e => panic!("{:?}", e), - } + assert_eq!(a_connected, b_peer_id); + assert_eq!(b_connected, a_peer_id); +} - sync_rx.await.unwrap(); +#[cfg(feature = "async-std")] +#[async_std::test] +#[ignore] // Transport currently does not validate PeerId. Enable once we make use of PeerId validation in rustls. +async fn wrong_peerid() { + use libp2p::PeerId; - a.disconnect_peer_id(b_id).unwrap(); + let (a_peer_id, mut a_transport) = create_transport::(); + let (b_peer_id, mut b_transport) = create_transport::(); - match a.next().await { - Some(SwarmEvent::ConnectionClosed { cause: None, .. }) => {} - e => panic!("{:?}", e), - } - }; + let a_addr = start_listening(&mut a_transport, "/ip6/::1/udp/0/quic").await; + let a_addr_random_peer = a_addr.with(Protocol::P2p(PeerId::random().into())); - let fut_b = async { - match b.next().await { - Some(SwarmEvent::Dialing(_)) => {} - e => panic!("{:?}", e), - } + let ((a_connected, _, _), (b_connected, _)) = + connect(&mut a_transport, &mut b_transport, a_addr_random_peer).await; - match b.next().await { - Some(SwarmEvent::ConnectionEstablished { .. }) => {} - e => panic!("{:?}", e), - }; + assert_ne!(a_connected, b_peer_id); + assert_eq!(b_connected, a_peer_id); +} - assert!(b.next().now_or_never().is_none()); - - match b.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Response { - response: Pong(pong), - .. - }, - .. - })) => assert_eq!(data, pong), - e => panic!("{:?}", e), - } +#[cfg(feature = "async-std")] +fn new_tcp_quic_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { + let keypair = generate_tls_keypair(); + let peer_id = keypair.public().to_peer_id(); + let mut config = quic::Config::new(&keypair); + config.handshake_timeout = Duration::from_secs(1); - match b.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request: Ping(data), - channel, - .. - }, - .. - })) => { - b.behaviour_mut() - .send_response(channel, Pong(data)) - .unwrap(); - } - e => panic!("{:?}", e), - } + let quic_transport = quic::async_std::Transport::new(config); + let tcp_transport = tcp::async_io::Transport::new(tcp::Config::default()) + .upgrade(upgrade::Version::V1) + .authenticate( + noise::NoiseConfig::xx( + noise::Keypair::::new() + .into_authentic(&keypair) + .unwrap(), + ) + .into_authenticated(), + ) + .multiplex(yamux::YamuxConfig::default()); - match b.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {} - e => panic!("{:?}", e), - } + let transport = OrTransport::new(quic_transport, tcp_transport) + .map(|either_output, _| match either_output { + EitherOutput::First((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + EitherOutput::Second((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .boxed(); - sync_tx.send(()).unwrap(); + (peer_id, transport) +} - match b.next().await { - Some(SwarmEvent::ConnectionClosed { - cause: Some(ConnectionError::IO(_)), - .. - }) => {} - e => panic!("{:?}", e), - } - }; +#[cfg(feature = "async-std")] +#[async_std::test] +async fn tcp_and_quic() { + let (a_peer_id, mut a_transport) = new_tcp_quic_transport(); + let (b_peer_id, mut b_transport) = new_tcp_quic_transport(); - join(fut_a, fut_b).await; -} + let quic_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await; + let tcp_addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/tcp/0").await; -#[derive(Debug, Clone)] -struct PingProtocol(); + let ((a_connected, _, _), (b_connected, _)) = + connect(&mut a_transport, &mut b_transport, quic_addr).await; + assert_eq!(a_connected, b_peer_id); + assert_eq!(b_connected, a_peer_id); -#[derive(Clone)] -struct PingCodec(); + let ((a_connected, _, _), (b_connected, _)) = + connect(&mut a_transport, &mut b_transport, tcp_addr).await; + assert_eq!(a_connected, b_peer_id); + assert_eq!(b_connected, a_peer_id); +} -#[derive(Debug, Clone, PartialEq, Eq)] -struct Ping(Vec); +// Note: This test should likely be ported to the muxer compliance test suite. +#[cfg(feature = "async-std")] +#[test] +fn concurrent_connections_and_streams_async_std() { + let _ = env_logger::try_init(); -#[derive(Debug, Clone, PartialEq, Eq)] -struct Pong(Vec); + quickcheck::QuickCheck::new() + .min_tests_passed(1) + .quickcheck(prop:: as fn(_, _) -> _); +} -impl ProtocolName for PingProtocol { - fn protocol_name(&self) -> &[u8] { - "/ping/1".as_bytes() - } +// Note: This test should likely be ported to the muxer compliance test suite. +#[cfg(feature = "tokio")] +#[test] +fn concurrent_connections_and_streams_tokio() { + let _ = env_logger::try_init(); + + let rt = tokio::runtime::Runtime::new().unwrap(); + let _guard = rt.enter(); + quickcheck::QuickCheck::new() + .min_tests_passed(1) + .quickcheck(prop:: as fn(_, _) -> _); } -#[async_trait] -impl RequestResponseCodec for PingCodec { - type Protocol = PingProtocol; - type Request = Ping; - type Response = Pong; - - async fn read_request(&mut self, _: &PingProtocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - upgrade::read_length_prefixed(io, 4096 * 10) - .map(|res| match res { - Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), - Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), - Ok(vec) => Ok(Ping(vec)), - }) - .await - } +async fn smoke() { + let _ = env_logger::try_init(); - async fn read_response(&mut self, _: &PingProtocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - upgrade::read_length_prefixed(io, 4096 * 10) - .map(|res| match res { - Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), - Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), - Ok(vec) => Ok(Pong(vec)), - }) - .await - } + let (a_peer_id, mut a_transport) = create_transport::

(); + let (b_peer_id, mut b_transport) = create_transport::

(); - async fn write_request( - &mut self, - _: &PingProtocol, - io: &mut T, - Ping(data): Ping, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - upgrade::write_length_prefixed(io, data).await?; - io.close().await?; - Ok(()) - } + let addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/quic").await; + let ((a_connected, _, _), (b_connected, _)) = + connect(&mut a_transport, &mut b_transport, addr).await; - async fn write_response( - &mut self, - _: &PingProtocol, - io: &mut T, - Pong(data): Pong, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - upgrade::write_length_prefixed(io, data).await?; - io.close().await?; - Ok(()) - } + assert_eq!(a_connected, b_peer_id); + assert_eq!(b_connected, a_peer_id); } -#[cfg(feature = "async-std")] -#[async_std::test] -async fn dial_failure() { - let _ = env_logger::try_init(); - let mut a = create_swarm::().await; - let mut b = create_swarm::().await; +fn generate_tls_keypair() -> libp2p::identity::Keypair { + libp2p::identity::Keypair::generate_ed25519() +} - let addr = start_listening(&mut a, "/ip4/127.0.0.1/udp/0/quic").await; +fn create_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { + let keypair = generate_tls_keypair(); + let peer_id = keypair.public().to_peer_id(); - let a_peer_id = &Swarm::local_peer_id(&a).clone(); - drop(a); // stop a swarm so b can never reach it + let transport = quic::GenTransport::

::new(quic::Config::new(&keypair)) + .map(|(p, c), _| (p, StreamMuxerBox::new(c))) + .boxed(); - b.behaviour_mut().add_address(a_peer_id, addr); - b.behaviour_mut() - .send_request(a_peer_id, Ping(b"hello world".to_vec())); + (peer_id, transport) +} - match b.next().await { - Some(SwarmEvent::Dialing(_)) => {} +async fn start_listening(transport: &mut Boxed<(PeerId, StreamMuxerBox)>, addr: &str) -> Multiaddr { + transport.listen_on(addr.parse().unwrap()).unwrap(); + match transport.next().await { + Some(TransportEvent::NewAddress { listen_addr, .. }) => listen_addr, e => panic!("{:?}", e), } +} - match b.next().await { - Some(SwarmEvent::OutgoingConnectionError { .. }) => {} - e => panic!("{:?}", e), - }; +fn prop( + number_listeners: NonZeroU8, + number_streams: NonZeroU8, +) -> quickcheck::TestResult { + const BUFFER_SIZE: usize = 4096 * 10; - match b.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::OutboundFailure { .. })) => {} - e => panic!("{:?}", e), - }; -} + let number_listeners = u8::from(number_listeners) as usize; + let number_streams = u8::from(number_streams) as usize; -#[test] -fn concurrent_connections_and_streams() { - use quickcheck::*; - - fn prop(number_listeners: NonZeroU8, number_streams: NonZeroU8) -> TestResult { - let (number_listeners, number_streams): (u8, u8) = - (number_listeners.into(), number_streams.into()); - if number_listeners > 10 || number_streams > 10 { - return TestResult::discard(); - } + if number_listeners > 10 || number_streams > 10 { + return quickcheck::TestResult::discard(); + } - let mut pool = futures::executor::LocalPool::default(); - let mut data = vec![0; 4096 * 10]; - rand::thread_rng().fill_bytes(&mut data); - let mut listeners = vec![]; - - // Spawn the listener nodes. - for _ in 0..number_listeners { - let mut listener = pool.run_until(create_swarm::

()); - let addr = pool.run_until(start_listening(&mut listener, "/ip4/127.0.0.1/udp/0/quic")); - - listeners.push((*listener.local_peer_id(), addr)); - - pool.spawner() - .spawn_obj( - async move { - loop { - match listener.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request: Ping(ping), - channel, - .. - }, - .. - })) => { - listener - .behaviour_mut() - .send_response(channel, Pong(ping)) - .unwrap(); - } - Some(SwarmEvent::Behaviour( - RequestResponseEvent::ResponseSent { .. }, - )) - | Some(SwarmEvent::ConnectionEstablished { .. }) - | Some(SwarmEvent::IncomingConnection { .. }) - | Some(SwarmEvent::ConnectionClosed { .. }) => {} - Some(e) => { - panic!("unexpected event {:?}", e); - } - None => { - panic!("listener stopped"); - } - } - } - } - .boxed() - .into(), - ) - .unwrap(); - } + let (listeners_tx, mut listeners_rx) = mpsc::channel(number_listeners); - let mut dialer = pool.run_until(create_swarm::

()); + log::info!("Creating {number_streams} streams on {number_listeners} connections"); - // For each listener node start `number_streams` requests. - for (listener_peer_id, listener_addr) in &listeners { - dialer - .behaviour_mut() - .add_address(listener_peer_id, listener_addr.clone()); + // Spawn the listener nodes. + for _ in 0..number_listeners { + P::spawn({ + let mut listeners_tx = listeners_tx.clone(); - dialer.dial(*listener_peer_id).unwrap(); - } + async move { + let (peer_id, mut listener) = create_transport::

(); + let addr = start_listening(&mut listener, "/ip4/127.0.0.1/udp/0/quic").await; - // Wait for responses to each request. - pool.run_until(async { - let mut num_responses = 0; - loop { - match dialer.next().await { - Some(SwarmEvent::Dialing(_)) => {} - Some(SwarmEvent::ConnectionEstablished { peer_id, .. }) => { - for _ in 0..number_streams { - dialer - .behaviour_mut() - .send_request(&peer_id, Ping(data.clone())); - } - } - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Response { - response: Pong(pong), - .. - }, - .. - })) => { - num_responses += 1; - assert_eq!(data, pong); - let should_be = number_listeners as usize * (number_streams) as usize; - if num_responses == should_be { - break; - } - } - Some(SwarmEvent::ConnectionClosed { .. }) => {} - e => { - panic!("unexpected event {:?}", e); + listeners_tx.send((peer_id, addr)).await.unwrap(); + + loop { + if let TransportEvent::Incoming { upgrade, .. } = + listener.select_next_some().await + { + let (_, connection) = upgrade.await.unwrap(); + + P::spawn(answer_inbound_streams::(connection)); } } } - }); - - TestResult::passed() - } - - #[cfg(feature = "tokio")] - { - let rt = tokio::runtime::Runtime::new().unwrap(); - let _guard = rt.enter(); - QuickCheck::new().quickcheck(prop:: as fn(_, _) -> _); + }) } - #[cfg(feature = "async-std")] - QuickCheck::new().quickcheck(prop:: as fn(_, _) -> _); -} + let (completed_streams_tx, completed_streams_rx) = + mpsc::channel(number_streams * number_listeners); -#[cfg(feature = "tokio")] -#[tokio::test] -async fn endpoint_reuse() { - let _ = env_logger::try_init(); - let mut swarm_a = create_swarm::().await; - let mut swarm_b = create_swarm::().await; - let b_peer_id = *swarm_b.local_peer_id(); - - let a_addr = start_listening(&mut swarm_a, "/ip4/127.0.0.1/udp/0/quic").await; - - swarm_b.dial(a_addr.clone()).unwrap(); - let b_send_back_addr = loop { - select! { - ev = swarm_a.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { endpoint, .. } => { - break endpoint.get_remote_address().clone() - } - SwarmEvent::IncomingConnection { local_addr, ..} => { - assert!(swarm_a.listeners().any(|a| a == &local_addr)); - } - e => panic!("{:?}", e), - }, - ev = swarm_b.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { .. } => {}, - e => panic!("{:?}", e), - } - } - }; + // For each listener node start `number_streams` requests. + P::spawn(async move { + let (_, mut dialer) = create_transport::

(); - let dial_opts = DialOpts::peer_id(b_peer_id) - .addresses(vec![b_send_back_addr.clone()]) - .extend_addresses_through_behaviour() - .condition(PeerCondition::Always) - .build(); - swarm_a.dial(dial_opts).unwrap(); + while let Some((_, listener_addr)) = listeners_rx.next().await { + let (_, connection) = dial(&mut dialer, listener_addr.clone()).await.unwrap(); - // Expect the dial to fail since b is not listening on an address. - loop { - select! { - ev = swarm_a.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { ..} => panic!("Unexpected dial success."), - SwarmEvent::OutgoingConnectionError {error, .. } => { - assert!(matches!(error, DialError::Transport(_))); - break - } - _ => {} - }, - _ = swarm_b.select_next_some() => {}, + P::spawn(open_outbound_streams::( + connection, + number_streams, + completed_streams_tx.clone(), + )) } - } - let b_addr = start_listening(&mut swarm_b, "/ip4/127.0.0.1/udp/0/quic").await; - - let dial_opts = DialOpts::peer_id(b_peer_id) - .addresses(vec![b_addr.clone(), b_send_back_addr]) - .condition(PeerCondition::Always) - .build(); - swarm_a.dial(dial_opts).unwrap(); - let expected_b_addr = b_addr.with(Protocol::P2p(b_peer_id.into())); - - let mut a_reported = false; - let mut b_reported = false; - while !a_reported || !b_reported { - select! { - ev = swarm_a.select_next_some() => match ev{ - SwarmEvent::ConnectionEstablished { endpoint, ..} => { - assert!(endpoint.is_dialer()); - assert_eq!(endpoint.get_remote_address(), &expected_b_addr); - a_reported = true; - } - SwarmEvent::OutgoingConnectionError {error, .. } => { - panic!("Unexpected error {:}", error) - } - _ => {} - }, - ev = swarm_b.select_next_some() => { - if let SwarmEvent::ConnectionEstablished { endpoint, ..} = ev { - match endpoint { - ConnectedPoint::Dialer{..} => panic!("Unexpected outbound connection"), - ConnectedPoint::Listener {send_back_addr, local_addr} => { - // Expect that the local listening endpoint was used for dialing. - assert!(swarm_b.listeners().any(|a| a == &local_addr)); - assert_eq!(send_back_addr, a_addr); - b_reported = true; - } - } - } - }, + // Drive the dialer. + loop { + dialer.next().await; } - } -} + }); -#[cfg(feature = "async-std")] -#[async_std::test] -async fn ipv4_dial_ipv6() { - let _ = env_logger::try_init(); - let mut swarm_a = create_swarm::().await; - let mut swarm_b = create_swarm::().await; + let completed_streams = number_streams * number_listeners; - let a_addr = start_listening(&mut swarm_a, "/ip6/::1/udp/0/quic").await; + // Wait for all streams to complete. + P::block_on( + completed_streams_rx + .take(completed_streams as usize) + .collect::>(), + Duration::from_secs(30), + ); - swarm_b.dial(a_addr.clone()).unwrap(); - - loop { - select! { - ev = swarm_a.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { .. } => { - return; - } - SwarmEvent::IncomingConnection { local_addr, ..} => { - assert!(swarm_a.listeners().any(|a| a == &local_addr)); - } - e => panic!("{:?}", e), - }, - ev = swarm_b.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { .. } => {}, - e => panic!("{:?}", e), - } - } - } + quickcheck::TestResult::passed() } -#[cfg(feature = "async-std")] -#[async_std::test] -async fn wrong_peerid() { - use libp2p::PeerId; +async fn answer_inbound_streams( + mut connection: StreamMuxerBox, +) { + loop { + let mut inbound_stream = match future::poll_fn(|cx| { + let _ = connection.poll_unpin(cx)?; - let _ = env_logger::try_init(); - let mut swarm_a = create_swarm::().await; - let mut swarm_b = create_swarm::().await; + connection.poll_inbound_unpin(cx) + }) + .await + { + Ok(s) => s, + Err(_) => return, + }; - let a_addr = start_listening(&mut swarm_a, "/ip6/::1/udp/0/quic").await; - let a_id = *swarm_a.local_peer_id(); + P::spawn(async move { + // FIXME: Need to write _some_ data before we can read on both sides. + // Do a ping-pong exchange. + { + let mut pong = [0u8; 4]; + inbound_stream.write_all(b"PING").await.unwrap(); + inbound_stream.flush().await.unwrap(); + inbound_stream.read_exact(&mut pong).await.unwrap(); + assert_eq!(&pong, b"PONG"); + } - let wrong_id = PeerId::random(); - let dial_ops = DialOpts::peer_id(wrong_id).addresses(vec![a_addr]).build(); - swarm_b.dial(dial_ops).unwrap(); + let mut data = vec![0; BUFFER_SIZE]; - loop { - select! { - _ = swarm_a.select_next_some() => {}, - ev = swarm_b.select_next_some() => match ev { - SwarmEvent::Dialing(peer_id) => assert_eq!(peer_id, wrong_id), - SwarmEvent::OutgoingConnectionError {peer_id: Some(peer_id), error: DialError::WrongPeerId { obtained, .. }} => { - assert_eq!(peer_id, wrong_id); - assert_eq!(obtained, a_id); - break; - }, - e => panic!("{:?}", e), - } - } + inbound_stream.read_exact(&mut data).await.unwrap(); + inbound_stream.write_all(&data).await.unwrap(); + inbound_stream.close().await.unwrap(); + }); } } -#[cfg(feature = "async-std")] -fn new_tcp_quic_swarm() -> Swarm> { - let keypair = generate_tls_keypair(); - let peer_id = keypair.public().to_peer_id(); - let mut config = quic::Config::new(&keypair); - config.handshake_timeout = Duration::from_secs(1); - let quic_transport = quic::async_std::Transport::new(config); - let tcp_transport = tcp::async_io::Transport::new(tcp::Config::default()) - .upgrade(upgrade::Version::V1Lazy) - .authenticate( - noise::NoiseConfig::xx( - noise::Keypair::::new() - .into_authentic(&keypair) - .unwrap(), - ) - .into_authenticated(), - ) - .multiplex(yamux::YamuxConfig::default()); +async fn open_outbound_streams( + mut connection: StreamMuxerBox, + number_streams: usize, + completed_streams_tx: mpsc::Sender<()>, +) { + for _ in 0..number_streams { + let mut outbound_stream = future::poll_fn(|cx| { + let _ = connection.poll_unpin(cx)?; - let transport = OrTransport::new(quic_transport, tcp_transport) - .map(|either_output, _| match either_output { - EitherOutput::First((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - EitherOutput::Second((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + connection.poll_outbound_unpin(cx) }) - .boxed(); + .await + .unwrap(); + + P::spawn({ + let mut completed_streams_tx = completed_streams_tx.clone(); + + async move { + // FIXME: Need to write _some_ data before we can read on both sides. + // Do a ping-pong exchange. + { + let mut ping = [0u8; 4]; + outbound_stream.write_all(b"PONG").await.unwrap(); + outbound_stream.flush().await.unwrap(); + outbound_stream.read_exact(&mut ping).await.unwrap(); + assert_eq!(&ping, b"PING"); + } - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - let behaviour = RequestResponse::new(PingCodec(), protocols, cfg); + let mut data = vec![0; BUFFER_SIZE]; + rand::thread_rng().fill_bytes(&mut data); - Swarm::new(transport, behaviour, peer_id) -} + let mut received = Vec::new(); -#[cfg(feature = "async-std")] -#[async_std::test] -async fn tcp_and_quic() { - let mut swarm_a = new_tcp_quic_swarm(); - let swarm_a_id = *swarm_a.local_peer_id(); - println!("{}", swarm_a_id); + outbound_stream.write_all(&data).await.unwrap(); + outbound_stream.flush().await.unwrap(); + outbound_stream.read_to_end(&mut received).await.unwrap(); - let mut swarm_b = new_tcp_quic_swarm(); + assert_eq!(received, data); - let quic_addr = start_listening(&mut swarm_a, "/ip4/127.0.0.1/udp/0/quic").await; - let tcp_addr = start_listening(&mut swarm_a, "/ip4/127.0.0.1/tcp/0").await; + completed_streams_tx.send(()).await.unwrap(); + } + }); + } - swarm_b.dial(quic_addr.clone()).unwrap(); + log::info!("Created {number_streams} streams"); - loop { - select! { - ev = swarm_a.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { .. } => break, - SwarmEvent::IncomingConnection { .. } => { } - e => panic!("{:?}", e), - }, - ev = swarm_b.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { .. } => {}, - e => panic!("{:?}", e), - } + while future::poll_fn(|cx| connection.poll_unpin(cx)) + .await + .is_ok() + {} +} + +/// Helper function for driving two transports until they established a connection. +async fn connect( + listener: &mut Boxed<(PeerId, StreamMuxerBox)>, + dialer: &mut Boxed<(PeerId, StreamMuxerBox)>, + addr: Multiaddr, +) -> ( + (PeerId, Multiaddr, StreamMuxerBox), + (PeerId, StreamMuxerBox), +) { + future::join( + async { + let (upgrade, send_back_addr) = + listener.select_next_some().await.into_incoming().unwrap(); + let (peer_id, connection) = upgrade.await.unwrap(); + + (peer_id, send_back_addr, connection) + }, + async { dial(dialer, addr).await.unwrap() }, + ) + .await +} + +/// Helper function for dialling that also polls the `Transport`. +async fn dial( + transport: &mut Boxed<(PeerId, StreamMuxerBox)>, + addr: Multiaddr, +) -> io::Result<(PeerId, StreamMuxerBox)> { + match future::select(transport.dial(addr).unwrap(), transport.next()).await { + Either::Left((conn, _)) => conn, + Either::Right((event, _)) => { + panic!("Unexpected event: {event:?}") } } +} - swarm_b.dial(tcp_addr).unwrap(); +trait BlockOn { + fn block_on(future: impl Future + Send, timeout: Duration) -> R; +} - loop { - select! { - ev = swarm_a.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { .. } => break, - SwarmEvent::IncomingConnection { .. } - | SwarmEvent::ConnectionClosed { .. } => { } - e => panic!("{:?}", e), - }, - ev = swarm_b.select_next_some() => match ev { - SwarmEvent::ConnectionEstablished { .. } => {}, - SwarmEvent::ConnectionClosed { endpoint, .. } => { - assert_eq!(endpoint.get_remote_address(), &quic_addr ); - } - e => panic!("{:?}", e), - } - } +#[cfg(feature = "async-std")] +impl BlockOn for libp2p_quic::async_std::Provider { + fn block_on(future: impl Future + Send, timeout: Duration) -> R { + async_std::task::block_on(async_std::future::timeout(timeout, future)).unwrap() + } +} + +#[cfg(feature = "tokio")] +impl BlockOn for libp2p_quic::tokio::Provider { + fn block_on(future: impl Future + Send, timeout: Duration) -> R { + tokio::runtime::Handle::current() + .block_on(tokio::time::timeout(timeout, future)) + .unwrap() } } diff --git a/transports/quic/tests/stream_compliance.rs b/transports/quic/tests/stream_compliance.rs new file mode 100644 index 00000000000..c2b6f4bed85 --- /dev/null +++ b/transports/quic/tests/stream_compliance.rs @@ -0,0 +1,78 @@ +use futures::channel::oneshot; +use futures::StreamExt; +use libp2p_core::Transport; +use libp2p_quic as quic; +use std::time::Duration; + +#[async_std::test] +async fn close_implies_flush() { + let (alice, bob) = connected_peers().await; + + libp2p_muxer_test_harness::close_implies_flush(alice, bob).await; +} + +#[async_std::test] +#[ignore] // Hangs forever, same as yamux. We can't read from a stream that we have never written to. +async fn dialer_can_receive() { + let (alice, bob) = connected_peers().await; + + libp2p_muxer_test_harness::dialer_can_receive(alice, bob).await; +} + +#[async_std::test] +async fn read_after_close() { + let (alice, bob) = connected_peers().await; + + libp2p_muxer_test_harness::read_after_close(alice, bob).await; +} + +async fn connected_peers() -> (quic::Connection, quic::Connection) { + let mut dialer = new_transport().boxed(); + let mut listener = new_transport().boxed(); + + listener + .listen_on("/ip4/127.0.0.1/udp/0/quic".parse().unwrap()) + .unwrap(); + let listen_address = listener.next().await.unwrap().into_new_address().unwrap(); + + let (dialer_conn_sender, dialer_conn_receiver) = oneshot::channel(); + let (listener_conn_sender, listener_conn_receiver) = oneshot::channel(); + + async_std::task::spawn(async move { + let (upgrade, _) = listener.next().await.unwrap().into_incoming().unwrap(); + + async_std::task::spawn(async move { + let (_, connection) = upgrade.await.unwrap(); + + let _ = listener_conn_sender.send(connection); + }); + + loop { + listener.next().await; + } + }); + let dial_fut = dialer.dial(listen_address).unwrap(); + async_std::task::spawn(async move { + let connection = dial_fut.await.unwrap().1; + + let _ = dialer_conn_sender.send(connection); + }); + + async_std::task::spawn(async move { + loop { + dialer.next().await; + } + }); + + futures::future::try_join(dialer_conn_receiver, listener_conn_receiver) + .await + .unwrap() +} + +fn new_transport() -> quic::async_std::Transport { + let keypair = libp2p_core::identity::Keypair::generate_ed25519(); + let mut config = quic::Config::new(&keypair); + config.handshake_timeout = Duration::from_secs(1); + + quic::async_std::Transport::new(config) +}