diff --git a/Cargo.lock b/Cargo.lock index f41ced4d60..167aef8810 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -191,7 +191,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -514,7 +514,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -801,9 +801,9 @@ dependencies = [ [[package]] name = "crypto-bigint" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4c2f4e1afd912bc40bfd6fed5d9dc1f288e0ba01bfcc835cc5bc3eb13efe15" +checksum = "740fe28e594155f10cfc383984cbefd529d7396050557148f79cb0f621204124" dependencies = [ "generic-array", "rand_core", @@ -879,7 +879,7 @@ checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -903,7 +903,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -914,7 +914,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1006,7 +1006,7 @@ checksum = "5fe87ce4529967e0ba1dcf8450bab64d97dfd5010a6256187ffe2e43e6f0e049" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1035,7 +1035,7 @@ checksum = "df541e0e2a8069352be228ce4b85a1da6f59bfd325e56f57e4b241babbc3f832" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "unicode-xid", ] @@ -1107,7 +1107,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1258,7 +1258,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1436,7 +1436,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2360,9 +2360,9 @@ checksum = "df39d232f5c40b0891c10216992c2f250c054105cb1e56f0fc9032db6203ecc1" [[package]] name = "memchr" -version = "2.6.2" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486aed0026218e61b8a01d5fbd5a0a134649abb71a0e53b7bc088529dced86e" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memoffset" @@ -2689,7 +2689,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2700,9 +2700,9 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "object" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ "memchr", ] @@ -2788,7 +2788,7 @@ dependencies = [ "proc-macro-error 1.0.4", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2912,7 +2912,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2943,7 +2943,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -3259,7 +3259,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -3560,18 +3560,18 @@ checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] name = "regex" -version = "1.9.4" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" +checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.7", + "regex-automata 0.3.8", "regex-syntax 0.7.5", ] @@ -3586,9 +3586,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", @@ -3988,7 +3988,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4059,7 +4059,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4311,7 +4311,7 @@ dependencies = [ "proc-macro2", "quote", "struct_iterable_internal", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4329,7 +4329,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4340,7 +4340,7 @@ checksum = "a60bcaff7397072dca0017d1db428e30d5002e00b6847703e2e42005c95fbe00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4419,9 +4419,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.29" +version = "2.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" dependencies = [ "proc-macro2", "quote", @@ -4514,7 +4514,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4533,22 +4533,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4641,7 +4641,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4719,6 +4719,7 @@ dependencies = [ "futures-util", "hashbrown 0.12.3", "pin-project-lite", + "slab", "tokio", "tracing", ] @@ -4794,7 +4795,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -5075,7 +5076,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "wasm-bindgen-shared", ] @@ -5109,7 +5110,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5417,9 +5418,9 @@ dependencies = [ [[package]] name = "xml-rs" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47430998a7b5d499ccee752b41567bc3afc57e1327dc855b1a2aa44ce29b5fa1" +checksum = "1eee6bf5926be7cf998d7381a9a23d833fd493f6a8034658a9505a4dc4b20444" [[package]] name = "xmltree" diff --git a/iroh-bytes/src/protocol/range_spec.rs b/iroh-bytes/src/protocol/range_spec.rs index 661858425a..2409f23f70 100644 --- a/iroh-bytes/src/protocol/range_spec.rs +++ b/iroh-bytes/src/protocol/range_spec.rs @@ -39,7 +39,7 @@ use smallvec::{smallvec, SmallVec}; /// /// This is a SmallVec so we can avoid allocations for the very common case of a single /// chunk range. -#[derive(Deserialize, Serialize, PartialEq, Eq, Clone)] +#[derive(Deserialize, Serialize, PartialEq, Eq, Clone, Hash)] #[repr(transparent)] pub struct RangeSpec(SmallVec<[u64; 2]>); @@ -152,7 +152,7 @@ impl fmt::Debug for RangeSpec { /// /// This is a smallvec so that we can avoid allocations in the common case of a single child /// range. -#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)] +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, Hash)] #[repr(transparent)] pub struct RangeSpecSeq(SmallVec<[(u64, RangeSpec); 2]>); diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index a8035ed120..f4e91f41c6 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -374,7 +374,7 @@ impl Actor { let peer_data = postcard::to_stdvec(&info)?; self.handle_in_event(InEvent::UpdatePeerData(peer_data.into()), Instant::now()).await?; } - (peer_id, res) = self.dialer.next() => { + (peer_id, res) = self.dialer.next_conn() => { match res { Ok(conn) => { debug!(?me, peer = ?peer_id, "dial successfull"); diff --git a/iroh-gossip/src/net/util.rs b/iroh-gossip/src/net/util.rs index ff9d74bcfd..2f4dca4d9f 100644 --- a/iroh-gossip/src/net/util.rs +++ b/iroh-gossip/src/net/util.rs @@ -81,16 +81,17 @@ pub type DialFuture = BoxFuture<'static, (PublicKey, anyhow::Result, pending_peers: HashMap, } + impl Dialer { /// Create a new dialer for a [`MagicEndpoint`] pub fn new(endpoint: MagicEndpoint) -> Self { @@ -137,7 +138,7 @@ impl Dialer { } /// Wait for the next dial operation to complete - pub async fn next(&mut self) -> (PublicKey, anyhow::Result) { + pub async fn next_conn(&mut self) -> (PublicKey, anyhow::Result) { match self.pending_peers.is_empty() { false => { let (peer_id, res) = self.pending.next().await.unwrap(); @@ -147,6 +148,25 @@ impl Dialer { true => futures::future::pending().await, } } + + /// Number of pending connections to be opened. + pub fn pending_count(&self) -> usize { + self.pending_peers.len() + } +} + +impl futures::Stream for Dialer { + type Item = (PublicKey, anyhow::Result); + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.pending.poll_next_unpin(cx) { + std::task::Poll::Ready(res) if res.is_some() => std::task::Poll::Ready(res), + _ => std::task::Poll::Pending, + } + } } /// A [`TimerMap`] with an async method to wait for the next timer expiration. diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index 433de6a755..8a5ea22507 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -283,7 +283,7 @@ fn validate_entry + PublicKeyStore>( // If an existing entry exists, make sure it's older than the new entry. let existing = store.get(entry.id()); if let Ok(Some(existing)) = existing { - if existing.timestamp() > entry.timestamp() { + if existing.timestamp() >= entry.timestamp() { return Err(ValidationFailure::OlderThanExisting); } } diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index bff8805f06..49248a4130 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -42,7 +42,7 @@ serde = { version = "1", features = ["derive"] } thiserror = "1" tokio = { version = "1", features = ["io-util", "rt"] } tokio-stream = "0.1" -tokio-util = { version = "0.7", features = ["codec", "io-util", "io"] } +tokio-util = { version = "0.7", features = ["codec", "io-util", "io", "time"] } tracing = "0.1" walkdir = "2" diff --git a/iroh/examples/sync.rs b/iroh/examples/sync.rs index 13196330a7..9a86995b72 100644 --- a/iroh/examples/sync.rs +++ b/iroh/examples/sync.rs @@ -18,7 +18,7 @@ use clap::{CommandFactory, FromArgMatches, Parser}; use futures::StreamExt; use indicatif::HumanBytes; use iroh::{ - download::Downloader, + downloader::Downloader, sync_engine::{LiveEvent, PeerSource, SyncEngine, SYNC_ALPN}, }; use iroh_bytes::util::runtime; @@ -230,8 +230,11 @@ async fn run(args: Args) -> anyhow::Result<()> { std::fs::create_dir_all(&blob_path)?; let db = iroh::baomap::flat::Store::load(&blob_path, &blob_path, &rt).await?; + let collection_parser = iroh::collection::IrohCollectionParser; + // create the live syncer - let downloader = Downloader::new(rt.clone(), endpoint.clone(), db.clone()); + let downloader = + Downloader::new(db.clone(), collection_parser, endpoint.clone(), rt.clone()).await; let live_sync = SyncEngine::spawn( rt.clone(), endpoint.clone(), diff --git a/iroh/src/download.rs b/iroh/src/download.rs deleted file mode 100644 index d7acb134b1..0000000000 --- a/iroh/src/download.rs +++ /dev/null @@ -1,386 +0,0 @@ -//! Download queue - -#[cfg(feature = "metrics")] -use std::time::Instant; -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, -}; - -use anyhow::anyhow; -use futures::{ - future::{BoxFuture, LocalBoxFuture, Shared}, - stream::FuturesUnordered, - FutureExt, -}; -use iroh_bytes::{ - baomap::{MapEntry, Store as BaoStore}, - util::{progress::IgnoreProgressSender, Hash}, -}; -use iroh_gossip::net::util::Dialer; -#[cfg(feature = "metrics")] -use iroh_metrics::{inc, inc_by}; -use iroh_net::{key::PublicKey, MagicEndpoint}; -use tokio::sync::{oneshot, Mutex}; -use tokio_stream::StreamExt; -use tracing::{debug, error, warn}; - -#[cfg(feature = "metrics")] -use crate::metrics::Metrics; - -/// Future for the completion of a download request -pub type DownloadFuture = Shared>>; - -/// A download queue for iroh-bytes -/// -/// Spawns a background task that handles connecting to peers and performing get requests. -/// -/// TODO: Support retries and backoff - become a proper queue... -/// TODO: Support collections, likely become generic over C: CollectionParser -#[derive(Debug, Clone)] -pub struct Downloader { - pending_downloads: Arc>>, - to_actor_tx: flume::Sender, -} - -impl Downloader { - /// Create a new downloader - pub fn new( - rt: iroh_bytes::util::runtime::Handle, - endpoint: MagicEndpoint, - db: B, - ) -> Self { - let (tx, rx) = flume::bounded(64); - // spawn the actor on a local pool - // the local pool is required because WritableFileDatabase::download_single - // returns a future that is !Send - rt.local_pool().spawn_pinned(move || async move { - let mut actor = DownloadActor::new(endpoint, db, rx); - if let Err(err) = actor.run().await { - error!("download actor failed with error {err:?}"); - } - }); - Self { - pending_downloads: Arc::new(Mutex::new(HashMap::new())), - to_actor_tx: tx, - } - } - - /// Add a new download request to the download queue. - /// - /// Note: This method takes only [`PublicKey`]s and will attempt to connect to those peers. For - /// this to succeed, you need to add addresses for these peers to the magic endpoint's - /// addressbook yourself. See [`MagicEndpoint::add_known_addrs`]. - pub async fn push(&self, hash: Hash, peers: Vec) { - let (reply, reply_rx) = oneshot::channel(); - let req = DownloadRequest { hash, peers, reply }; - - if let Err(err) = self.to_actor_tx.send_async(req).await { - warn!("download actor dropped: {err}"); - } - - if self.pending_downloads.lock().await.get(&hash).is_none() { - let pending_downloads = self.pending_downloads.clone(); - let fut = async move { - let res = reply_rx.await; - pending_downloads.lock().await.remove(&hash); - res.ok().flatten() - }; - self.pending_downloads - .lock() - .await - .insert(hash, fut.boxed().shared()); - } - } - - /// Returns a future that completes once the blob for `hash` has been downloaded, or all queued - /// requests for that blob have failed. - /// - /// NOTE: This does not start the download itself. Use [`Self::push`] for that. - pub async fn finished(&self, hash: &Hash) -> DownloadFuture { - match self.pending_downloads.lock().await.get(hash) { - Some(fut) => fut.clone(), - None => futures::future::ready(None).boxed().shared(), - } - } -} - -type DownloadReply = oneshot::Sender>; -type PendingDownloadsFutures = - FuturesUnordered>)>>; - -#[derive(Debug)] -struct DownloadRequest { - hash: Hash, - peers: Vec, - reply: DownloadReply, -} - -#[derive(Debug)] -struct DownloadActor { - dialer: Dialer, - db: B, - conns: HashMap, - replies: HashMap>, - pending_download_futs: PendingDownloadsFutures, - queue: DownloadQueue, - rx: flume::Receiver, -} -impl DownloadActor { - fn new(endpoint: MagicEndpoint, db: B, rx: flume::Receiver) -> Self { - Self { - rx, - db, - dialer: Dialer::new(endpoint), - replies: Default::default(), - conns: Default::default(), - pending_download_futs: Default::default(), - queue: Default::default(), - } - } - pub async fn run(&mut self) -> anyhow::Result<()> { - loop { - tokio::select! { - req = self.rx.recv_async() => match req { - Err(_) => return Ok(()), - Ok(req) => self.on_download_request(req).await - }, - (peer, conn) = self.dialer.next() => match conn { - Ok(conn) => { - debug!(peer = ?peer, "connection established"); - self.conns.insert(peer, conn); - self.on_peer_ready(peer); - }, - Err(err) => self.on_peer_fail(&peer, err), - }, - Some((peer, hash, res)) = self.pending_download_futs.next() => match res { - Ok(Some(size)) => { - self.queue.on_success(hash, peer); - self.reply(hash, Some((hash, size))); - self.on_peer_ready(peer); - } - Ok(None) => { - // TODO: This case is currently never reached, because iroh::get::get_blob - // doesn't return an option but only a result, with no way (AFAICS) to discern - // between connection error and not found. - // self.on_not_found(&peer, hash); - // self.on_peer_ready(peer); - unreachable!() - } - Err(_err) => { - self.on_not_found(&peer, hash); - self.on_peer_ready(peer); - // TODO: In case of connection errors or similar we want to call - // on_peer_fail to not continue downloading from this peer. - // Currently however a "not found" is also an error, thus calling - // on_peer_fail would stop trying to get other hashes from this peer. - // This likely needs fixing in iroh::get::get to have a meaningful error to - // see if the connection failed or if it's just a "not found". - // self.on_peer_fail(&peer, err), - } - } - } - } - } - - fn reply(&mut self, hash: Hash, res: Option<(Hash, u64)>) { - for reply in self.replies.remove(&hash).into_iter().flatten() { - reply.send(res).ok(); - } - } - - fn on_peer_fail(&mut self, peer: &PublicKey, err: anyhow::Error) { - warn!("download from {peer} failed: {err:?}"); - for hash in self.queue.on_peer_fail(peer) { - self.reply(hash, None); - } - self.conns.remove(peer); - } - - fn on_not_found(&mut self, peer: &PublicKey, hash: Hash) { - self.queue.on_not_found(hash, *peer); - if self.queue.has_no_candidates(&hash) { - self.reply(hash, None); - } - } - - fn on_peer_ready(&mut self, peer: PublicKey) { - if let Some(hash) = self.queue.try_next_for_peer(peer) { - debug!(peer = ?peer, hash = ?hash, "on_peer_ready: get next"); - self.start_download_unchecked(peer, hash); - } else { - debug!(peer = ?peer, "on_peer_ready: nothing left, disconnect"); - self.conns.remove(&peer); - } - } - - fn start_download_unchecked(&mut self, peer: PublicKey, hash: Hash) { - let conn = self.conns.get(&peer).unwrap().clone(); - let db = self.db.clone(); - let progress_sender = IgnoreProgressSender::default(); - - let fut = async move { - debug!(peer = ?peer, hash = ?hash, "start download"); - - #[cfg(feature = "metrics")] - let start = Instant::now(); - - // TODO: None for not found instead of error - let res = crate::get::get_blob(&db, conn, &hash, progress_sender).await; - let res = res.and_then(|_stats| { - db.get(&hash) - .ok_or_else(|| anyhow!("downloaded blob not found in store")) - .map(|entry| Some(entry.size())) - }); - debug!(peer = ?peer, hash = ?hash, "finish download: {res:?}"); - - // record metrics - #[cfg(feature = "metrics")] - { - let elapsed = start.elapsed().as_millis(); - match &res { - Ok(Some(len)) => { - inc!(Metrics, downloads_success); - inc_by!(Metrics, download_bytes_total, *len); - inc_by!(Metrics, download_time_total, elapsed as u64); - } - Ok(None) => inc!(Metrics, downloads_notfound), - Err(_) => inc!(Metrics, downloads_error), - } - } - - (peer, hash, res) - }; - self.pending_download_futs.push(fut.boxed_local()); - } - - async fn on_download_request(&mut self, req: DownloadRequest) { - let DownloadRequest { peers, hash, reply } = req; - if let Some(entry) = self.db.get(&hash) { - let size = entry.size(); - reply.send(Some((hash, size))).ok(); - return; - } - self.replies.entry(hash).or_default().push_back(reply); - for peer in peers { - debug!(peer = ?peer, hash = ?hash, "queue download"); - self.queue.push_candidate(hash, peer); - // TODO: Don't dial all peers instantly. - if self.conns.get(&peer).is_none() && !self.dialer.is_pending(&peer) { - self.dialer.queue_dial(peer, &iroh_bytes::protocol::ALPN); - } - } - } -} - -#[derive(Debug, Default)] -struct DownloadQueue { - candidates_by_hash: HashMap>, - candidates_by_peer: HashMap>, - running_by_hash: HashMap, - running_by_peer: HashMap, -} - -impl DownloadQueue { - pub fn push_candidate(&mut self, hash: Hash, peer: PublicKey) { - self.candidates_by_hash - .entry(hash) - .or_default() - .push_back(peer); - self.candidates_by_peer - .entry(peer) - .or_default() - .push_back(hash); - } - - pub fn try_next_for_peer(&mut self, peer: PublicKey) -> Option { - let mut next = None; - for (idx, hash) in self.candidates_by_peer.get(&peer)?.iter().enumerate() { - if !self.running_by_hash.contains_key(hash) { - next = Some((idx, *hash)); - break; - } - } - if let Some((idx, hash)) = next { - self.running_by_hash.insert(hash, peer); - self.running_by_peer.insert(peer, hash); - self.candidates_by_peer.get_mut(&peer).unwrap().remove(idx); - if let Some(peers) = self.candidates_by_hash.get_mut(&hash) { - peers.retain(|p| p != &peer); - } - self.ensure_no_empty(hash, peer); - Some(hash) - } else { - None - } - } - - pub fn has_no_candidates(&self, hash: &Hash) -> bool { - self.candidates_by_hash.get(hash).is_none() && self.running_by_hash.get(hash).is_none() - } - - /// Mark a download as successfull. - pub fn on_success(&mut self, hash: Hash, peer: PublicKey) { - let peer2 = self.running_by_hash.remove(&hash); - debug_assert_eq!(peer2, Some(peer)); - self.running_by_peer.remove(&peer); - self.candidates_by_hash.remove(&hash); - for hashes in self.candidates_by_peer.values_mut() { - hashes.retain(|h| h != &hash); - } - self.ensure_no_empty(hash, peer); - } - - /// To be called when a peer failed (i.e. disconnected). - /// - /// Returns a list of hashes that have no other peers queue. Those hashes should thus be - /// considered failed. - pub fn on_peer_fail(&mut self, peer: &PublicKey) -> Vec { - let mut failed = vec![]; - for hash in self - .candidates_by_peer - .remove(peer) - .map(|hashes| hashes.into_iter()) - .into_iter() - .flatten() - { - if let Some(peers) = self.candidates_by_hash.get_mut(&hash) { - peers.retain(|p| p != peer); - if peers.is_empty() && self.running_by_hash.get(&hash).is_none() { - failed.push(hash); - } - } - } - if let Some(hash) = self.running_by_peer.remove(peer) { - self.running_by_hash.remove(&hash); - if self.candidates_by_hash.get(&hash).is_none() { - failed.push(hash); - } - } - failed - } - - pub fn on_not_found(&mut self, hash: Hash, peer: PublicKey) { - let peer2 = self.running_by_hash.remove(&hash); - debug_assert_eq!(peer2, Some(peer)); - self.running_by_peer.remove(&peer); - self.ensure_no_empty(hash, peer); - } - - fn ensure_no_empty(&mut self, hash: Hash, peer: PublicKey) { - if self - .candidates_by_peer - .get(&peer) - .map_or(false, |hashes| hashes.is_empty()) - { - self.candidates_by_peer.remove(&peer); - } - if self - .candidates_by_hash - .get(&hash) - .map_or(false, |peers| peers.is_empty()) - { - self.candidates_by_hash.remove(&hash); - } - } -} diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs new file mode 100644 index 0000000000..df139a2266 --- /dev/null +++ b/iroh/src/downloader.rs @@ -0,0 +1,989 @@ +//! Handle downloading blobs and collections concurrently and from peers. +//! +//! The [`Downloader`] interacts with four main components to this end. +//! - [`Dialer`]: Used to queue opening connections to peers we need to perform downloads. +//! - [`ProviderMap`]: Where the downloader obtains information about peers that could be +//! used to perform a download. +//! - [`Store`]: Where data is stored. +//! - [`CollectionParser`]: Used by the Get state machine logic to identify blobs encoding +//! collections. +//! +//! Once a download request is received, the logic is as follows: +//! 1. The [`ProviderMap`] is queried for peers. From these peers some are selected +//! prioritizing connected peers with lower number of active requests. If no useful peer is +//! connected, or useful connected peers have no capacity to perform the request, a connection +//! attempt is started using the [`Dialer`]. +//! 2. The download is queued for processing at a later time. Downloads are not performed right +//! away. Instead, they are initially delayed to allow the peer to obtain the data itself, and +//! to wait for the new connection to be established if necessary. +//! 3. Once a request is ready to be sent after a delay (initial or for a retry), the preferred +//! peer is used if available. The request is now considered active. +//! +//! Concurrency is limited in different ways: +//! - *Total number of active request:* This is a way to prevent a self DoS by overwhelming our own +//! bandwidth capacity. This is a best effort heuristic since it doesn't take into account how +//! much data we are actually requesting or receiving. +//! - *Total number of connected peers:* Peer connections are kept for a longer time than they are +//! strictly needed since it's likely they will be useful soon again. +//! - *Requests per peer*: to avoid overwhelming peers with requests, the number of concurrent +//! requests to a single peer is also limited. + +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + num::NonZeroUsize, +}; + +use futures::{future::LocalBoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use iroh_bytes::{ + baomap::{range_collections::RangeSet2, Store}, + collection::CollectionParser, + protocol::RangeSpecSeq, + Hash, +}; +use iroh_net::{key::PublicKey, MagicEndpoint}; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::{sync::CancellationToken, time::delay_queue}; +use tracing::{debug, trace}; + +mod get; +mod invariants; +mod test; + +/// Delay added to a request when it's first received. +const INITIAL_REQUEST_DELAY: std::time::Duration = std::time::Duration::from_millis(500); +/// Number of retries initially assigned to a request. +const INITIAL_RETRY_COUNT: u8 = 4; +/// Duration for which we keep peers connected after they were last useful to us. +const IDLE_PEER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +/// Capacity of the channel used to comunicate between the [`Downloader`] and the [`Service`]. +const SERVICE_CHANNEL_CAPACITY: usize = 128; + +/// Download identifier. +// Mainly for readability. +pub type Id = u64; + +/// Trait modeling a dialer. This allows for IO-less testing. +pub trait Dialer: + futures::Stream)> + Unpin +{ + /// Type of connections returned by the Dialer. + type Connection: Clone; + /// Dial a peer. + fn queue_dial(&mut self, peer_id: PublicKey); + /// Get the number of dialing peers. + fn pending_count(&self) -> usize; + /// Check if a peer is being dialed. + fn is_pending(&self, peer: &PublicKey) -> bool; +} + +/// Signals what should be done with the request when it fails. +#[derive(Debug)] +pub enum FailureAction { + /// An error ocurred that prevents the request from being retried at all. + AbortRequest(anyhow::Error), + /// An error occurred that suggests the peer should not be used in general. + DropPeer(anyhow::Error), + /// An error occurred in which neither the peer nor the request are at fault. + RetryLater(anyhow::Error), +} + +/// Future of a get request. +type GetFut = LocalBoxFuture<'static, Result<(), FailureAction>>; + +/// Trait modelling performing a single request over a connection. This allows for IO-less testing. +pub trait Getter { + /// Type of connections the Getter requires to perform a download. + type Connection; + /// Return a future that performs the download using the given connection. + fn get(&mut self, kind: DownloadKind, conn: Self::Connection) -> GetFut; +} + +/// Concurrency limits for the [`Downloader`]. +#[derive(Debug)] +pub struct ConcurrencyLimits { + /// Maximum number of requests the service performs concurrently. + pub max_concurrent_requests: usize, + /// Maximum number of requests performed by a single peer concurrently. + pub max_concurrent_requests_per_peer: usize, + /// Maximum number of open connections the service maintains. + pub max_open_connections: usize, +} + +impl Default for ConcurrencyLimits { + fn default() -> Self { + // these numbers should be checked against a running node and might depend on platform + ConcurrencyLimits { + max_concurrent_requests: 50, + max_concurrent_requests_per_peer: 4, + max_open_connections: 25, + } + } +} + +impl ConcurrencyLimits { + /// Checks if the maximum number of concurrent requests has been reached. + fn at_requests_capacity(&self, active_requests: usize) -> bool { + active_requests >= self.max_concurrent_requests + } + + /// Checks if the maximum number of concurrent requests per peer has been reached. + fn peer_at_request_capacity(&self, active_peer_requests: usize) -> bool { + active_peer_requests >= self.max_concurrent_requests_per_peer + } + + /// Checks if the maximum number of connections has been reached. + fn at_connections_capacity(&self, active_connections: usize) -> bool { + active_connections >= self.max_open_connections + } +} + +/// Download requests the [`Downloader`] handles. +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub enum DownloadKind { + /// Download a single blob entirely. + Blob { + /// Blob to be downloaded. + hash: Hash, + }, + /// Download a collection entirely. + Collection { + /// Blob to be downloaded. + hash: Hash, + }, +} + +impl DownloadKind { + /// Get the requested hash. + const fn hash(&self) -> &Hash { + match self { + DownloadKind::Blob { hash } | DownloadKind::Collection { hash } => hash, + } + } + + /// Get the ranges this download is requesting. + // NOTE: necessary to extend downloads to support ranges of blobs ranges of collections. + #[allow(dead_code)] + fn ranges(&self) -> RangeSpecSeq { + match self { + DownloadKind::Blob { .. } => RangeSpecSeq::from_ranges([RangeSet2::all()]), + DownloadKind::Collection { .. } => RangeSpecSeq::all(), + } + } +} + +// For readability. In the future we might care about some data reporting on a successful download +// or kind of failure in the error case. +type DownloadResult = anyhow::Result<()>; + +/// Handle to interact with a download request. +#[derive(Debug)] +pub struct DownloadHandle { + /// Id used to identify the request in the [`Downloader`]. + id: Id, + /// Kind of download. + kind: DownloadKind, + /// Receiver to retrieve the return value of this download. + receiver: oneshot::Receiver, +} + +impl std::future::Future for DownloadHandle { + type Output = DownloadResult; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + use std::task::Poll::*; + // make it easier on holders of the handle to poll the result, removing the receiver error + // from the middle + match self.receiver.poll_unpin(cx) { + Ready(Ok(result)) => Ready(result), + Ready(Err(recv_err)) => Ready(Err(anyhow::anyhow!("oneshot error: {recv_err}"))), + Pending => Pending, + } + } +} + +/// Handle for the download services. +#[derive(Debug)] +pub struct Downloader { + /// Next id to use for a download intent. + next_id: Id, + /// Channel to communicate with the service. + msg_tx: mpsc::Sender, +} + +impl Downloader { + /// Create a new Downloader. + pub async fn new( + store: S, + collection_parser: C, + endpoint: MagicEndpoint, + rt: iroh_bytes::util::runtime::Handle, + ) -> Self + where + S: Store, + C: CollectionParser, + { + let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY); + let dialer = iroh_gossip::net::util::Dialer::new(endpoint); + + let create_future = move || { + let concurrency_limits = ConcurrencyLimits::default(); + let getter = get::IoGetter { + store, + collection_parser, + }; + + let service = Service::new(getter, dialer, concurrency_limits, msg_rx); + + service.run() + }; + rt.local_pool().spawn_pinned(create_future); + Self { next_id: 0, msg_tx } + } + + /// Queue a download. + pub async fn queue(&mut self, kind: DownloadKind, peers: Vec) -> DownloadHandle { + let id = self.next_id; + self.next_id = self.next_id.wrapping_add(1); + + let (sender, receiver) = oneshot::channel(); + let handle = DownloadHandle { + id, + kind: kind.clone(), + receiver, + }; + let msg = Message::Queue { + kind, + id, + sender, + peers, + }; + // if this fails polling the handle will fail as well since the sender side of the oneshot + // will be dropped + if let Err(send_err) = self.msg_tx.send(msg).await { + let msg = send_err.0; + debug!(?msg, "download not sent"); + } + handle + } + + /// Cancel a download. + // NOTE: receiving the handle ensures an intent can't be cancelled twice + pub async fn cancel(&mut self, handle: DownloadHandle) { + let DownloadHandle { + id, + kind, + receiver: _, + } = handle; + let msg = Message::Cancel { id, kind }; + if let Err(send_err) = self.msg_tx.send(msg).await { + let msg = send_err.0; + debug!(?msg, "cancel not sent"); + } + } + + /// Declare that certains peers can be used to download a hash. + pub async fn peers_have(&mut self, hash: Hash, peers: Vec) { + let msg = Message::PeersHave { hash, peers }; + if let Err(send_err) = self.msg_tx.send(msg).await { + let msg = send_err.0; + debug!(?msg, "peers have not sent") + } + } +} + +/// Messages the service can receive. +#[derive(derive_more::Debug)] +enum Message { + /// Queue a download intent. + Queue { + kind: DownloadKind, + id: Id, + #[debug(skip)] + sender: oneshot::Sender, + peers: Vec, + }, + /// Cancel an intent. The associated request will be cancelled when the last intent is + /// cancelled. + Cancel { id: Id, kind: DownloadKind }, + /// Declare that peers have certains hash and can be used for downloading. This feeds the [`ProviderMap`]. + PeersHave { hash: Hash, peers: Vec }, +} + +/// Information about a request being processed. +#[derive(derive_more::Debug)] +struct ActiveRequestInfo { + /// Ids of intents associated with this request. + #[debug("{:?}", intents.keys().collect::>())] + intents: HashMap>, + /// How many times can this request be retried. + remaining_retries: u8, + /// Token used to cancel the future doing the request. + #[debug(skip)] + cancellation: CancellationToken, + /// Peer doing this request attempt. + peer: PublicKey, +} + +/// Information about a request that has not started. +#[derive(derive_more::Debug)] +struct PendingRequestInfo { + /// Ids of intents associated with this request. + #[debug("{:?}", intents.keys().collect::>())] + intents: HashMap>, + /// How many times can this request be retried. + remaining_retries: u8, + /// Key to manage the delay associated with this scheduled request. + #[debug(skip)] + delay_key: delay_queue::Key, + /// If this attempt was scheduled with a known potential peer, this is stored here to + /// prevent another query to the [`ProviderMap`]. + next_peer: Option, +} + +/// State of the connection to this peer. +#[derive(derive_more::Debug)] +struct ConnectionInfo { + /// Connection to this peer. + /// + /// If this peer was deemed unusable by a request, this will be set to `None`. As a + /// consequence, when evaluating peers for a download, this peer will not be considered. + /// Since peers are kept for a longer time that they are strictly necessary, this acts as a + /// temporary ban. + #[debug(skip)] + conn: Option, + /// State of this peer. + state: PeerState, +} + +impl ConnectionInfo { + /// Create a new idle peer. + fn new_idle(connection: Conn, drop_key: delay_queue::Key) -> Self { + ConnectionInfo { + conn: Some(connection), + state: PeerState::Idle { drop_key }, + } + } + + /// Count of active requests for the peer. + fn active_requests(&self) -> usize { + match self.state { + PeerState::Busy { active_requests } => active_requests.get(), + PeerState::Idle { .. } => 0, + } + } +} + +/// State of a connected peer. +#[derive(derive_more::Debug)] +enum PeerState { + /// Peer is handling at least one request. + Busy { + #[debug("{}", active_requests.get())] + active_requests: NonZeroUsize, + }, + /// Peer is idle. + Idle { + #[debug(skip)] + drop_key: delay_queue::Key, + }, +} + +/// Type of future that performs a download request. +type DownloadFut = LocalBoxFuture<'static, (DownloadKind, Result<(), FailureAction>)>; + +#[derive(Debug)] +struct Service { + /// The getter performs individual requests. + getter: G, + /// Map to query for peers that we believe have the data we are looking for. + providers: ProviderMap, + /// Dialer to get connections for required peers. + dialer: D, + /// Limits to concurrent tasks handled by the service. + concurrency_limits: ConcurrencyLimits, + /// Channel to receive messages from the service's handle. + msg_rx: mpsc::Receiver, + /// Peers available to use and their relevant information. + peers: HashMap>, + /// Queue to manage dropping peers. + goodbye_peer_queue: delay_queue::DelayQueue, + /// Requests performed for download intents. Two download requests can produce the same + /// request. This map allows deduplication of efforts. + current_requests: HashMap, + /// Downloads underway. + in_progress_downloads: FuturesUnordered, + /// Requests scheduled to be downloaded at a later time. + scheduled_requests: HashMap, + /// Queue of scheduled requests. + scheduled_request_queue: delay_queue::DelayQueue, +} + +impl, D: Dialer> Service { + fn new( + getter: G, + dialer: D, + concurrency_limits: ConcurrencyLimits, + msg_rx: mpsc::Receiver, + ) -> Self { + Service { + getter, + providers: ProviderMap::default(), + dialer, + concurrency_limits, + msg_rx, + peers: HashMap::default(), + goodbye_peer_queue: delay_queue::DelayQueue::default(), + current_requests: HashMap::default(), + in_progress_downloads: FuturesUnordered::default(), + scheduled_requests: HashMap::default(), + scheduled_request_queue: delay_queue::DelayQueue::default(), + } + } + + /// Main loop for the service. + async fn run(mut self) { + loop { + // check if we have capacity to dequeue another scheduled request + let at_capacity = self + .concurrency_limits + .at_requests_capacity(self.in_progress_downloads.len()); + + tokio::select! { + Some((peer, conn_result)) = self.dialer.next() => { + trace!("tick: connection ready"); + self.on_connection_ready(peer, conn_result); + } + maybe_msg = self.msg_rx.recv() => { + trace!(msg=?maybe_msg, "tick: message received"); + match maybe_msg { + Some(msg) => self.handle_message(msg), + None => return self.shutdown().await, + } + } + Some((kind, result)) = self.in_progress_downloads.next() => { + trace!("tick: download completed"); + self.on_download_completed(kind, result); + } + Some(expired) = self.scheduled_request_queue.next(), if !at_capacity => { + trace!("tick: scheduled request ready"); + let kind = expired.into_inner(); + let request_info = self.scheduled_requests.remove(&kind).expect("is registered"); + self.on_scheduled_request_ready(kind, request_info); + } + Some(expired) = self.goodbye_peer_queue.next() => { + let peer = expired.into_inner(); + self.peers.remove(&peer); + trace!(%peer, "tick: goodbye peer"); + } + } + #[cfg(any(test, debug_assertions))] + self.check_invariants(); + } + } + + /// Handle receiving a [`Message`]. + fn handle_message(&mut self, msg: Message) { + match msg { + Message::Queue { + kind, + id, + sender, + peers, + } => self.handle_queue_new_download(kind, id, sender, peers), + Message::Cancel { id, kind } => self.handle_cancel_download(id, kind), + Message::PeersHave { hash, peers } => self.handle_peers_have(hash, peers), + } + } + + /// Handle a [`Message::Queue`]. + /// + /// If this intent maps to a request that already exists, it will be registered with it. If the + /// request is new it will be scheduled. + fn handle_queue_new_download( + &mut self, + kind: DownloadKind, + id: Id, + sender: oneshot::Sender, + peers: Vec, + ) { + self.providers.add_peers(*kind.hash(), &peers); + if let Some(info) = self.current_requests.get_mut(&kind) { + // this intent maps to a download that already exists, simply register it + info.intents.insert(id, sender); + // increasing the retries by one accounts for multiple intents for the same request in + // a conservative way + info.remaining_retries += 1; + return trace!(?kind, ?info, "intent registered with active request"); + } + + let needs_peer = self + .scheduled_requests + .get(&kind) + .map(|info| info.next_peer.is_none()) + .unwrap_or(true); + + let next_peer = needs_peer + .then(|| self.get_best_candidate(kind.hash())) + .flatten(); + + // if we are here this request is not active, check if it needs to be scheduled + match self.scheduled_requests.get_mut(&kind) { + Some(info) => { + info.intents.insert(id, sender); + // pre-emptively get a peer if we don't already have one + if info.next_peer.is_none() { + info.next_peer = next_peer + } + // increasing the retries by one accounts for multiple intents for the same request in + // a conservative way + info.remaining_retries += 1; + trace!(?kind, ?info, "intent registered with scheduled request"); + } + None => { + let intents = HashMap::from([(id, sender)]); + self.schedule_request(kind, INITIAL_RETRY_COUNT, next_peer, intents) + } + } + } + + /// Gets the best candidate for a download. + /// + /// Peers are selected prioritizing those with an open connection and with capacity for another + /// request, followed by peers we are currently dialing with capacity for another request. + /// Lastly, peers not connected and not dialing are considered. + /// + /// If the selected candidate is not connected and we have capacity for another connection, a + /// dial is queued. + fn get_best_candidate(&mut self, hash: &Hash) -> Option { + /// Model the state of peers found in the candidates + #[derive(PartialEq, Eq, Clone, Copy)] + enum ConnState { + Dialing, + Connected(usize), + NotConnected, + } + + impl Ord for ConnState { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // define the order of preference between candidates as follows: + // - prefer connected peers to dialing ones + // - prefer dialing peers to not connected ones + // - prefer peers with less active requests when connected + use std::cmp::Ordering::*; + match (self, other) { + (ConnState::Dialing, ConnState::Dialing) => Equal, + (ConnState::Dialing, ConnState::Connected(_)) => Less, + (ConnState::Dialing, ConnState::NotConnected) => Greater, + (ConnState::NotConnected, ConnState::Dialing) => Less, + (ConnState::NotConnected, ConnState::Connected(_)) => Less, + (ConnState::NotConnected, ConnState::NotConnected) => Equal, + (ConnState::Connected(_), ConnState::Dialing) => Greater, + (ConnState::Connected(_), ConnState::NotConnected) => Greater, + (ConnState::Connected(a), ConnState::Connected(b)) => match a.cmp(b) { + Less => Greater, // less preferable if greater number of requests + Equal => Equal, // no preference + Greater => Less, // more preferable if less number of requests + }, + } + } + } + + impl PartialOrd for ConnState { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + // first collect suitable candidates + let mut candidates = self + .providers + .get_candidates(hash) + .filter_map(|peer| { + if let Some(info) = self.peers.get(peer) { + info.conn.as_ref()?; + let req_count = info.active_requests(); + // filter out peers at capacity + let has_capacity = !self.concurrency_limits.peer_at_request_capacity(req_count); + has_capacity.then_some((peer, ConnState::Connected(req_count))) + } else if self.dialer.is_pending(peer) { + Some((peer, ConnState::Dialing)) + } else { + Some((peer, ConnState::NotConnected)) + } + }) + .collect::>(); + + candidates.sort_unstable_by_key(|peer_and_state| peer_and_state.1 /* state */); + + // this is our best peer, check if we need to dial it + let (peer, state) = candidates.pop()?; + + if let ConnState::NotConnected = state { + if !self.at_connections_capacity() { + // peer is not connected, not dialing and concurrency limits allow another connection + debug!(%peer, "dialing peer"); + self.dialer.queue_dial(*peer); + Some(*peer) + } else { + trace!(%peer, "required peer not dialed to maintain concurrency limits"); + None + } + } else { + Some(*peer) + } + } + + /// Cancels the download request. + /// + /// This removes the registered download intent and, depending on its state, it will either + /// remove it from the scheduled requests, or cancel the future. + fn handle_cancel_download(&mut self, id: Id, kind: DownloadKind) { + if let Entry::Occupied(mut occupied_entry) = self.current_requests.entry(kind.clone()) { + // remove the intent from the associated request + let intents = &mut occupied_entry.get_mut().intents; + intents.remove(&id); + // if this was the last intent associated with the request cancel it + if intents.is_empty() { + occupied_entry.remove().cancellation.cancel(); + } + } else if let Entry::Occupied(mut occupied_entry) = self.scheduled_requests.entry(kind) { + // remove the intent from the associated request + let intents = &mut occupied_entry.get_mut().intents; + intents.remove(&id); + // if this was the last intent associated with the request remove it from the schedule + // queue + if intents.is_empty() { + let delay_key = occupied_entry.remove().delay_key; + self.scheduled_request_queue.remove(&delay_key); + } + } + } + + /// Handle a [`Message::PeersHave`]. + fn handle_peers_have(&mut self, hash: Hash, peers: Vec) { + // check if this still needed + if self.is_needed(hash) { + self.providers.add_peers(hash, &peers); + } + } + + /// Checks if this hash is needed. + fn is_needed(&self, hash: Hash) -> bool { + let as_blob = DownloadKind::Blob { hash }; + let as_collection = DownloadKind::Collection { hash }; + self.current_requests.contains_key(&as_blob) + || self.scheduled_requests.contains_key(&as_blob) + || self.current_requests.contains_key(&as_collection) + || self.scheduled_requests.contains_key(&as_collection) + } + + /// Handle receiving a new connection. + fn on_connection_ready(&mut self, peer: PublicKey, result: anyhow::Result) { + match result { + Ok(connection) => { + trace!(%peer, "connected to peer"); + let drop_key = self.goodbye_peer_queue.insert(peer, IDLE_PEER_TIMEOUT); + self.peers + .insert(peer, ConnectionInfo::new_idle(connection, drop_key)); + } + Err(err) => { + debug!(%peer, %err, "connection to peer failed") + } + } + } + + fn on_download_completed(&mut self, kind: DownloadKind, result: Result<(), FailureAction>) { + // first remove the request + let info = self + .current_requests + .remove(&kind) + .expect("request was active"); + + // update the active requests for this peer + let ActiveRequestInfo { + intents, + peer, + mut remaining_retries, + .. + } = info; + + let peer_info = self + .peers + .get_mut(&peer) + .expect("peer exists in the mapping"); + peer_info.state = match &peer_info.state { + PeerState::Busy { active_requests } => { + match NonZeroUsize::new(active_requests.get() - 1) { + Some(active_requests) => PeerState::Busy { active_requests }, + None => { + // last request of the peer was this one + let drop_key = self.goodbye_peer_queue.insert(peer, IDLE_PEER_TIMEOUT); + PeerState::Idle { drop_key } + } + } + } + PeerState::Idle { .. } => unreachable!("peer was busy"), + }; + + let hash = *kind.hash(); + + match result { + Ok(()) => { + debug!(%peer, ?kind, "download completed"); + for sender in intents.into_values() { + let _ = sender.send(Ok(())); + } + } + Err(FailureAction::AbortRequest(reason)) => { + debug!(%peer, ?kind, %reason, "aborting request"); + for sender in intents.into_values() { + let _ = sender.send(Err(anyhow::anyhow!("request aborted"))); + } + } + Err(FailureAction::DropPeer(reason)) => { + debug!(%peer, ?kind, %reason, "peer will be dropped"); + if let Some(_connection) = peer_info.conn.take() { + // TODO(@divma): this will fail open streams, do we want this? + // connection.close(..) + } + } + Err(FailureAction::RetryLater(reason)) => { + // check if the download can be retried + if remaining_retries > 0 { + debug!(%peer, ?kind, %reason, "download attempt failed"); + remaining_retries -= 1; + let next_peer = self.get_best_candidate(kind.hash()); + self.schedule_request(kind, remaining_retries, next_peer, intents); + } else { + debug!(%peer, ?kind, %reason, "download failed"); + for sender in intents.into_values() { + let _ = sender.send(Err(anyhow::anyhow!("download ran out of attempts"))); + } + } + } + } + + if !self.is_needed(hash) { + self.providers.remove(hash) + } + } + + /// A scheduled request is ready to be processed. + /// + /// The peer that was initially selected is used if possible. Otherwise we try to get a new + /// peer + fn on_scheduled_request_ready(&mut self, kind: DownloadKind, info: PendingRequestInfo) { + let PendingRequestInfo { + intents, + mut remaining_retries, + next_peer, + .. + } = info; + + // first try with the peer that was initially assigned + if let Some((peer, conn)) = next_peer.and_then(|peer| { + self.get_peer_connection_for_download(&peer) + .map(|conn| (peer, conn)) + }) { + return self.start_download(kind, peer, conn, remaining_retries, intents); + } + + // we either didn't have a peer or the peer is busy or dialing. In any case try to get + // another peer + let next_peer = match self.get_best_candidate(kind.hash()) { + None => None, + Some(peer) => { + // optimistically check if the peer could do the request right away + match self.get_peer_connection_for_download(&peer) { + Some(conn) => { + return self.start_download(kind, peer, conn, remaining_retries, intents) + } + None => Some(peer), + } + } + }; + + // we tried to get a peer to perform this request but didn't get one, so now this attempt + // is failed + if remaining_retries > 0 { + remaining_retries -= 1; + self.schedule_request(kind, remaining_retries, next_peer, intents); + } else { + // request can't be retried + for sender in intents.into_values() { + let _ = sender.send(Err(anyhow::anyhow!("download ran out of attempts"))); + } + debug!(?kind, "download ran out of attempts") + } + } + + /// Start downloading from the given peer. + fn start_download( + &mut self, + kind: DownloadKind, + peer: PublicKey, + conn: D::Connection, + remaining_retries: u8, + intents: HashMap>, + ) { + debug!(%peer, ?kind, "starting download"); + let cancellation = CancellationToken::new(); + let info = ActiveRequestInfo { + intents, + remaining_retries, + cancellation, + peer, + }; + let cancellation = info.cancellation.clone(); + self.current_requests.insert(kind.clone(), info); + + let get = self.getter.get(kind.clone(), conn); + let fut = async move { + // NOTE: it's an open question if we should do timeouts at this point. Considerations from @Frando: + // > at this stage we do not know the size of the download, so the timeout would have + // > to be so large that it won't be useful for non-huge downloads. At the same time, + // > this means that a super slow peer would block a download from succeeding for a long + // > time, while faster peers could be readily available. + // As a conclusion, timeouts should be added only after downloads are known to be bounded + let res = tokio::select! { + _ = cancellation.cancelled() => Err(FailureAction::AbortRequest(anyhow::anyhow!("cancelled"))), + res = get => res + }; + + (kind, res) + }; + + self.in_progress_downloads.push(fut.boxed_local()); + } + + /// Schedule a request for later processing. + fn schedule_request( + &mut self, + kind: DownloadKind, + remaining_retries: u8, + next_peer: Option, + intents: HashMap>, + ) { + // this is simply INITIAL_REQUEST_DELAY * attempt_num where attempt_num (as an ordinal + // number) is maxed at INITIAL_RETRY_COUNT + let delay = INITIAL_REQUEST_DELAY + * (INITIAL_RETRY_COUNT.saturating_sub(remaining_retries) as u32 + 1); + let delay_key = self.scheduled_request_queue.insert(kind.clone(), delay); + + let info = PendingRequestInfo { + intents, + remaining_retries, + delay_key, + next_peer, + }; + debug!(?kind, ?info, "request scheduled"); + self.scheduled_requests.insert(kind, info); + } + + /// Gets the [`Dialer::Connection`] for a peer if it's connected and has capacity for another + /// request. In this case, the count of active requests for the peer is incremented. + fn get_peer_connection_for_download(&mut self, peer: &PublicKey) -> Option { + let info = self.peers.get_mut(peer)?; + let connection = info.conn.as_ref()?; + // check if the peer can be sent another request + match &mut info.state { + PeerState::Busy { active_requests } => { + if !self + .concurrency_limits + .peer_at_request_capacity(active_requests.get()) + { + *active_requests = active_requests.saturating_add(1); + Some(connection.clone()) + } else { + None + } + } + PeerState::Idle { drop_key } => { + // peer is no longer idle + self.goodbye_peer_queue.remove(drop_key); + info.state = PeerState::Busy { + active_requests: NonZeroUsize::new(1).expect("clearly non zero"), + }; + Some(connection.clone()) + } + } + } + + /// Check if we have maxed our connection capacity. + fn at_connections_capacity(&self) -> bool { + self.concurrency_limits + .at_connections_capacity(self.connections_count()) + } + + /// Get the total number of connected and dialing peers. + fn connections_count(&self) -> usize { + let connected_peers = self + .peers + .values() + .filter(|info| info.conn.is_some()) + .count(); + let dialing_peers = self.dialer.pending_count(); + connected_peers + dialing_peers + } + + async fn shutdown(self) { + debug!("shutting down"); + // TODO(@divma): how to make sure the download futures end gracefully? + } +} + +/// Map of potential providers for a hash. +#[derive(Default, Debug)] +pub struct ProviderMap { + /// Candidates to download a hash. + candidates: HashMap>, +} + +struct ProviderIter<'a> { + inner: Option>, +} + +impl<'a> Iterator for ProviderIter<'a> { + type Item = &'a PublicKey; + + fn next(&mut self) -> Option { + self.inner.as_mut().and_then(|iter| iter.next()) + } +} + +impl ProviderMap { + /// Get candidates to download this hash. + fn get_candidates(&self, hash: &Hash) -> impl Iterator { + let inner = self.candidates.get(hash).map(|peer_set| peer_set.iter()); + ProviderIter { inner } + } + + /// Register peers for a hash. Should only be done for hashes we care to download. + fn add_peers(&mut self, hash: Hash, peers: &[PublicKey]) { + self.candidates.entry(hash).or_default().extend(peers) + } + + /// Signal the registry that this hash is no longer of interest. + fn remove(&mut self, hash: Hash) { + self.candidates.remove(&hash); + } +} + +impl Dialer for iroh_gossip::net::util::Dialer { + type Connection = quinn::Connection; + + fn queue_dial(&mut self, peer_id: PublicKey) { + self.queue_dial(peer_id, &iroh_bytes::protocol::ALPN) + } + + fn pending_count(&self) -> usize { + self.pending_count() + } + + fn is_pending(&self, peer: &PublicKey) -> bool { + self.is_pending(peer) + } +} diff --git a/iroh/src/downloader/get.rs b/iroh/src/downloader/get.rs new file mode 100644 index 0000000000..858102ebed --- /dev/null +++ b/iroh/src/downloader/get.rs @@ -0,0 +1,556 @@ +//! [`Getter`] implementation that performs requests over [`quinn::Connection`]s. + +use anyhow::Context; +use bao_tree::io::fsm::OutboardMut; +use futures::FutureExt; +use iroh_bytes::baomap::range_collections::RangeSet2; +use iroh_bytes::{ + baomap::{MapEntry, PartialMapEntry, Store}, + collection::CollectionParser, + get::{ + self, + fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext}, + Stats, + }, + protocol::{GetRequest, RangeSpecSeq}, + util::Hash, + IROH_BLOCK_SIZE, +}; +#[cfg(feature = "metrics")] +use iroh_metrics::{inc, inc_by}; +use tracing::trace; + +use crate::get::{get_missing_ranges_blob, get_missing_ranges_collection, BlobInfo}; +#[cfg(feature = "metrics")] +use crate::metrics::Metrics; +use crate::util::progress::ProgressSliceWriter2; + +use super::{DownloadKind, FailureAction, GetFut, Getter}; + +/// [`Getter`] implementation that performs requests over [`quinn::Connection`]s. +pub(crate) struct IoGetter { + pub store: S, + pub collection_parser: C, +} + +impl Getter for IoGetter { + type Connection = quinn::Connection; + + fn get(&mut self, kind: DownloadKind, conn: Self::Connection) -> GetFut { + let store = self.store.clone(); + let collection_parser = self.collection_parser.clone(); + let fut = async move { + let get = match kind { + DownloadKind::Blob { hash } => get(&store, &collection_parser, conn, hash, false), + DownloadKind::Collection { hash } => { + get(&store, &collection_parser, conn, hash, true) + } + }; + + let res = get.await; + match res { + Ok(_stats) => { + #[cfg(feature = "metrics")] + { + let Stats { + bytes_written, + bytes_read: _, + elapsed, + } = _stats; + + inc!(Metrics, downloads_success); + inc_by!(Metrics, download_bytes_total, bytes_written); + inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64); + } + Ok(()) + } + Err(e) => { + // record metrics according to the error + #[cfg(feature = "metrics")] + { + match &e { + FailureAction::RetryLater(_) => inc!(Metrics, downloads_notfound), + _ => inc!(Metrics, downloads_error), + } + } + Err(e) + } + } + }; + fut.boxed_local() + } +} + +impl From for FailureAction { + fn from(value: quinn::ConnectionError) -> Self { + // explicit match just to be sure we are taking everything into account + match value { + e @ quinn::ConnectionError::VersionMismatch => { + // > The peer doesn't implement any supported version + // unsupported version is likely a long time error, so this peer is not usable + FailureAction::DropPeer(e.into()) + } + e @ quinn::ConnectionError::TransportError(_) => { + // > The peer violated the QUIC specification as understood by this implementation + // bad peer we don't want to keep around + FailureAction::DropPeer(e.into()) + } + e @ quinn::ConnectionError::ConnectionClosed(_) => { + // > The peer's QUIC stack aborted the connection automatically + // peer might be disconnecting or otherwise unavailable, drop it + FailureAction::DropPeer(e.into()) + } + e @ quinn::ConnectionError::ApplicationClosed(_) => { + // > The peer closed the connection + // peer might be disconnecting or otherwise unavailable, drop it + FailureAction::DropPeer(e.into()) + } + e @ quinn::ConnectionError::Reset => { + // > The peer is unable to continue processing this connection, usually due to having restarted + FailureAction::RetryLater(e.into()) + } + e @ quinn::ConnectionError::TimedOut => { + // > Communication with the peer has lapsed for longer than the negotiated idle timeout + FailureAction::RetryLater(e.into()) + } + e @ quinn::ConnectionError::LocallyClosed => { + // > The local application closed the connection + // TODO(@divma): don't see how this is reachable but let's just not use the peer + FailureAction::DropPeer(e.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: quinn::ReadError) -> Self { + match value { + e @ quinn::ReadError::Reset(_) => FailureAction::RetryLater(e.into()), + quinn::ReadError::ConnectionLost(conn_error) => conn_error.into(), + quinn::ReadError::UnknownStream + | quinn::ReadError::IllegalOrderedRead + | quinn::ReadError::ZeroRttRejected => { + // all these errors indicate the peer is not usable at this moment + FailureAction::DropPeer(value.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: quinn::WriteError) -> Self { + match value { + e @ quinn::WriteError::Stopped(_) => FailureAction::RetryLater(e.into()), + quinn::WriteError::ConnectionLost(conn_error) => conn_error.into(), + quinn::WriteError::UnknownStream | quinn::WriteError::ZeroRttRejected => { + // all these errors indicate the peer is not usable at this moment + FailureAction::DropPeer(value.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: iroh_bytes::get::fsm::ConnectedNextError) -> Self { + use iroh_bytes::get::fsm::ConnectedNextError::*; + match value { + e @ PostcardSer(_) => { + // serialization errors indicate something wrong with the request itself + FailureAction::AbortRequest(e.into()) + } + e @ RequestTooBig => { + // request will never be sent, drop it + FailureAction::AbortRequest(e.into()) + } + Write(e) => e.into(), + Read(e) => e.into(), + e @ CustomRequestTooBig => { + // something wrong with the request itself + FailureAction::AbortRequest(e.into()) + } + e @ Eof => { + // TODO(@divma): unsure about this based on docs + FailureAction::RetryLater(e.into()) + } + e @ PostcardDe(_) => { + // serialization errors can't be recovered + FailureAction::AbortRequest(e.into()) + } + e @ Io(_) => { + // io errors are likely recoverable + FailureAction::RetryLater(e.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: iroh_bytes::get::fsm::AtBlobHeaderNextError) -> Self { + use iroh_bytes::get::fsm::AtBlobHeaderNextError::*; + match value { + e @ NotFound => { + // > This indicates that the provider does not have the requested data. + // peer might have the data later, simply retry it + FailureAction::RetryLater(e.into()) + } + e @ InvalidQueryRange => { + // we are doing something wrong with this request, drop it + FailureAction::AbortRequest(e.into()) + } + Read(e) => e.into(), + e @ Io(_) => { + // io errors are likely recoverable + FailureAction::RetryLater(e.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: iroh_bytes::get::fsm::DecodeError) -> Self { + use get::fsm::DecodeError::*; + + match value { + e @ NotFound => FailureAction::RetryLater(e.into()), + e @ ParentNotFound(_) => FailureAction::RetryLater(e.into()), + e @ LeafNotFound(_) => FailureAction::RetryLater(e.into()), + e @ ParentHashMismatch(_) => { + // TODO(@divma): did the peer sent wrong data? is it corrupted? did we sent a wrong + // request? + FailureAction::AbortRequest(e.into()) + } + e @ LeafHashMismatch(_) => { + // TODO(@divma): did the peer sent wrong data? is it corrupted? did we sent a wrong + // request? + FailureAction::AbortRequest(e.into()) + } + e @ InvalidQueryRange => FailureAction::AbortRequest(e.into()), + Read(e) => e.into(), + Io(e) => e.into(), + } + } +} + +impl From for FailureAction { + fn from(value: std::io::Error) -> Self { + // generally consider io errors recoverable + // we might want to revisit this at some point + FailureAction::RetryLater(value.into()) + } +} + +/// Get a blob or collection +pub async fn get( + db: &D, + collection_parser: &C, + conn: quinn::Connection, + hash: Hash, + recursive: bool, +) -> Result { + let res = if recursive { + get_collection(db, collection_parser, conn, &hash).await + } else { + get_blob(db, conn, &hash).await + }; + if let Err(e) = res.as_ref() { + tracing::error!("get failed: {e:?}"); + } + res +} + +/// Get a blob that was requested completely. +/// +/// We need to create our own files and handle the case where an outboard +/// is not needed. +pub async fn get_blob( + db: &D, + conn: quinn::Connection, + hash: &Hash, +) -> Result { + let end = if let Some(entry) = db.get_partial(hash) { + trace!("got partial data for {}", hash,); + + let required_ranges = get_missing_ranges_blob::(&entry) + .await + .ok() + .unwrap_or_else(RangeSet2::all); + let request = GetRequest::new(*hash, RangeSpecSeq::from_ranges([required_ranges])); + // full request + let request = get::fsm::start(conn, iroh_bytes::protocol::Request::Get(request)); + // create a new bidi stream + let connected = request.next().await?; + // next step. we have requested a single hash, so this must be StartRoot + let ConnectedNext::StartRoot(start) = connected.next().await? else { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "expected `StartRoot` in single blob request" + ))); + }; + // move to the header + let header = start.next(); + // do the ceremony of getting the blob and adding it to the database + + get_blob_inner_partial(db, header, entry).await? + } else { + // full request + let request = get::fsm::start( + conn, + iroh_bytes::protocol::Request::Get(GetRequest::single(*hash)), + ); + // create a new bidi stream + let connected = request.next().await?; + // next step. we have requested a single hash, so this must be StartRoot + let ConnectedNext::StartRoot(start) = connected.next().await? else { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "expected `StartRoot` in single blob request" + ))); + }; + // move to the header + let header = start.next(); + // do the ceremony of getting the blob and adding it to the database + get_blob_inner(db, header).await? + }; + + // we have requested a single hash, so we must be at closing + let EndBlobNext::Closing(end) = end.next() else { + // TODO(@divma): I think this is a codign error and not a peer error + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "peer sent extra data in single blob request" + ))); + }; + // this closes the bidi stream. Do something with the stats? + let stats = end.next().await?; + Ok(stats) +} + +/// Get a blob that was requested completely. +/// +/// We need to create our own files and handle the case where an outboard +/// is not needed. +async fn get_blob_inner( + db: &D, + header: AtBlobHeader, +) -> Result { + use iroh_io::AsyncSliceWriter; + + let hash = header.hash(); + // read the size + let (content, size) = header.next().await?; + // create the temp file pair + let entry = db.get_or_create_partial(hash, size)?; + // open the data file in any case + let df = entry.data_writer().await?; + let mut of: Option = if needs_outboard(size) { + Some(entry.outboard_mut().await?) + } else { + None + }; + let on_write = move |_offset: u64, _length: usize| Ok(()); + let mut pw = ProgressSliceWriter2::new(df, on_write); + // use the convenience method to write all to the two vfs objects + let end = content + .write_all_with_outboard(of.as_mut(), &mut pw) + .await?; + // TODO(@divma): what does this failure mean + // sync the data file + pw.sync().await?; + // sync the outboard file, if we wrote one + if let Some(mut of) = of { + of.sync().await?; + } + db.insert_complete(entry).await?; + Ok(end) +} + +fn needs_outboard(size: u64) -> bool { + size > (IROH_BLOCK_SIZE.bytes() as u64) +} + +/// Get a blob that was requested partially. +/// +/// We get passed the data and outboard ids. Partial downloads are only done +/// for large blobs where the outboard is present. +async fn get_blob_inner_partial( + db: &D, + header: AtBlobHeader, + entry: D::PartialEntry, +) -> Result { + // TODO: the data we get is validated at this point, but we need to check + // that it actually contains the requested ranges. Or DO WE? + use iroh_io::AsyncSliceWriter; + + // read the size + let (content, size) = header.next().await?; + // open the data file in any case + let df = entry.data_writer().await?; + let mut of = if needs_outboard(size) { + Some(entry.outboard_mut().await?) + } else { + None + }; + let on_write = move |_offset: u64, _length: usize| Ok(()); + let mut pw = ProgressSliceWriter2::new(df, on_write); + // use the convenience method to write all to the two vfs objects + let end = content + .write_all_with_outboard(of.as_mut(), &mut pw) + .await?; + + // TODO(@divma): what does this failure mean + // sync the data file + pw.sync().await?; + // sync the outboard file + if let Some(mut of) = of { + of.sync().await?; + } + // actually store the data. it is up to the db to decide if it wants to + // rename the files or not. + db.insert_complete(entry).await?; + Ok(end) +} + +/// Get a collection +pub async fn get_collection( + db: &D, + collection_parser: &C, + conn: quinn::Connection, + root_hash: &Hash, +) -> Result { + use tracing::info as log; + let finishing = if let Some(entry) = db.get(root_hash) { + log!("already got collection - doing partial download"); + // got the collection + let reader = entry.data_reader().await?; + let (mut collection, _) = collection_parser.parse(0, reader).await.map_err(|e| { + FailureAction::DropPeer(anyhow::anyhow!( + "peer sent data that can't be parsed as collection : {e}" + )) + })?; + let mut children: Vec = vec![]; + while let Some(hash) = collection.next().await.map_err(|e| { + FailureAction::DropPeer(anyhow::anyhow!( + "received collection data can't be iterated: {e}" + )) + })? { + children.push(hash); + } + let missing_info = get_missing_ranges_collection(db, &children).await?; + if missing_info.iter().all(|x| matches!(x, BlobInfo::Complete)) { + log!("nothing to do"); + return Ok(Stats::default()); + } + let missing_iter = std::iter::once(RangeSet2::empty()) + .chain(missing_info.iter().map(|x| x.missing_chunks())) + .collect::>(); + log!("requesting chunks {:?}", missing_iter); + let request = GetRequest::new(*root_hash, RangeSpecSeq::from_ranges(missing_iter)); + let request = get::fsm::start(conn, request.into()); + // create a new bidi stream + let connected = request.next().await?; + log!("connected"); + // we have not requested the root, so this must be StartChild + let ConnectedNext::StartChild(start) = connected.next().await? else { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "peer sent data that does not match requested info" + ))); + }; + let mut next = EndBlobNext::MoreChildren(start); + // read all the children + loop { + let start = match next { + EndBlobNext::MoreChildren(start) => start, + EndBlobNext::Closing(finish) => break finish, + }; + let child_offset = usize::try_from(start.child_offset()) + .context("child offset too large") + .map_err(|_| { + FailureAction::AbortRequest(anyhow::anyhow!( + "requested offsets surpasses platform's usize" + )) + })?; + let (child_hash, info) = + match (children.get(child_offset), missing_info.get(child_offset)) { + (Some(blob), Some(info)) => (*blob, info), + _ => break start.finish(), + }; + tracing::info!( + "requesting child {} {:?}", + child_hash, + info.missing_chunks() + ); + let header = start.next(child_hash); + let end_blob = match info { + BlobInfo::Missing => get_blob_inner(db, header).await?, + BlobInfo::Partial { entry, .. } => { + get_blob_inner_partial(db, header, entry.clone()).await? + } + BlobInfo::Complete => { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "peer sent data we did't request" + ))) + } + }; + next = end_blob.next(); + } + } else { + tracing::info!("don't have collection - doing full download"); + // don't have the collection, so probably got nothing + let request = get::fsm::start( + conn, + iroh_bytes::protocol::Request::Get(GetRequest::all(*root_hash)), + ); + // create a new bidi stream + let connected = request.next().await?; + // next step. we have requested a single hash, so this must be StartRoot + let ConnectedNext::StartRoot(start) = connected.next().await? else { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "expected StartRoot" + ))); + }; + // move to the header + let header = start.next(); + // read the blob and add it to the database + let end_root = get_blob_inner(db, header).await?; + // read the collection fully for now + let entry = db.get(root_hash).context("just downloaded").map_err(|_| { + FailureAction::RetryLater(anyhow::anyhow!("data just downloaded was not found")) + })?; + let reader = entry.data_reader().await?; + let (mut collection, _) = collection_parser.parse(0, reader).await.map_err(|_| { + FailureAction::DropPeer(anyhow::anyhow!( + "peer sent data that can't be parsed as collection" + )) + })?; + let mut children = vec![]; + while let Some(hash) = collection.next().await.map_err(|e| { + FailureAction::DropPeer(anyhow::anyhow!( + "received collection data can't be iterated: {e}" + )) + })? { + children.push(hash); + } + let mut next = end_root.next(); + // read all the children + loop { + let start = match next { + EndBlobNext::MoreChildren(start) => start, + EndBlobNext::Closing(finish) => break finish, + }; + let child_offset = usize::try_from(start.child_offset()) + .context("child offset too large") + .map_err(|_| { + FailureAction::AbortRequest(anyhow::anyhow!( + "requested offsets surpasses platform's usize" + )) + })?; + let child_hash = match children.get(child_offset) { + Some(blob) => *blob, + None => break start.finish(), + }; + let header = start.next(child_hash); + let end_blob = get_blob_inner(db, header).await?; + next = end_blob.next(); + } + }; + // this closes the bidi stream. Do something with the stats? + let stats = finishing.next().await?; + Ok(stats) +} diff --git a/iroh/src/downloader/invariants.rs b/iroh/src/downloader/invariants.rs new file mode 100644 index 0000000000..24383a4c15 --- /dev/null +++ b/iroh/src/downloader/invariants.rs @@ -0,0 +1,99 @@ +//! Invariants for the service. + +#![cfg(any(test, debug_assertions))] + +use super::*; + +/// invariants for the service. +impl, D: Dialer> Service { + /// Checks the various invariants the service must maintain + #[track_caller] + pub(in crate::downloader) fn check_invariants(&self) { + self.check_active_request_count(); + self.check_scheduled_requests_consistency(); + self.check_idle_peer_consistency(); + self.chech_concurrency_limits(); + } + + /// Checks concurrency limits are maintained. + #[track_caller] + fn chech_concurrency_limits(&self) { + let ConcurrencyLimits { + max_concurrent_requests, + max_concurrent_requests_per_peer, + max_open_connections, + } = &self.concurrency_limits; + + // check the total number of active requests to ensure it stays within the limit + assert!( + self.in_progress_downloads.len() <= *max_concurrent_requests, + "max_concurrent_requests exceeded" + ); + + // check that the open and dialing peers don't exceed the connection capacity + assert!( + self.connections_count() <= *max_open_connections, + "max_open_connections exceeded" + ); + + // check the active requests per peer don't exceed the limit + for (peer, info) in self.peers.iter() { + assert!( + info.active_requests() <= *max_concurrent_requests_per_peer, + "max_concurrent_requests_per_peer exceeded for {peer}" + ) + } + } + + /// Checks that the count of active requests per peer is consistent with the active requests, + /// and that active request are consistent with download futures + #[track_caller] + fn check_active_request_count(&self) { + // check that the count of futures we are polling for downloads is consistent with the + // number of requests + assert_eq!( + self.in_progress_downloads.len(), + self.current_requests.len(), + "current_requests and in_progress_downloads are out of sync" + ); + // check that the count of requests per peer matches the number of requests that have that + // peer as active + let mut real_count: HashMap = HashMap::with_capacity(self.peers.len()); + for req_info in self.current_requests.values() { + // nothing like some classic word count + *real_count.entry(req_info.peer).or_default() += 1; + } + for (peer, info) in self.peers.iter() { + assert_eq!( + info.active_requests(), + real_count.get(peer).copied().unwrap_or_default(), + "missmatched count of active requests for {peer}" + ) + } + } + + /// Checks that the scheduled requests match the queue that handles their delays. + #[track_caller] + fn check_scheduled_requests_consistency(&self) { + assert_eq!( + self.scheduled_requests.len(), + self.scheduled_request_queue.len(), + "scheduled_request_queue and scheduled_requests are out of sync" + ); + } + + /// Check that peers queued to be disconnected are consistent with peers considered idle. + #[track_caller] + fn check_idle_peer_consistency(&self) { + let idle_peers = self + .peers + .values() + .filter(|info| info.active_requests() == 0) + .count(); + assert_eq!( + self.goodbye_peer_queue.len(), + idle_peers, + "inconsistent count of idle peers" + ); + } +} diff --git a/iroh/src/downloader/test.rs b/iroh/src/downloader/test.rs new file mode 100644 index 0000000000..80454226fc --- /dev/null +++ b/iroh/src/downloader/test.rs @@ -0,0 +1,199 @@ +#![cfg(test)] +use std::time::Duration; + +use iroh_net::key::SecretKey; + +use super::*; + +mod dialer; +mod getter; + +impl Downloader { + fn spawn_for_test( + dialer: dialer::TestingDialer, + getter: getter::TestingGetter, + concurrency_limits: ConcurrencyLimits, + ) -> Self { + let (msg_tx, msg_rx) = mpsc::channel(super::SERVICE_CHANNEL_CAPACITY); + + iroh_bytes::util::runtime::Handle::from_current(1) + .unwrap() + .local_pool() + .spawn_pinned(move || async move { + // we want to see the logs of the service + let _guard = iroh_test::logging::setup(); + + let service = Service::new(getter, dialer, concurrency_limits, msg_rx); + service.run().await + }); + + Downloader { next_id: 0, msg_tx } + } +} + +/// Tests that receiving a download request and performing it doesn't explode. +#[tokio::test] +async fn smoke_test() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + let concurrency_limits = ConcurrencyLimits::default(); + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + // send a request and make sure the peer is requested the corresponding download + let peer = SecretKey::generate().public(); + let kind = DownloadKind::Blob { + hash: Hash::new([0u8; 32]), + }; + let handle = downloader.queue(kind.clone(), vec![peer]).await; + // wait for the download result to be reported + handle.await.expect("should report success"); + // verify that the peer was dialed + dialer.assert_history(&[peer]); + // verify that the request was sent + getter.assert_history(&[(kind, peer)]); +} + +/// Tests that multiple intents produce a single request. +#[tokio::test] +async fn deduplication() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + // make request take some time to ensure the intents are received before completion + getter.set_request_duration(Duration::from_secs(1)); + let concurrency_limits = ConcurrencyLimits::default(); + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + let peer = SecretKey::generate().public(); + let kind = DownloadKind::Blob { + hash: Hash::new([0u8; 32]), + }; + let mut handles = Vec::with_capacity(10); + for _ in 0..10 { + let h = downloader.queue(kind.clone(), vec![peer]).await; + handles.push(h); + } + assert!( + futures::future::join_all(handles) + .await + .into_iter() + .all(|r| r.is_ok()), + "all downloads should succeed" + ); + // verify that the request was sent just once + getter.assert_history(&[(kind, peer)]); +} + +/// Tests that the request is cancelled only when all intents are cancelled. +#[tokio::test] +async fn cancellation() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + // make request take some time to ensure cancellations are received on time + getter.set_request_duration(Duration::from_millis(500)); + let concurrency_limits = ConcurrencyLimits::default(); + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + let peer = SecretKey::generate().public(); + let kind_1 = DownloadKind::Blob { + hash: Hash::new([0u8; 32]), + }; + let handle_a = downloader.queue(kind_1.clone(), vec![peer]).await; + let handle_b = downloader.queue(kind_1.clone(), vec![peer]).await; + downloader.cancel(handle_a).await; + + // create a request with two intents and cancel them both + let kind_2 = DownloadKind::Blob { + hash: Hash::new([1u8; 32]), + }; + let handle_c = downloader.queue(kind_2.clone(), vec![peer]).await; + let handle_d = downloader.queue(kind_2.clone(), vec![peer]).await; + downloader.cancel(handle_c).await; + downloader.cancel(handle_d).await; + + // wait for the download result to be reported, a was cancelled but b should continue + handle_b.await.expect("should report success"); + // verify that the request was sent just once, and that the second request was never sent + getter.assert_history(&[(kind_1, peer)]); +} + +/// Test that when the downloader receives a flood of requests, they are scheduled so that the +/// maximum number of concurrent requests is not exceed. +/// NOTE: This is internally tested by [`Service::check_invariants`]. +#[tokio::test] +async fn max_concurrent_requests() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + // make request take some time to ensure concurreny limits are hit + getter.set_request_duration(Duration::from_millis(500)); + // set the concurreny limit very low to ensure it's hit + let concurrency_limits = ConcurrencyLimits { + max_concurrent_requests: 2, + ..Default::default() + }; + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + // send the downloads + let peer = SecretKey::generate().public(); + let mut handles = Vec::with_capacity(5); + let mut expected_history = Vec::with_capacity(5); + for i in 0..5 { + let kind = DownloadKind::Blob { + hash: Hash::new([i; 32]), + }; + let h = downloader.queue(kind.clone(), vec![peer]).await; + expected_history.push((kind, peer)); + handles.push(h); + } + + assert!( + futures::future::join_all(handles) + .await + .into_iter() + .all(|r| r.is_ok()), + "all downloads should succeed" + ); + + // verify that the request was sent just once + getter.assert_history(&expected_history); +} + +/// Test that when the downloader receives a flood of requests, with only one peer to handle them, +/// the maximum number of requests per peer is still respected. +/// NOTE: This is internally tested by [`Service::check_invariants`]. +#[tokio::test] +async fn max_concurrent_requests_per_peer() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + // make request take some time to ensure concurreny limits are hit + getter.set_request_duration(Duration::from_millis(500)); + // set the concurreny limit very low to ensure it's hit + let concurrency_limits = ConcurrencyLimits { + max_concurrent_requests_per_peer: 1, + max_concurrent_requests: 10000, // all requests can be performed at the same time + ..Default::default() + }; + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + // send the downloads + let peer = SecretKey::generate().public(); + let mut handles = Vec::with_capacity(5); + for i in 0..5 { + let kind = DownloadKind::Blob { + hash: Hash::new([i; 32]), + }; + let h = downloader.queue(kind.clone(), vec![peer]).await; + handles.push(h); + } + + futures::future::join_all(handles).await; +} diff --git a/iroh/src/downloader/test/dialer.rs b/iroh/src/downloader/test/dialer.rs new file mode 100644 index 0000000000..7f40dd4d01 --- /dev/null +++ b/iroh/src/downloader/test/dialer.rs @@ -0,0 +1,88 @@ +//! Implementation of [`super::Dialer`] used for testing. + +use std::{ + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; + +use parking_lot::RwLock; + +use super::*; + +/// Dialer for testing that keeps track of the dialing history. +#[derive(Default, Clone)] +pub(super) struct TestingDialer(Arc>); + +struct TestingDialerInner { + /// Peers that are being dialed. + dialing: HashSet, + /// Queue of dials. + dial_futs: delay_queue::DelayQueue, + /// History of attempted dials. + dial_history: Vec, + /// How long does a dial last. + dial_duration: Duration, + /// Fn deciding if a dial is successful. + dial_outcome: Box bool>, +} + +impl Default for TestingDialerInner { + fn default() -> Self { + TestingDialerInner { + dialing: HashSet::default(), + dial_futs: delay_queue::DelayQueue::default(), + dial_history: Vec::default(), + dial_duration: Duration::ZERO, + dial_outcome: Box::new(|_| true), + } + } +} + +impl Dialer for TestingDialer { + type Connection = PublicKey; + + fn queue_dial(&mut self, peer_id: PublicKey) { + let mut inner = self.0.write(); + inner.dial_history.push(peer_id); + // for now assume every dial works + let dial_duration = inner.dial_duration; + if inner.dialing.insert(peer_id) { + inner.dial_futs.insert(peer_id, dial_duration); + } + } + + fn pending_count(&self) -> usize { + self.0.read().dialing.len() + } + + fn is_pending(&self, peer: &PublicKey) -> bool { + self.0.read().dialing.contains(peer) + } +} + +impl futures::Stream for TestingDialer { + type Item = (PublicKey, anyhow::Result); + + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.0.write(); + match inner.dial_futs.poll_expired(cx) { + Poll::Ready(Some(expired)) => { + let peer = expired.into_inner(); + let report_ok = (inner.dial_outcome)(&peer); + let result = report_ok + .then_some(peer) + .ok_or_else(|| anyhow::anyhow!("dialing test set to fail")); + Poll::Ready(Some((peer, result))) + } + _ => Poll::Pending, + } + } +} + +impl TestingDialer { + #[track_caller] + pub(super) fn assert_history(&self, history: &[PublicKey]) { + assert_eq!(self.0.read().dial_history, history) + } +} diff --git a/iroh/src/downloader/test/getter.rs b/iroh/src/downloader/test/getter.rs new file mode 100644 index 0000000000..7f546b2aa9 --- /dev/null +++ b/iroh/src/downloader/test/getter.rs @@ -0,0 +1,46 @@ +//! Implementation of [`super::Getter`] used for testing. + +use std::{sync::Arc, time::Duration}; + +use parking_lot::RwLock; + +use super::*; + +#[derive(Default, Clone)] +pub(super) struct TestingGetter(Arc>); + +#[derive(Default)] +struct TestingGetterInner { + /// How long requests take. + request_duration: Duration, + /// History of requests performed by the [`Getter`] and if they were successful. + request_history: Vec<(DownloadKind, PublicKey)>, +} + +impl Getter for TestingGetter { + // since for testing we don't need a real connection, just keep track of what peer is the + // request being sent to + type Connection = PublicKey; + + fn get(&mut self, kind: DownloadKind, peer: PublicKey) -> GetFut { + let mut inner = self.0.write(); + inner.request_history.push((kind, peer)); + let request_duration = inner.request_duration; + async move { + tokio::time::sleep(request_duration).await; + Ok(()) + } + .boxed_local() + } +} + +impl TestingGetter { + pub(super) fn set_request_duration(&self, request_duration: Duration) { + self.0.write().request_duration = request_duration; + } + /// Verify that the request history is as expected + #[track_caller] + pub(super) fn assert_history(&self, history: &[(DownloadKind, PublicKey)]) { + assert_eq!(self.0.read().request_history, history); + } +} diff --git a/iroh/src/get.rs b/iroh/src/get.rs index 38857c33ab..c8a3fa650f 100644 --- a/iroh/src/get.rs +++ b/iroh/src/get.rs @@ -105,7 +105,7 @@ pub async fn get_blob( anyhow::Ok(stats) } -async fn get_missing_ranges_blob( +pub(crate) async fn get_missing_ranges_blob( entry: &D::PartialEntry, ) -> anyhow::Result> { use tracing::trace as log; @@ -249,7 +249,7 @@ async fn get_blob_inner_partial( } /// Given a collection of hashes, figure out what is missing -async fn get_missing_ranges_collection( +pub(crate) async fn get_missing_ranges_collection( db: &D, collection: &Vec, ) -> io::Result>> { @@ -408,7 +408,7 @@ pub async fn get_collection( } #[derive(Debug, Clone)] -enum BlobInfo { +pub(crate) enum BlobInfo { // we have the blob completely Complete, // we have the blob partially @@ -421,7 +421,7 @@ enum BlobInfo { } impl BlobInfo { - fn missing_chunks(&self) -> RangeSet2 { + pub fn missing_chunks(&self) -> RangeSet2 { match self { BlobInfo::Complete => RangeSet2::empty(), BlobInfo::Partial { missing_chunks, .. } => missing_chunks.clone(), diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 9f12a9fecf..1697d22598 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -10,7 +10,7 @@ pub mod client; #[cfg(feature = "iroh-collection")] pub mod collection; pub mod dial; -pub mod download; +pub mod downloader; pub mod get; pub mod node; pub mod rpc_protocol; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 08b7910f73..04c6e8362c 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -56,7 +56,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; use crate::dial::Ticket; -use crate::download::Downloader; +use crate::downloader::Downloader; use crate::rpc_protocol::{ BytesGetRequest, BytesGetResponse, ConnectionInfoRequest, ConnectionInfoResponse, ConnectionsRequest, ConnectionsResponse, ListBlobsRequest, ListBlobsResponse, @@ -374,7 +374,13 @@ where gossip_cell.set(gossip.clone()).unwrap(); // spawn the sync engine - let downloader = Downloader::new(rt.clone(), endpoint.clone(), self.db.clone()); + let downloader = Downloader::new( + self.db.clone(), + self.collection_parser.clone(), + endpoint.clone(), + rt.clone(), + ) + .await; let sync = SyncEngine::spawn( rt.clone(), endpoint.clone(), diff --git a/iroh/src/sync_engine.rs b/iroh/src/sync_engine.rs index 8646ce6f82..ac99004012 100644 --- a/iroh/src/sync_engine.rs +++ b/iroh/src/sync_engine.rs @@ -14,7 +14,7 @@ use iroh_sync::{ }; use parking_lot::RwLock; -use crate::download::Downloader; +use crate::downloader::Downloader; mod live; pub mod rpc; diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 617ac1d99b..7fbc7e9265 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -6,7 +6,7 @@ use std::{ sync::{atomic::AtomicU64, Arc}, }; -use crate::download::Downloader; +use crate::downloader::{DownloadKind, Downloader}; use anyhow::{anyhow, bail, Result}; use futures::{ future::{BoxFuture, Shared}, @@ -673,11 +673,16 @@ impl Actor { // content. let entry_status = self.bao_store.contains(&hash); if matches!(entry_status, EntryStatus::NotFound) { - self.downloader.push(hash, vec![from]).await; - let fut = self.downloader.finished(&hash).await; - let fut = fut - .map(move |res| res.map(move |(hash, _len)| (topic, hash))) - .boxed(); + let handle = self + .downloader + .queue(DownloadKind::Blob { hash }, vec![from]) + .await; + let fut = async move { + // NOTE: this ignores the result for now, simply keeping the option + let res = handle.await.ok(); + res.map(|_| (topic, hash)) + } + .boxed(); self.pending_downloads.push(fut); }