Merged
766 changes: 401 additions & 365 deletions content-discovery/Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions content-discovery/Cargo.toml
@@ -26,9 +26,9 @@ missing_debug_implementations = "warn"
unused-async = "warn"

[workspace.dependencies]
iroh = { version ="0.35", features = ["discovery-pkarr-dht"] }
iroh-base = "0.35"
iroh-blobs = { version = "0.35", features = ["rpc"] }
iroh = { version ="0.90", features = ["discovery-pkarr-dht"] }
iroh-base = "0.90"
iroh-blobs = { version = "0.90" }
# explicitly specified until iroh minimal crates issues are solved, see https://github.com/n0-computer/iroh/pull/3255
tokio = { version = "1.44.1" }
tokio-stream = { version = "0.1.17" }
10 changes: 5 additions & 5 deletions content-discovery/iroh-content-discovery-cli/src/main.rs
@@ -4,7 +4,7 @@ use std::str::FromStr;

use anyhow::bail;
use clap::Parser;
use iroh::endpoint;
use iroh::endpoint::{self, BindError};
use iroh_content_discovery::protocol::{
AbsoluteTime, Announce, AnnounceKind, Query, QueryFlags, SignedAnnounce,
};
@@ -43,7 +43,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> {
let signed_announce = SignedAnnounce::new(announce, &key)?;
if !args.tracker.is_empty() {
for tracker in args.tracker {
println!("announcing to {}: {}", tracker, content);
println!("announcing to {tracker}: {content}");
iroh_content_discovery::announce(&endpoint, tracker, signed_announce).await?;
}
}
@@ -66,7 +66,7 @@ async fn query(args: QueryArgs) -> anyhow::Result<()> {
match iroh_content_discovery::query(&ep, tracker, query).await {
Ok(announces) => announces,
Err(e) => {
eprintln!("failed to query tracker {}: {}", tracker, e);
eprintln!("failed to query tracker {tracker}: {e}");
continue;
}
};
@@ -83,13 +83,13 @@ async fn query(args: QueryArgs) -> anyhow::Result<()> {

/// Create an endpoint that does look up discovery info via DNS or the DHT, but does not
/// announce. The client node id is ephemeral and will not be dialed by anyone.
async fn create_client_endpoint() -> anyhow::Result<endpoint::Endpoint> {
async fn create_client_endpoint() -> Result<endpoint::Endpoint, BindError> {
let discovery = iroh::discovery::pkarr::dht::DhtDiscovery::builder()
.dht(true)
.n0_dns_pkarr_relay()
.build()?;
endpoint::Endpoint::builder()
.discovery(Box::new(discovery))
.discovery(discovery)
.bind()
.await
}
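
For reference, the endpoint builder in iroh 0.90 takes the discovery service directly (no `Box::new`), and `bind()` reports a typed `BindError`. A minimal sketch of a hypothetical caller (not part of this diff; the tracker id and query value are illustrative), relying on `?` to convert `BindError` and the crate's query error into `anyhow::Error`:

```rust
use iroh_content_discovery::protocol::Query;

// Hypothetical caller, not part of the diff: bind an ephemeral client endpoint
// and query a single tracker. `?` converts the typed BindError into
// anyhow::Error, assuming BindError implements std::error::Error.
async fn query_one_tracker(tracker: iroh::NodeId, query: Query) -> anyhow::Result<()> {
    let endpoint = create_client_endpoint().await?;
    let _announces = iroh_content_discovery::query(&endpoint, tracker, query).await?;
    // ... inspect or print the returned signed announces here ...
    Ok(())
}
```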
4 changes: 2 additions & 2 deletions content-discovery/iroh-content-discovery/src/client.rs
@@ -16,7 +16,7 @@ use crate::protocol::{
pub enum Error {
#[snafu(display("Failed to connect to tracker: {}", source))]
Connect {
source: anyhow::Error,
source: iroh::endpoint::ConnectWithOptsError,
backtrace: snafu::Backtrace,
},

@@ -64,7 +64,7 @@ pub enum Error

#[snafu(display("Failed to get remote node id: {}", source))]
RemoteNodeId {
source: anyhow::Error,
source: iroh::endpoint::RemoteNodeIdError,
backtrace: snafu::Backtrace,
},
}
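
With concrete iroh 0.90 error types as sources instead of `anyhow::Error`, downstream code can inspect the cause without downcasting. An illustrative sketch (not part of the diff), using only the variants shown above:

```rust
// Hypothetical consumer of the crate's Error type (not part of the diff):
// the typed sources can be matched directly instead of downcasting anyhow::Error.
fn describe(err: &Error) -> String {
    match err {
        Error::Connect { source, .. } => format!("failed to connect to tracker: {source}"),
        Error::RemoteNodeId { source, .. } => format!("failed to get remote node id: {source}"),
        other => other.to_string(),
    }
}
```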
1 change: 1 addition & 0 deletions content-discovery/iroh-content-tracker/Cargo.toml
@@ -37,6 +37,7 @@ iroh-content-discovery = { path = "../iroh-content-discovery", features = ["clie

clap = { version = "4", features = ["derive"], optional = true }
serde-big-array = "0.5.1"
ssh-key = { version = "0.6", features = ["ed25519"] }

[features]
cli = ["clap"]
2 changes: 1 addition & 1 deletion content-discovery/iroh-content-tracker/src/io.rs
@@ -91,7 +91,7 @@ pub fn log_connection_attempt(
path: &Option<PathBuf>,
host: &NodeId,
t0: Instant,
outcome: &anyhow::Result<iroh::endpoint::Connection>,
outcome: &Result<iroh::endpoint::Connection, iroh::endpoint::ConnectError>,
) -> anyhow::Result<()> {
if let Some(path) = path {
let now = SystemTime::now()
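
Because `log_connection_attempt` now receives the typed connect outcome, a dial site passes the whole `Result` through before propagating it. A hedged sketch of such a call site (not part of the diff; it assumes `Endpoint::connect` returns `Result<Connection, ConnectError>` in iroh 0.90, matching the parameter type above, and the `dial_log` option is illustrative):

```rust
use std::{path::PathBuf, time::Instant};

use iroh::{Endpoint, NodeId};
use iroh_content_discovery::protocol::ALPN;

// Hypothetical dial site (not part of the diff): record the typed outcome,
// then propagate the error, if any, as anyhow::Error.
async fn dial_and_log(
    endpoint: &Endpoint,
    host: NodeId,
    dial_log: &Option<PathBuf>,
) -> anyhow::Result<iroh::endpoint::Connection> {
    let t0 = Instant::now();
    let outcome = endpoint.connect(host, ALPN).await;
    log_connection_attempt(dial_log, &host, t0, &outcome)?;
    Ok(outcome?)
}
```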
62 changes: 46 additions & 16 deletions content-discovery/iroh-content-tracker/src/iroh_blobs_util.rs
@@ -5,11 +5,11 @@ use bao_tree::{ChunkNum, ChunkRanges};
use bytes::Bytes;
use iroh_blobs::{
get::{
fsm::{BlobContentNext, EndBlobNext},
fsm::{BlobContentNext, EndBlobNext, RequestCounters},
Stats,
},
hashseq::HashSeq,
protocol::{GetRequest, RangeSpecSeq},
protocol::{ChunkRangesSeq, GetRequest},
Hash, HashAndFormat,
};
use rand::Rng;
@@ -22,11 +22,17 @@ pub async fn unverified_size(
connection: &iroh::endpoint::Connection,
hash: &Hash,
) -> anyhow::Result<(u64, Stats)> {
let request = iroh_blobs::protocol::GetRequest::new(
*hash,
RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
let request = iroh_blobs::protocol::GetRequest::new(*hash, ChunkRangesSeq::all());
let request = iroh_blobs::get::fsm::start(
connection.clone(),
request,
RequestCounters {
payload_bytes_written: 0,
other_bytes_written: 0,
payload_bytes_read: 0,
other_bytes_read: 0,
},
);
let request = iroh_blobs::get::fsm::start(connection.clone(), request);
let connected = request.next().await?;
let iroh_blobs::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
unreachable!("expected start root");
@@ -46,11 +52,17 @@ pub async fn verified_size(
hash: &Hash,
) -> anyhow::Result<(u64, Stats)> {
tracing::debug!("Getting verified size of {}", hash.to_hex());
let request = iroh_blobs::protocol::GetRequest::new(
*hash,
RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
let request = iroh_blobs::protocol::GetRequest::new(*hash, ChunkRangesSeq::verified_size());
let request = iroh_blobs::get::fsm::start(
connection.clone(),
request,
RequestCounters {
payload_bytes_written: 0,
other_bytes_written: 0,
payload_bytes_read: 0,
other_bytes_read: 0,
},
);
let request = iroh_blobs::get::fsm::start(connection.clone(), request);
let connected = request.next().await?;
let iroh_blobs::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
unreachable!("expected start root");
@@ -89,12 +101,21 @@ pub async fn get_hash_seq_and_sizes(
tracing::debug!("Getting hash seq and children sizes of {}", content);
let request = iroh_blobs::protocol::GetRequest::new(
*hash,
RangeSpecSeq::from_ranges_infinite([
ChunkRangesSeq::from_ranges_infinite([
ChunkRanges::all(),
ChunkRanges::from(ChunkNum(u64::MAX)..),
]),
);
let at_start = iroh_blobs::get::fsm::start(connection.clone(), request);
let at_start = iroh_blobs::get::fsm::start(
connection.clone(),
request,
RequestCounters {
payload_bytes_written: 0,
other_bytes_written: 0,
payload_bytes_read: 0,
other_bytes_read: 0,
},
);
let at_connected = at_start.next().await?;
let iroh_blobs::get::fsm::ConnectedNext::StartRoot(start) = at_connected.next().await? else {
unreachable!("query includes root");
@@ -140,9 +161,18 @@ pub async fn chunk_probe(
chunk: ChunkNum,
) -> anyhow::Result<Stats> {
let ranges = ChunkRanges::from(chunk..chunk + 1);
let ranges = RangeSpecSeq::from_ranges([ranges]);
let ranges = ChunkRangesSeq::from_ranges([ranges]);
let request = GetRequest::new(*hash, ranges);
let request = iroh_blobs::get::fsm::start(connection.clone(), request);
let request = iroh_blobs::get::fsm::start(
connection.clone(),
request,
RequestCounters {
payload_bytes_written: 0,
other_bytes_written: 0,
payload_bytes_read: 0,
other_bytes_read: 0,
},
);
let connected = request.next().await?;
let iroh_blobs::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
unreachable!("query includes root");
@@ -172,7 +202,7 @@ pub async fn chunk_probe(
///
/// The random chunk is chosen uniformly from the chunks of the children, so
/// larger children are more likely to be selected.
pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq {
pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> ChunkRangesSeq {
let total_chunks = sizes
.iter()
.map(|size| ChunkNum::full_chunks(*size).0)
@@ -193,5 +223,5 @@ pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq
ranges.push(ChunkRanges::empty());
}
}
RangeSpecSeq::from_ranges(ranges)
ChunkRangesSeq::from_ranges(ranges)
}
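
All four call sites above pass the same zeroed `RequestCounters` to `iroh_blobs::get::fsm::start`, which now takes the counters explicitly. A small hypothetical helper (not part of the diff) could centralize that literal; the field names are exactly the ones used above:

```rust
use iroh_blobs::get::fsm::RequestCounters;

// Hypothetical helper (not part of the diff): fresh, zeroed counters for a new
// request, so every call site can read
//   iroh_blobs::get::fsm::start(connection.clone(), request, zero_counters())
fn zero_counters() -> RequestCounters {
    RequestCounters {
        payload_bytes_written: 0,
        other_bytes_written: 0,
        payload_bytes_read: 0,
        other_bytes_read: 0,
    }
}
```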
60 changes: 55 additions & 5 deletions content-discovery/iroh-content-tracker/src/main.rs
@@ -2,13 +2,13 @@ pub mod args;

use std::{
net::{SocketAddrV4, SocketAddrV6},
path::PathBuf,
sync::atomic::{AtomicBool, Ordering},
time::{Duration, Instant},
};

use clap::Parser;
use iroh::Endpoint;
use iroh_blobs::util::fs::load_secret_key;
use iroh::{endpoint::BindError, Endpoint, Watcher};
use iroh_content_discovery::protocol::ALPN;
use iroh_content_tracker::{
io::{
@@ -47,7 +47,7 @@ macro_rules! log {
async fn await_relay_region(endpoint: &Endpoint) -> anyhow::Result<()> {
let t0 = Instant::now();
loop {
let addr = endpoint.node_addr().await?;
let addr = endpoint.node_addr().initialized().await?;
if addr.relay_url().is_some() {
break;
}
@@ -63,7 +63,7 @@ async fn create_endpoint(
key: iroh::SecretKey,
ipv4_addr: Option<SocketAddrV4>,
ipv6_addr: Option<SocketAddrV6>,
) -> anyhow::Result<Endpoint> {
) -> Result<Endpoint, BindError> {
let mut builder = iroh::Endpoint::builder()
.secret_key(key)
.discovery_dht()
@@ -113,7 +113,7 @@ async fn server(args: Args) -> anyhow::Result<()> {
let db = Tracker::new(options, endpoint.clone())?;
db.dump().await?;
await_relay_region(&endpoint).await?;
let addr = endpoint.node_addr().await?;
let addr = endpoint.node_addr().initialized().await?;
println!("tracker addr: {}\n", addr.node_id);
info!("listening on {:?}", addr);
// let db2 = db.clone();
@@ -153,3 +153,53 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();
server(args).await
}

pub async fn load_secret_key(key_path: PathBuf) -> anyhow::Result<iroh::SecretKey> {
use anyhow::Context;
use iroh::SecretKey;
use tokio::io::AsyncWriteExt;

if key_path.exists() {
let keystr = tokio::fs::read(key_path).await?;

let ser_key = ssh_key::private::PrivateKey::from_openssh(keystr)?;
let ssh_key::private::KeypairData::Ed25519(kp) = ser_key.key_data() else {
anyhow::bail!("invalid key format");
};
let secret_key = SecretKey::from_bytes(&kp.private.to_bytes());
Ok(secret_key)
} else {
let secret_key = SecretKey::generate(rand::rngs::OsRng);
let ckey = ssh_key::private::Ed25519Keypair {
public: secret_key.public().public().into(),
private: secret_key.secret().into(),
};
let ser_key =
ssh_key::private::PrivateKey::from(ckey).to_openssh(ssh_key::LineEnding::default())?;

// Try to canonicalize if possible
let key_path = key_path.canonicalize().unwrap_or(key_path);
let key_path_parent = key_path.parent().ok_or_else(|| {
anyhow::anyhow!("no parent directory found for '{}'", key_path.display())
})?;
tokio::fs::create_dir_all(&key_path_parent).await?;

// write to tempfile
let (file, temp_file_path) = tempfile::NamedTempFile::new_in(key_path_parent)
.context("unable to create tempfile")?
.into_parts();
let mut file = tokio::fs::File::from_std(file);
file.write_all(ser_key.as_bytes())
.await
.context("unable to write keyfile")?;
file.flush().await?;
drop(file);

// move file
tokio::fs::rename(temp_file_path, key_path)
.await
.context("failed to rename keyfile")?;

Ok(secret_key)
}
}
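
For context, a hedged sketch of how the tracker could wire the new pieces together (not part of the diff; the key path is illustrative, and `create_endpoint`/`await_relay_region` are the functions shown earlier in this file):

```rust
use std::path::PathBuf;

use iroh::Endpoint;

// Hypothetical wiring (not part of the diff): load or create the persistent
// tracker key, bind the endpoint, then wait until a relay is known.
// The typed BindError converts to anyhow::Error via `?`.
async fn bind_tracker(key_path: PathBuf) -> anyhow::Result<Endpoint> {
    let key = load_secret_key(key_path).await?;
    let endpoint = create_endpoint(key, None, None).await?;
    await_relay_region(&endpoint).await?;
    Ok(endpoint)
}
```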
30 changes: 20 additions & 10 deletions content-discovery/iroh-content-tracker/src/tracker.rs
@@ -9,7 +9,10 @@ use anyhow::{bail, Context};
use bao_tree::ChunkNum;
use iroh::{endpoint::Connection, Endpoint, NodeId};
use iroh_blobs::{
get::{fsm::EndBlobNext, Stats},
get::{
fsm::{EndBlobNext, RequestCounters},
Stats,
},
hashseq::HashSeq,
protocol::GetRequest,
BlobFormat, Hash, HashAndFormat,
@@ -1022,7 +1025,7 @@ impl Tracker {
content: &HashAndFormat,
probe_kind: ProbeKind,
) -> anyhow::Result<Stats> {
let cap = format!("{} at {}", content, host);
let cap = format!("{content} at {host}");
let HashAndFormat { hash, format } = content;
let mut rng = rand::thread_rng();
let stats = if probe_kind == ProbeKind::Incomplete {
@@ -1054,23 +1057,30 @@ impl Tracker {
let (hs, sizes) = self.get_or_insert_sizes(connection, hash).await?;
let ranges = random_hash_seq_ranges(&sizes, rand::thread_rng());
let text = ranges
.iter_non_empty()
.map(|(index, ranges)| {
format!("child={}, ranges={:?}", index, ranges.to_chunk_ranges())
})
.iter_non_empty_infinite()
.map(|(index, ranges)| format!("child={index}, ranges={ranges:?}"))
.collect::<Vec<_>>()
.join(", ");
tracing::debug!("Seq probing {} using {}", cap, text);
let request = GetRequest::new(*hash, ranges);
let request = iroh_blobs::get::fsm::start(connection.clone(), request);
let request = iroh_blobs::get::fsm::start(
connection.clone(),
request,
RequestCounters {
payload_bytes_written: 0,
payload_bytes_read: 0,
other_bytes_written: 0,
other_bytes_read: 0,
},
);

let connected = request.next().await?;
let iroh_blobs::get::fsm::ConnectedNext::StartChild(child) =
connected.next().await?
else {
unreachable!("request does not include root");
};
let index =
usize::try_from(child.child_offset()).expect("child offset too large");
let index = usize::try_from(child.offset()).expect("child offset too large");
let hash = hs.get(index).expect("request inconsistent with hash seq");
let at_blob_header = child.next(hash);
let at_end_blob = at_blob_header.drain().await?;
@@ -1166,7 +1176,7 @@ impl Tracker {
Ok(connection) => connection,
Err(cause) => {
debug!("error dialing host {}: {}", host, cause);
return Err(cause);
return Err(cause.into());
}
};
let mut results = Vec::new();