diff --git a/Cargo.lock b/Cargo.lock index 38e0d099ed4..51249c98f26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1190,6 +1190,7 @@ dependencies = [ "futures", "futures-timer", "libp2p", + "libp2p-quic", "log", ] @@ -4112,6 +4113,7 @@ dependencies = [ "env_logger 0.10.0", "futures", "libp2p", + "libp2p-quic", ] [[package]] diff --git a/examples/dcutr/Cargo.toml b/examples/dcutr/Cargo.toml index 40198aaeadd..1b1230796f9 100644 --- a/examples/dcutr/Cargo.toml +++ b/examples/dcutr/Cargo.toml @@ -11,4 +11,5 @@ env_logger = "0.10.0" futures = "0.3.28" futures-timer = "3.0" libp2p = { path = "../../libp2p", features = ["async-std", "dns", "dcutr", "identify", "macros", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] } +libp2p-quic = { path = "../../transports/quic", features = ["async-std"] } log = "0.4" diff --git a/examples/dcutr/src/main.rs b/examples/dcutr/src/main.rs index 83b64c98a5e..8359bb1902a 100644 --- a/examples/dcutr/src/main.rs +++ b/examples/dcutr/src/main.rs @@ -23,13 +23,14 @@ use clap::Parser; use futures::{ executor::{block_on, ThreadPool}, - future::FutureExt, + future::{Either, FutureExt}, stream::StreamExt, }; use libp2p::{ core::{ multiaddr::{Multiaddr, Protocol}, - transport::{OrTransport, Transport}, + muxing::StreamMuxerBox, + transport::Transport, upgrade, }, dcutr, @@ -38,9 +39,9 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, PeerId, }; +use libp2p_quic as quic; use log::info; use std::error::Error; -use std::net::Ipv4Addr; use std::str::FromStr; #[derive(Debug, Parser)] @@ -91,19 +92,26 @@ fn main() -> Result<(), Box> { let (relay_transport, client) = relay::client::new(local_peer_id); - let transport = OrTransport::new( - relay_transport, - block_on(DnsConfig::system(tcp::async_io::Transport::new( - tcp::Config::default().port_reuse(true), - ))) - .unwrap(), - ) - .upgrade(upgrade::Version::V1Lazy) - .authenticate( - noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."), - ) - .multiplex(yamux::Config::default()) - .boxed(); + let transport = { + let relay_tcp_quic_transport = relay_transport + .or_transport(tcp::async_io::Transport::new( + tcp::Config::default().port_reuse(true), + )) + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&local_key).unwrap()) + .multiplex(yamux::Config::default()) + .or_transport(quic::async_std::Transport::new(quic::Config::new( + &local_key, + ))); + + block_on(DnsConfig::system(relay_tcp_quic_transport)) + .unwrap() + .map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .boxed() + }; #[derive(NetworkBehaviour)] #[behaviour(to_swarm = "Event")] @@ -164,11 +172,10 @@ fn main() -> Result<(), Box> { .build(); swarm - .listen_on( - Multiaddr::empty() - .with("0.0.0.0".parse::().unwrap().into()) - .with(Protocol::Tcp(0)), - ) + .listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap()) + .unwrap(); + swarm + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) .unwrap(); // Wait to listen on all interfaces. diff --git a/examples/relay-server/Cargo.toml b/examples/relay-server/Cargo.toml index 2b204fe3bf0..ebe0da7c66a 100644 --- a/examples/relay-server/Cargo.toml +++ b/examples/relay-server/Cargo.toml @@ -12,3 +12,4 @@ async-trait = "0.1" env_logger = "0.10.0" futures = "0.3.28" libp2p = { path = "../../libp2p", features = ["async-std", "noise", "macros", "ping", "tcp", "identify", "yamux", "relay"] } +libp2p-quic = { path = "../../transports/quic", features = ["async-std"] } diff --git a/examples/relay-server/src/main.rs b/examples/relay-server/src/main.rs index 5a2b61d853a..6a1d956b5a5 100644 --- a/examples/relay-server/src/main.rs +++ b/examples/relay-server/src/main.rs @@ -22,10 +22,11 @@ #![doc = include_str!("../README.md")] use clap::Parser; -use futures::executor::block_on; use futures::stream::StreamExt; +use futures::{executor::block_on, future::Either}; use libp2p::{ core::multiaddr::Protocol, + core::muxing::StreamMuxerBox, core::upgrade, core::{Multiaddr, Transport}, identify, identity, @@ -34,6 +35,7 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, }; +use libp2p_quic as quic; use std::error::Error; use std::net::{Ipv4Addr, Ipv6Addr}; @@ -50,12 +52,21 @@ fn main() -> Result<(), Box> { let tcp_transport = tcp::async_io::Transport::default(); - let transport = tcp_transport + let tcp_transport = tcp_transport .upgrade(upgrade::Version::V1Lazy) .authenticate( noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."), ) - .multiplex(libp2p::yamux::Config::default()) + .multiplex(libp2p::yamux::Config::default()); + + let quic_transport = quic::async_std::Transport::new(quic::Config::new(&local_key)); + + let transport = quic_transport + .or_transport(tcp_transport) + .map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) .boxed(); let behaviour = Behaviour { @@ -70,13 +81,22 @@ fn main() -> Result<(), Box> { let mut swarm = SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build(); // Listen on all interfaces - let listen_addr = Multiaddr::empty() + let listen_addr_tcp = Multiaddr::empty() .with(match opt.use_ipv6 { Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED), _ => Protocol::from(Ipv4Addr::UNSPECIFIED), }) .with(Protocol::Tcp(opt.port)); - swarm.listen_on(listen_addr)?; + swarm.listen_on(listen_addr_tcp)?; + + let listen_addr_quic = Multiaddr::empty() + .with(match opt.use_ipv6 { + Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED), + _ => Protocol::from(Ipv4Addr::UNSPECIFIED), + }) + .with(Protocol::Udp(opt.port)) + .with(Protocol::QuicV1); + swarm.listen_on(listen_addr_quic)?; block_on(async { loop { diff --git a/transports/quic/CHANGELOG.md b/transports/quic/CHANGELOG.md index db50b7b8a31..7002a9c2f73 100644 --- a/transports/quic/CHANGELOG.md +++ b/transports/quic/CHANGELOG.md @@ -3,7 +3,10 @@ - Raise MSRV to 1.65. See [PR 3715]. +- Add hole punching support by implementing `Transport::dial_as_listener`. See [PR 3964]. + [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 +[PR 3964]: https://github.com/libp2p/rust-libp2p/pull/3964 ## 0.7.0-alpha.3 diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 79bd83f59f7..2005208c734 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -23,7 +23,7 @@ quinn-proto = { version = "0.10.1", default-features = false, features = ["tls-r rand = "0.8.5" rustls = { version = "0.21.1", default-features = false } thiserror = "1.0.40" -tokio = { version = "1.28.2", default-features = false, features = ["net", "rt"], optional = true } +tokio = { version = "1.28.2", default-features = false, features = ["net", "rt", "time"], optional = true } [features] tokio = ["dep:tokio", "if-watch/tokio"] diff --git a/transports/quic/src/endpoint.rs b/transports/quic/src/endpoint.rs index cef062a0d7e..bf69df50b62 100644 --- a/transports/quic/src/endpoint.rs +++ b/transports/quic/src/endpoint.rs @@ -279,6 +279,13 @@ impl Channel { Ok(Ok(())) } + pub(crate) async fn send(&mut self, to_endpoint: ToEndpoint) -> Result<(), Disconnected> { + self.to_endpoint + .send(to_endpoint) + .await + .map_err(|_| Disconnected {}) + } + /// Send a message to inform the [`Driver`] about an /// event caused by the owner of this [`Channel`] dropping. /// This clones the sender to the endpoint to guarantee delivery. diff --git a/transports/quic/src/hole_punching.rs b/transports/quic/src/hole_punching.rs new file mode 100644 index 00000000000..b9589dd17a0 --- /dev/null +++ b/transports/quic/src/hole_punching.rs @@ -0,0 +1,47 @@ +use std::{net::SocketAddr, time::Duration}; + +use futures::future::Either; +use rand::{distributions, Rng}; + +use crate::{ + endpoint::{self, ToEndpoint}, + Error, Provider, +}; + +pub(crate) async fn hole_puncher( + endpoint_channel: endpoint::Channel, + remote_addr: SocketAddr, + timeout_duration: Duration, +) -> Error { + let punch_holes_future = punch_holes::

