Skip to content

Commit

Permalink
feat(get-ticket): Contact provider on all listening addrs (#893)
Browse files Browse the repository at this point in the history
This tries to contact the provider on all addresses inside the ticket
when using the get-ticket command.  The first connection will be used.
  • Loading branch information
flub committed Mar 29, 2023
1 parent 91c7e2a commit adbb2bf
Showing 1 changed file with 50 additions and 27 deletions.
77 changes: 50 additions & 27 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const DEFAULT_RPC_PORT: u16 = 0x1337;
const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";
const MAX_RPC_CONNECTIONS: u32 = 16;
const MAX_RPC_STREAMS: u64 = 1024;
const MAX_CONCURRENT_DIALS: u8 = 16;

#[derive(Parser, Debug, Clone)]
#[clap(version, about, long_about = None)]
Expand Down Expand Up @@ -477,38 +478,28 @@ async fn main_impl() -> Result<()> {
}
let token = AuthToken::from_str(&auth_token)
.context("Wrong format for authentication token")?;
let get = GetInteractive::Hash {
hash: *hash.as_hash(),
opts,
token,
};
tokio::select! {
biased;
res = get_interactive(*hash.as_hash(), opts, token, out) => {
res
}
res = get_interactive(get, out) => res,
_ = tokio::signal::ctrl_c() => {
println!("Ending transfer early...");
Ok(())
}
}
}
Commands::GetTicket { out, ticket } => {
let Ticket {
hash,
peer,
addrs,
token,
} = ticket;
let addr = addrs
.get(0)
.copied()
.context("missing SocketAddr in ticket")?;
let opts = get::Options {
addr,
peer_id: Some(peer),
let get = GetInteractive::Ticket {
ticket,
keylog: cli.keylog,
};
tokio::select! {
biased;
res = get_interactive(hash, opts, token, out) => {
res
}
res = get_interactive(get, out) => res,
_ = tokio::signal::ctrl_c() => {
println!("Ending transfer early...");
Ok(())
Expand Down Expand Up @@ -753,13 +744,30 @@ async fn get_keypair(key: Option<PathBuf>) -> Result<Keypair> {
}
}

async fn get_interactive(
hash: Hash,
opts: get::Options,
token: AuthToken,
out: Option<PathBuf>,
) -> Result<()> {
progress!("Fetching: {}", Blake3Cid::new(hash));
#[derive(Debug)]
enum GetInteractive {
Ticket {
ticket: Ticket,
keylog: bool,
},
Hash {
hash: Hash,
opts: get::Options,
token: AuthToken,
},
}

impl GetInteractive {
fn hash(&self) -> Hash {
match self {
GetInteractive::Ticket { ticket, .. } => ticket.hash,
GetInteractive::Hash { hash, .. } => *hash,
}
}
}

async fn get_interactive(get: GetInteractive, out: Option<PathBuf>) -> Result<()> {
progress!("Fetching: {}", Blake3Cid::new(get.hash()));

progress!("{} Connecting ...", style("[1/3]").bold().dim());

Expand Down Expand Up @@ -854,7 +862,22 @@ async fn get_interactive(
Ok(reader)
}
};
let stats = get::run(hash, token, opts, on_connected, on_collection, on_blob).await?;
let stats = match get {
GetInteractive::Ticket { ticket, keylog } => {
get::run_ticket(
&ticket,
keylog,
MAX_CONCURRENT_DIALS,
on_connected,
on_collection,
on_blob,
)
.await?
}
GetInteractive::Hash { hash, opts, token } => {
get::run(hash, token, opts, on_connected, on_collection, on_blob).await?
}
};

pb.finish_and_clear();
progress!(
Expand Down

0 comments on commit adbb2bf

Please sign in to comment.