Skip to content

Commit

Permalink
feat: Add run_ticket to dial all addresses stored in a Ticket (#888)
Browse files Browse the repository at this point in the history
This adds a new iroh::get::run_ticket call which will dial all the
addresses in a ticket concurrently, up to a specified concurrency
limit.  The first established connection is returned.

This allows much better fetching of a ticket, without having to know
how it is constructed and speeds up connecting.

When dialing an attempt is made to first try addresses which are
considered to be on the same subnet as any local address.
  • Loading branch information
flub committed Mar 28, 2023
1 parent 80ee3db commit 91c7e2a
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

129 changes: 123 additions & 6 deletions src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ use crate::blobs::Collection;
use crate::protocol::{
read_bao_encoded, read_lp, write_lp, AuthToken, Handshake, Request, Res, Response,
};
use crate::provider::Ticket;
use crate::subnet::{same_subnet_v4, same_subnet_v6};
use crate::tls::{self, Keypair, PeerId};
use abao::decode::AsyncSliceDecoder;
use anyhow::{anyhow, bail, Context, Result};
use bytes::BytesMut;
use futures::Future;
use default_net::Interface;
use futures::{Future, StreamExt};
use postcard::experimental::max_size::MaxSize;
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tracing::{debug, error};
Expand Down Expand Up @@ -65,8 +68,8 @@ pub fn make_client_endpoint(
Ok(endpoint)
}

/// Setup a QUIC connection to the provided address.
async fn setup(opts: Options) -> Result<quinn::Connection> {
/// Establishes a QUIC connection to the provided peer.
async fn dial_peer(opts: Options) -> Result<quinn::Connection> {
let bind_addr = match opts.addr.is_ipv6() {
true => SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0).into(),
false => SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into(),
Expand Down Expand Up @@ -129,14 +132,99 @@ impl AsyncRead for DataStream {
}
}

/// Gets a collection and all its blobs using a [`Ticket`].
pub async fn run_ticket<A, B, C, FutA, FutB, FutC>(
ticket: &Ticket,
keylog: bool,
max_concurrent: u8,
on_connected: A,
on_collection: B,
on_blob: C,
) -> Result<Stats>
where
A: FnOnce() -> FutA,
FutA: Future<Output = Result<()>>,
B: FnOnce(&Collection) -> FutB,
FutB: Future<Output = Result<()>>,
C: FnMut(Hash, DataStream, String) -> FutC,
FutC: Future<Output = Result<DataStream>>,
{
let start = Instant::now();
let connection = dial_ticket(ticket, keylog, max_concurrent.into()).await?;
run_connection(
connection,
ticket.hash,
ticket.token,
start,
on_connected,
on_collection,
on_blob,
)
.await
}

async fn dial_ticket(
ticket: &Ticket,
keylog: bool,
max_concurrent: usize,
) -> Result<quinn::Connection> {
// Sort the interfaces to make sure local ones are at the front of the list.
let interfaces = default_net::get_interfaces();
let (mut addrs, other_addrs) = ticket
.addrs
.iter()
.partition::<Vec<_>, _>(|addr| is_same_subnet(addr, &interfaces));
addrs.extend(other_addrs);

let mut conn_stream = futures::stream::iter(addrs)
.map(|addr| {
let opts = Options {
addr,
peer_id: Some(ticket.peer),
keylog,
};
dial_peer(opts)
})
.buffer_unordered(max_concurrent);
while let Some(res) = conn_stream.next().await {
match res {
Ok(conn) => return Ok(conn),
Err(_) => continue,
}
}
Err(anyhow!("Failed to establish connection to peer"))
}

fn is_same_subnet(addr: &SocketAddr, interfaces: &[Interface]) -> bool {
for interface in interfaces {
match addr {
SocketAddr::V4(peer_addr) => {
for net in interface.ipv4.iter() {
if same_subnet_v4(net.addr, *peer_addr.ip(), net.prefix_len) {
return true;
}
}
}
SocketAddr::V6(peer_addr) => {
for net in interface.ipv6.iter() {
if same_subnet_v6(net.addr, *peer_addr.ip(), net.prefix_len) {
return true;
}
}
}
}
}
false
}

/// Get a collection and all its blobs from a provider
pub async fn run<A, B, C, FutA, FutB, FutC>(
hash: Hash,
auth_token: AuthToken,
opts: Options,
on_connected: A,
on_collection: B,
mut on_blob: C,
on_blob: C,
) -> Result<Stats>
where
A: FnOnce() -> FutA,
Expand All @@ -147,8 +235,37 @@ where
FutC: Future<Output = Result<DataStream>>,
{
let now = Instant::now();
let connection = setup(opts).await?;
let connection = dial_peer(opts).await?;
run_connection(
connection,
hash,
auth_token,
now,
on_connected,
on_collection,
on_blob,
)
.await
}

/// Gets a collection and all its blobs from a provider on the established connection.
async fn run_connection<A, B, C, FutA, FutB, FutC>(
connection: quinn::Connection,
hash: Hash,
auth_token: AuthToken,
start_time: Instant,
on_connected: A,
on_collection: B,
mut on_blob: C,
) -> Result<Stats>
where
A: FnOnce() -> FutA,
FutA: Future<Output = Result<()>>,
B: FnOnce(&Collection) -> FutB,
FutB: Future<Output = Result<()>>,
C: FnMut(Hash, DataStream, String) -> FutC,
FutC: Future<Output = Result<DataStream>>,
{
let (mut writer, mut reader) = connection.open_bi().await?;

on_connected().await?;
Expand Down Expand Up @@ -242,7 +359,7 @@ where
}
drop(reader);

let elapsed = now.elapsed();
let elapsed = start_time.elapsed();

let stats = Stats { data_len, elapsed };

Expand Down
50 changes: 47 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
#![deny(rustdoc::broken_intra_doc_links)]
pub mod blobs;
pub mod get;
pub mod net;
pub mod progress;
pub mod protocol;
pub mod provider;
pub mod rpc_protocol;

pub mod net;

mod subnet;
mod tls;
mod util;

Expand All @@ -19,7 +19,7 @@ pub use util::Hash;
#[cfg(test)]
mod tests {
use std::{
net::SocketAddr,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::{atomic::AtomicUsize, Arc},
time::Duration,
Expand Down Expand Up @@ -490,4 +490,48 @@ mod tests {
.expect("timeout")
.expect("get failed");
}

#[tokio::test]
async fn test_run_ticket() {
let readme = Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md");
let (db, hash) = create_collection(vec![readme.into()]).await.unwrap();
let provider = Provider::builder(db)
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
.spawn()
.unwrap();
let _drop_guard = provider.cancel_token().drop_guard();
let ticket = provider.ticket(hash);
let mut on_connected = false;
let mut on_collection = false;
let mut on_blob = false;
tokio::time::timeout(
Duration::from_secs(10),
get::run_ticket(
&ticket,
true,
16,
|| {
on_connected = true;
async { Ok(()) }
},
|_| {
on_collection = true;
async { Ok(()) }
},
|_hash, mut stream, _name| {
on_blob = true;
async move {
io::copy(&mut stream, &mut io::sink()).await?;
Ok(stream)
}
},
),
)
.await
.expect("timeout")
.expect("get ticket failed");
assert!(on_connected);
assert!(on_collection);
assert!(on_blob);
}
}
41 changes: 41 additions & 0 deletions src/subnet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//! Same subnet logic.
//!
//! Tiny module because left/right shifting confuses emacs' rust-mode. So sad.

use std::net::{Ipv4Addr, Ipv6Addr};

/// Checks if both addresses are on the same subnet given the `prefix_len`.
pub(crate) fn same_subnet_v4(addr_a: Ipv4Addr, addr_b: Ipv4Addr, prefix_len: u8) -> bool {
let mask = u32::MAX << (32 - prefix_len);
let a = u32::from(addr_a) & mask;
let b = u32::from(addr_b) & mask;
a == b
}

pub(crate) fn same_subnet_v6(addr_a: Ipv6Addr, addr_b: Ipv6Addr, prefix_len: u8) -> bool {
let mask = u128::MAX << (128 - prefix_len);
let a = u128::from(addr_a) & mask;
let b = u128::from(addr_b) & mask;
a == b
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_same_subnet_v4() {
let a = Ipv4Addr::new(192, 168, 0, 5);
let b = Ipv4Addr::new(192, 168, 1, 6);
assert!(!same_subnet_v4(a, b, 24));
assert!(same_subnet_v4(a, b, 16));
}

#[test]
fn test_same_subnet_v6() {
let a = Ipv6Addr::new(0xfd56, 0x5799, 0xd8f6, 0x3cc, 0x0, 0x0, 0x0, 0x1);
let b = Ipv6Addr::new(0xfd56, 0x5799, 0xd8f6, 0x3cd, 0x0, 0x0, 0x0, 0x2);
assert!(!same_subnet_v6(a, b, 64));
assert!(same_subnet_v6(a, b, 48));
}
}

0 comments on commit 91c7e2a

Please sign in to comment.