(endpoint_channel, remote_addr); + futures::pin_mut!(punch_holes_future); + match futures::future::select(P::sleep(timeout_duration), punch_holes_future).await { + Either::Left(_) => Error::HandshakeTimedOut, + Either::Right((hole_punch_err, _)) => hole_punch_err, + } +} + +async fn punch_holes( + mut endpoint_channel: endpoint::Channel, + remote_addr: SocketAddr, +) -> Error { + loop { + let sleep_duration = Duration::from_millis(rand::thread_rng().gen_range(10..=200)); + P::sleep(sleep_duration).await; + + let random_udp_packet = ToEndpoint::SendUdpPacket(quinn_proto::Transmit { + destination: remote_addr, + ecn: None, + contents: rand::thread_rng() + .sample_iter(distributions::Standard) + .take(64) + .collect(), + segment_size: None, + src_ip: None, + }); + + if endpoint_channel.send(random_udp_packet).await.is_err() { + return Error::EndpointDriverCrashed; + } + } +} diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index 594ba0b6108..945f5119c6e 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -59,9 +59,12 @@ mod connection; mod endpoint; +mod hole_punching; mod provider; mod transport; +use std::net::SocketAddr; + pub use connection::{Connecting, Connection, Substream}; pub use endpoint::Config; #[cfg(feature = "async-std")] @@ -94,6 +97,14 @@ pub enum Error { /// The [`Connecting`] future timed out. #[error("Handshake with the remote timed out.")] HandshakeTimedOut, + + /// Error when `Transport::dial_as_listener` is called without an active listener. + #[error("Tried to dial as listener without an active listener.")] + NoActiveListenerForDialAsListener, + + /// Error when holepunching for a remote is already in progress + #[error("Already punching hole for {0}).")] + HolePunchInProgress(SocketAddr), } /// Dialing a remote peer failed. diff --git a/transports/quic/src/provider.rs b/transports/quic/src/provider.rs index c38f77fd1b9..c9401e9b99f 100644 --- a/transports/quic/src/provider.rs +++ b/transports/quic/src/provider.rs @@ -18,12 +18,13 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::Future; +use futures::{future::BoxFuture, Future}; use if_watch::IfEvent; use std::{ io, net::SocketAddr, task::{Context, Poll}, + time::Duration, }; #[cfg(feature = "async-std")] @@ -74,4 +75,7 @@ pub trait Provider: Unpin + Send + Sized + 'static { watcher: &mut Self::IfWatcher, cx: &mut Context<'_>, ) -> Poll>; + + /// Sleep for specified amount of time. + fn sleep(duration: Duration) -> BoxFuture<'static, ()>; } diff --git a/transports/quic/src/provider/async_std.rs b/transports/quic/src/provider/async_std.rs index 222c8e55e90..e593b2ed4f4 100644 --- a/transports/quic/src/provider/async_std.rs +++ b/transports/quic/src/provider/async_std.rs @@ -26,6 +26,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Duration, }; use crate::GenTransport; @@ -104,6 +105,10 @@ impl super::Provider for Provider { ) -> Poll> { watcher.poll_if_event(cx) } + + fn sleep(duration: Duration) -> BoxFuture<'static, ()> { + async_std::task::sleep(duration).boxed() + } } type ReceiveStreamItem = ( diff --git a/transports/quic/src/provider/tokio.rs b/transports/quic/src/provider/tokio.rs index 07e23f8813c..77c9060e3c1 100644 --- a/transports/quic/src/provider/tokio.rs +++ b/transports/quic/src/provider/tokio.rs @@ -18,11 +18,12 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{ready, Future}; +use futures::{future::BoxFuture, ready, Future, FutureExt}; use std::{ io, net::SocketAddr, task::{Context, Poll}, + time::Duration, }; use tokio::{io::ReadBuf, net::UdpSocket}; @@ -95,4 +96,8 @@ impl super::Provider for Provider { ) -> Poll> { watcher.poll_if_event(cx) } + + fn sleep(duration: Duration) -> BoxFuture<'static, ()> { + tokio::time::sleep(duration).boxed() + } } diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 668034ed147..afdaf86cdf4 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -19,11 +19,12 @@ // DEALINGS IN THE SOFTWARE. use crate::endpoint::{Config, QuinnConfig, ToEndpoint}; +use crate::hole_punching::hole_puncher; use crate::provider::Provider; use crate::{endpoint, Connecting, Connection, Error}; use futures::channel::{mpsc, oneshot}; -use futures::future::BoxFuture; +use futures::future::{BoxFuture, Either}; use futures::ready; use futures::stream::StreamExt; use futures::{prelude::*, stream::SelectAll}; @@ -73,6 +74,8 @@ pub struct GenTransport { dialer: HashMap, /// Waker to poll the transport again when a new dialer or listener is added. waker: Option, + /// Holepunching attempts + hole_punch_attempts: HashMap>, } impl GenTransport

