Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(quic): implement hole punching #3964

Merged
merged 44 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7a714bf
Implement Transport::dial_as_listener for QUIC
arpankapoor May 21, 2023
145bf11
Merge QUIC dcutr and relay examples
arpankapoor May 23, 2023
0eafaf4
Simplify nested if let
arpankapoor May 23, 2023
fd70f73
Return error when dial_as_listener is called without an active listener
arpankapoor May 23, 2023
ddacaa0
Extract peer_id in multiaddr_to_socketaddr
arpankapoor May 25, 2023
3325dbe
Add peerid to holepunching key
arpankapoor May 25, 2023
271f234
Update dcutr example
arpankapoor May 26, 2023
8b9d778
Check if holepunching is already in progress
arpankapoor May 26, 2023
2290811
Dedup dial and dial_as_listener
arpankapoor May 26, 2023
2621c13
Store random packet on failure
arpankapoor May 30, 2023
3b17e38
Use `Display` formatting instead of `Debug`
arpankapoor May 30, 2023
45446bb
Use slice pattern
arpankapoor May 30, 2023
8a344c7
Resolve merge conflict
arpankapoor May 31, 2023
81988c2
Resolve merge conflict
arpankapoor May 31, 2023
a831f51
Merge branch 'master' into master
arpankapoor Jun 1, 2023
28ece03
Use futures::future::select instead of select! macro
arpankapoor Jun 1, 2023
ce86b7c
Merge branch 'master' into master
arpankapoor Jun 5, 2023
794cabc
Implement HolePuncher as an async function
arpankapoor Jun 5, 2023
e501175
Remove entry from HolePunchMap on timeout
arpankapoor Jun 5, 2023
ad2b644
Run rustfmt
arpankapoor Jun 5, 2023
ba60bba
Implement MaybeHolePunchedConnection as an async function
arpankapoor Jun 5, 2023
6c378ef
Use `SendExt::send` instead of `poll_ready` and `start_send`
arpankapoor Jun 5, 2023
3490940
Swap position of hole_puncher and punch_holes
arpankapoor Jun 6, 2023
26cd328
Clean up maybe_hole_punched_connection
arpankapoor Jun 6, 2023
0b1eaa5
Rename hole_punch_map to hole_punch_attempts
arpankapoor Jun 6, 2023
fbbd1a1
Create newtype HolePunchMap
arpankapoor Jun 6, 2023
2ad9255
Don't mix private/public imports
arpankapoor Jun 6, 2023
bbbf7a1
Return single eligible listener
arpankapoor Jun 7, 2023
26da28d
Return different error on successful holepunch
arpankapoor Jun 7, 2023
8eda5a7
Update external address
arpankapoor Jun 8, 2023
6c2638a
Merge branch 'master' of https://github.com/libp2p/rust-libp2p
arpankapoor Jun 8, 2023
5f58955
Resolve issue after merge
arpankapoor Jun 8, 2023
928314f
Remove peer_id from hole punching key
arpankapoor Jun 10, 2023
6e1b02d
Merge branch 'master' of https://github.com/libp2p/rust-libp2p
arpankapoor Jun 10, 2023
4f4f528
Remove redudant match arm from relay example
arpankapoor Jun 10, 2023
3f6bb59
Update CHANGELOG
arpankapoor Jun 10, 2023
02b1268
Fix imports
arpankapoor Jun 10, 2023
35de850
Review fixes
arpankapoor Jun 10, 2023
e46c170
More review fixes
arpankapoor Jun 11, 2023
36655ee
Move timeout and sleep to Provider trait
arpankapoor Jun 11, 2023
49873f0
Implement timeout using sleep
arpankapoor Jun 11, 2023
0eb9246
Loop through listener events
arpankapoor Jun 11, 2023
05b7a9f
Remove unused TimeoutError
arpankapoor Jun 11, 2023
9d001c7
Merge branch 'master' into master
mergify[bot] Jun 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/dcutr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ env_logger = "0.10.0"
futures = "0.3.28"
futures-timer = "3.0"
libp2p = { path = "../../libp2p", features = ["async-std", "dns", "dcutr", "identify", "macros", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] }
libp2p-quic = { path = "../../transports/quic", features = ["async-std"] }
log = "0.4"
49 changes: 28 additions & 21 deletions examples/dcutr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
use clap::Parser;
use futures::{
executor::{block_on, ThreadPool},
future::FutureExt,
future::{Either, FutureExt},
stream::StreamExt,
};
use libp2p::{
core::{
multiaddr::{Multiaddr, Protocol},
transport::{OrTransport, Transport},
muxing::StreamMuxerBox,
transport::Transport,
upgrade,
},
dcutr,
Expand All @@ -38,9 +39,9 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
tcp, yamux, PeerId,
};
use libp2p_quic as quic;
use log::info;
use std::error::Error;
use std::net::Ipv4Addr;
use std::str::FromStr;

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -91,19 +92,26 @@ fn main() -> Result<(), Box<dyn Error>> {

let (relay_transport, client) = relay::client::new(local_peer_id);

let transport = OrTransport::new(
relay_transport,
block_on(DnsConfig::system(tcp::async_io::Transport::new(
tcp::Config::default().port_reuse(true),
)))
.unwrap(),
)
.upgrade(upgrade::Version::V1Lazy)
.authenticate(
noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(yamux::Config::default())
.boxed();
let transport = {
let relay_tcp_quic_transport = relay_transport
.or_transport(tcp::async_io::Transport::new(
tcp::Config::default().port_reuse(true),
))
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&local_key).unwrap())
.multiplex(yamux::Config::default())
.or_transport(quic::async_std::Transport::new(quic::Config::new(
&local_key,
)));

block_on(DnsConfig::system(relay_tcp_quic_transport))
.unwrap()
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed()
};

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
Expand Down Expand Up @@ -164,11 +172,10 @@ fn main() -> Result<(), Box<dyn Error>> {
.build();

swarm
.listen_on(
Multiaddr::empty()
.with("0.0.0.0".parse::<Ipv4Addr>().unwrap().into())
.with(Protocol::Tcp(0)),
)
.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which means that there is no way to perform hole punching in the example because eligible_listener checks is_loopback.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the example for testing this in two real networks. If you run this example on localhost, you are not really testing hole punching but just that the protocol is executed. On localhost, the connection should always succeed because there is no need for the random UDP packets to punch a hole. Am I wrong?

.unwrap();
swarm
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();

// Wait to listen on all interfaces.
Expand Down
1 change: 1 addition & 0 deletions examples/relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ async-trait = "0.1"
env_logger = "0.10.0"
futures = "0.3.28"
libp2p = { path = "../../libp2p", features = ["async-std", "noise", "macros", "ping", "tcp", "identify", "yamux", "relay"] }
libp2p-quic = { path = "../../transports/quic", features = ["async-std"] }
30 changes: 25 additions & 5 deletions examples/relay-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
#![doc = include_str!("../README.md")]

use clap::Parser;
use futures::executor::block_on;
use futures::stream::StreamExt;
use futures::{executor::block_on, future::Either};
use libp2p::{
core::multiaddr::Protocol,
core::muxing::StreamMuxerBox,
core::upgrade,
core::{Multiaddr, Transport},
identify, identity,
Expand All @@ -34,6 +35,7 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
tcp,
};
use libp2p_quic as quic;
use std::error::Error;
use std::net::{Ipv4Addr, Ipv6Addr};

Expand All @@ -50,12 +52,21 @@ fn main() -> Result<(), Box<dyn Error>> {

let tcp_transport = tcp::async_io::Transport::default();

let transport = tcp_transport
let tcp_transport = tcp_transport
.upgrade(upgrade::Version::V1Lazy)
.authenticate(
noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p::yamux::Config::default())
.multiplex(libp2p::yamux::Config::default());

let quic_transport = quic::async_std::Transport::new(quic::Config::new(&local_key));

let transport = quic_transport
.or_transport(tcp_transport)
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed();

let behaviour = Behaviour {
Expand All @@ -70,13 +81,22 @@ fn main() -> Result<(), Box<dyn Error>> {
let mut swarm = SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build();

// Listen on all interfaces
let listen_addr = Multiaddr::empty()
let listen_addr_tcp = Multiaddr::empty()
.with(match opt.use_ipv6 {
Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED),
_ => Protocol::from(Ipv4Addr::UNSPECIFIED),
})
.with(Protocol::Tcp(opt.port));
swarm.listen_on(listen_addr)?;
swarm.listen_on(listen_addr_tcp)?;

let listen_addr_quic = Multiaddr::empty()
.with(match opt.use_ipv6 {
Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED),
_ => Protocol::from(Ipv4Addr::UNSPECIFIED),
})
.with(Protocol::Udp(opt.port))
.with(Protocol::QuicV1);
swarm.listen_on(listen_addr_quic)?;

block_on(async {
loop {
Expand Down
3 changes: 3 additions & 0 deletions transports/quic/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
- Raise MSRV to 1.65.
See [PR 3715].

- Add hole punching support by implementing `Transport::dial_as_listener`. See [PR 3964].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3964]: https://github.com/libp2p/rust-libp2p/pull/3964

## 0.7.0-alpha.3

Expand Down
2 changes: 1 addition & 1 deletion transports/quic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ quinn-proto = { version = "0.10.1", default-features = false, features = ["tls-r
rand = "0.8.5"
rustls = { version = "0.21.1", default-features = false }
thiserror = "1.0.40"
tokio = { version = "1.28.2", default-features = false, features = ["net", "rt"], optional = true }
tokio = { version = "1.28.2", default-features = false, features = ["net", "rt", "time"], optional = true }

[features]
tokio = ["dep:tokio", "if-watch/tokio"]
Expand Down
7 changes: 7 additions & 0 deletions transports/quic/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,13 @@ impl Channel {
Ok(Ok(()))
}

pub(crate) async fn send(&mut self, to_endpoint: ToEndpoint) -> Result<(), Disconnected> {
self.to_endpoint
.send(to_endpoint)
.await
.map_err(|_| Disconnected {})
}

/// Send a message to inform the [`Driver`] about an
/// event caused by the owner of this [`Channel`] dropping.
/// This clones the sender to the endpoint to guarantee delivery.
Expand Down
46 changes: 46 additions & 0 deletions transports/quic/src/hole_punching.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::{net::SocketAddr, time::Duration};

use rand::{distributions, Rng};

use crate::{
endpoint::{self, ToEndpoint},
Error, Provider,
};

pub(crate) async fn hole_puncher<P: Provider>(
endpoint_channel: endpoint::Channel,
remote_addr: SocketAddr,
timeout_duration: Duration,
) -> Error {
P::timeout(
timeout_duration,
punch_holes::<P>(endpoint_channel, remote_addr),
)
.await
.unwrap_or(Error::HandshakeTimedOut)
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}

async fn punch_holes<P: Provider>(
mut endpoint_channel: endpoint::Channel,
remote_addr: SocketAddr,
) -> Error {
loop {
let sleep_duration = Duration::from_millis(rand::thread_rng().gen_range(10..=200));
P::sleep(sleep_duration).await;

let random_udp_packet = ToEndpoint::SendUdpPacket(quinn_proto::Transmit {
destination: remote_addr,
ecn: None,
contents: rand::thread_rng()
.sample_iter(distributions::Standard)
.take(64)
.collect(),
segment_size: None,
src_ip: None,
});

if endpoint_channel.send(random_udp_packet).await.is_err() {
return Error::EndpointDriverCrashed;
}
}
}
11 changes: 11 additions & 0 deletions transports/quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@

mod connection;
mod endpoint;
mod hole_punching;
mod provider;
mod transport;

use std::net::SocketAddr;

pub use connection::{Connecting, Connection, Substream};
pub use endpoint::Config;
#[cfg(feature = "async-std")]
Expand Down Expand Up @@ -94,6 +97,14 @@ pub enum Error {
/// The [`Connecting`] future timed out.
#[error("Handshake with the remote timed out.")]
HandshakeTimedOut,

/// Error when `Transport::dial_as_listener` is called without an active listener.
#[error("Tried to dial as listener without an active listener.")]
NoActiveListenerForDialAsListener,

/// Error when holepunching for a remote is already in progress
#[error("Already punching hole for {0}).")]
HolePunchInProgress(SocketAddr),
}

/// Dialing a remote peer failed.
Expand Down
15 changes: 14 additions & 1 deletion transports/quic/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::Future;
use futures::{future::BoxFuture, Future};
use if_watch::IfEvent;
use std::{
io,
net::SocketAddr,
task::{Context, Poll},
time::Duration,
};

#[cfg(feature = "async-std")]
Expand All @@ -39,6 +40,7 @@ const RECEIVE_BUFFER_SIZE: usize = 65536;
/// and spawning tasks.
pub trait Provider: Unpin + Send + Sized + 'static {
type IfWatcher: Unpin + Send;
type TimeoutError;

/// Create a new providing that is wrapping the socket.
///
Expand Down Expand Up @@ -74,4 +76,15 @@ pub trait Provider: Unpin + Send + Sized + 'static {
watcher: &mut Self::IfWatcher,
cx: &mut Context<'_>,
) -> Poll<io::Result<IfEvent>>;

/// Awaits a future or times out after a duration of time.
fn timeout<F>(
duration: Duration,
future: F,
) -> BoxFuture<'static, Result<F::Output, Self::TimeoutError>>
where
F: Future + Send + 'static;

/// Sleep for specified amount of time.
fn sleep(duration: Duration) -> BoxFuture<'static, ()>;
}
16 changes: 16 additions & 0 deletions transports/quic/src/provider/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use crate::GenTransport;
Expand All @@ -46,6 +47,7 @@ pub struct Provider {

impl super::Provider for Provider {
type IfWatcher = if_watch::smol::IfWatcher;
type TimeoutError = async_std::future::TimeoutError;

fn from_socket(socket: std::net::UdpSocket) -> io::Result<Self> {
let socket = Arc::new(socket.into());
Expand Down Expand Up @@ -104,6 +106,20 @@ impl super::Provider for Provider {
) -> Poll<io::Result<if_watch::IfEvent>> {
watcher.poll_if_event(cx)
}

fn timeout<F>(
duration: Duration,
future: F,
) -> BoxFuture<'static, Result<F::Output, Self::TimeoutError>>
where
F: Future + Send + 'static,
{
async_std::future::timeout(duration, future).boxed()
}

fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
async_std::task::sleep(duration).boxed()
}
}

type ReceiveStreamItem = (
Expand Down