{ @@ -88,6 +91,49 @@ impl GenTransport

{ dialer: HashMap::new(), waker: None, support_draft_29, + hole_punch_attempts: Default::default(), + } + } + + fn remote_multiaddr_to_socketaddr( + &self, + addr: Multiaddr, + ) -> Result< + (SocketAddr, ProtocolVersion, Option), + TransportError<::Error>, + > { + let (socket_addr, version, peer_id) = multiaddr_to_socketaddr(&addr, self.support_draft_29) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + Ok((socket_addr, version, peer_id)) + } + + fn eligible_listener(&mut self, socket_addr: &SocketAddr) -> Option<&mut Listener

> { + let mut listeners: Vec<_> = self + .listeners + .iter_mut() + .filter(|l| { + if l.is_closed { + return false; + } + let listen_addr = l.endpoint_channel.socket_addr(); + SocketFamily::is_same(&listen_addr.ip(), &socket_addr.ip()) + && listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback() + }) + .collect(); + match listeners.len() { + 0 => None, + 1 => listeners.pop(), + _ => { + // Pick any listener to use for dialing. + // We hash the socket address to achieve determinism. + let mut hasher = DefaultHasher::new(); + socket_addr.hash(&mut hasher); + let index = hasher.finish() as usize % listeners.len(); + Some(listeners.swap_remove(index)) + } } } } @@ -103,8 +149,9 @@ impl Transport for GenTransport

{ listener_id: ListenerId, addr: Multiaddr, ) -> Result<(), TransportError> { - let (socket_addr, version) = multiaddr_to_socketaddr(&addr, self.support_draft_29) - .ok_or(TransportError::MultiaddrNotSupported(addr))?; + let (socket_addr, version, _peer_id) = + multiaddr_to_socketaddr(&addr, self.support_draft_29) + .ok_or(TransportError::MultiaddrNotSupported(addr))?; let listener = Listener::new( listener_id, socket_addr, @@ -147,27 +194,12 @@ impl Transport for GenTransport

{ } fn dial(&mut self, addr: Multiaddr) -> Result> { - let (socket_addr, version) = multiaddr_to_socketaddr(&addr, self.support_draft_29) - .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; - if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { - return Err(TransportError::MultiaddrNotSupported(addr)); - } + let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr)?; - let mut listeners = self - .listeners - .iter_mut() - .filter(|l| { - if l.is_closed { - return false; - } - let listen_addr = l.endpoint_channel.socket_addr(); - SocketFamily::is_same(&listen_addr.ip(), &socket_addr.ip()) - && listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback() - }) - .collect::>(); + let handshake_timeout = self.handshake_timeout; - let dialer_state = match listeners.len() { - 0 => { + let dialer_state = match self.eligible_listener(&socket_addr) { + None => { // No listener. Get or create an explicit dialer. let socket_family = socket_addr.ip().into(); let dialer = match self.dialer.entry(socket_family) { @@ -181,28 +213,61 @@ impl Transport for GenTransport

{ }; &mut dialer.state } - 1 => &mut listeners[0].dialer_state, - _ => { - // Pick any listener to use for dialing. - // We hash the socket address to achieve determinism. - let mut hasher = DefaultHasher::new(); - socket_addr.hash(&mut hasher); - let index = hasher.finish() as usize % listeners.len(); - &mut listeners[index].dialer_state - } + Some(listener) => &mut listener.dialer_state, }; - Ok(dialer_state.new_dial(socket_addr, self.handshake_timeout, version)) + Ok(dialer_state.new_dial(socket_addr, handshake_timeout, version)) } fn dial_as_listener( &mut self, addr: Multiaddr, ) -> Result> { - // TODO: As the listener of a QUIC hole punch, we need to send a random UDP packet to the - // `addr`. See DCUtR specification below. - // - // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol - Err(TransportError::MultiaddrNotSupported(addr)) + let (socket_addr, _version, peer_id) = self.remote_multiaddr_to_socketaddr(addr.clone())?; + let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr))?; + + let endpoint_channel = self + .eligible_listener(&socket_addr) + .ok_or(TransportError::Other( + Error::NoActiveListenerForDialAsListener, + ))? + .endpoint_channel + .clone(); + + let hole_puncher = hole_puncher::

(endpoint_channel, socket_addr, self.handshake_timeout); + + let (sender, receiver) = oneshot::channel(); + + match self.hole_punch_attempts.entry(socket_addr) { + Entry::Occupied(mut sender_entry) => { + // Stale senders, i.e. from failed hole punches are not removed. + // Thus, we can just overwrite a stale sender. + if !sender_entry.get().is_canceled() { + return Err(TransportError::Other(Error::HolePunchInProgress( + socket_addr, + ))); + } + sender_entry.insert(sender); + } + Entry::Vacant(entry) => { + entry.insert(sender); + } + }; + + Ok(Box::pin(async move { + futures::pin_mut!(hole_puncher); + match futures::future::select(receiver, hole_puncher).await { + Either::Left((message, _)) => { + let (inbound_peer_id, connection) = message + .expect("hole punch connection sender is never dropped before receiver") + .await?; + if inbound_peer_id != peer_id { + log::warn!("expected inbound connection from {socket_addr} to resolve to {peer_id} but got {inbound_peer_id}"); + } + Ok((inbound_peer_id, connection)) + } + Either::Right((hole_punch_err, _)) => Err(hole_punch_err), + } + })) } fn poll( @@ -222,8 +287,37 @@ impl Transport for GenTransport

{ self.dialer.remove(&key); } - if let Poll::Ready(Some(ev)) = self.listeners.poll_next_unpin(cx) { - return Poll::Ready(ev); + while let Poll::Ready(Some(ev)) = self.listeners.poll_next_unpin(cx) { + match ev { + TransportEvent::Incoming { + listener_id, + mut upgrade, + local_addr, + send_back_addr, + } => { + let socket_addr = + multiaddr_to_socketaddr(&send_back_addr, self.support_draft_29) + .unwrap() + .0; + + if let Some(sender) = self.hole_punch_attempts.remove(&socket_addr) { + match sender.send(upgrade) { + Ok(()) => continue, + Err(timed_out_holepunch) => { + upgrade = timed_out_holepunch; + } + } + } + + return Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade, + local_addr, + send_back_addr, + }); + } + _ => return Poll::Ready(ev), + } } self.waker = Some(cx.waker().clone()); @@ -594,15 +688,18 @@ fn ip_to_listenaddr( fn multiaddr_to_socketaddr( addr: &Multiaddr, support_draft_29: bool, -) -> Option<(SocketAddr, ProtocolVersion)> { +) -> Option<(SocketAddr, ProtocolVersion, Option)> { let mut iter = addr.iter(); let proto1 = iter.next()?; let proto2 = iter.next()?; let proto3 = iter.next()?; + let mut peer_id = None; for proto in iter { match proto { - Protocol::P2p(_) => {} // Ignore a `/p2p/...` prefix of possibly outer protocols, if present. + Protocol::P2p(id) => { + peer_id = Some(id); + } _ => return None, } } @@ -614,10 +711,10 @@ fn multiaddr_to_socketaddr( match (proto1, proto2) { (Protocol::Ip4(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version)) + Some((SocketAddr::new(ip.into(), port), version, peer_id)) } (Protocol::Ip6(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version)) + Some((SocketAddr::new(ip.into(), port), version, peer_id)) } _ => None, } @@ -691,7 +788,8 @@ mod test { ), Some(( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345,), - ProtocolVersion::V1 + ProtocolVersion::V1, + None )) ); assert_eq!( @@ -703,7 +801,8 @@ mod test { ), Some(( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080,), - ProtocolVersion::V1 + ProtocolVersion::V1, + None )) ); assert_eq!( @@ -715,7 +814,7 @@ mod test { Some((SocketAddr::new( IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55148, - ), ProtocolVersion::V1)) + ), ProtocolVersion::V1, Some("12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ".parse().unwrap()))) ); assert_eq!( multiaddr_to_socketaddr( @@ -724,7 +823,8 @@ mod test { ), Some(( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345,), - ProtocolVersion::V1 + ProtocolVersion::V1, + None )) ); assert_eq!( @@ -741,7 +841,8 @@ mod test { )), 8080, ), - ProtocolVersion::V1 + ProtocolVersion::V1, + None )) ); @@ -757,7 +858,8 @@ mod test { ), Some(( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234,), - ProtocolVersion::Draft29 + ProtocolVersion::Draft29, + None )) ); }