From c21728346378d6aa780e60001c0858061d2f55df Mon Sep 17 00:00:00 2001 From: Divma <26765164+divagant-martian@users.noreply.github.com> Date: Wed, 13 Sep 2023 09:19:19 -0500 Subject: [PATCH] feat(iroh): downloader (#1420) ## Description Adds the `Downloader` as specified in #1334 plus some backchannel convos Features include: - Support collections - Add delays to downloads - Add download retries with an incremental backoff - Keeping peers for a bit longer than necessary in hopes they will be useful again - Having the concept of intents and deduplicating downloads efforts - Cancelling download intents - Limiting how many concurrent requests are done in total - Limiting how many concurrent requests are done per peer - Limiting the number of open connections in total - Basic error management in the form of deciding whether a peer should be dropped, the request should be dropped, or if the request should be retried ## Notes & open questions ### TODOs - A remaining TODO in the code is whether something special should be done when dropping quic connections - Should downloads have a timeout? - ~I know I've said this a hundred times with a hundred different things but would love to test this as well under stress scenarios and a large number of peers. don't hate me~ In reality after abstracting away all the IO most scenarios can be simulated easily. What would remain for a _much_ later time when the need and opportunity for real case testing scenario arises is to tune the concurrency parameters ### Future work #### Downloading Ranges There was the requirement of downloading a Hash, a range of a Hash, a collection and (not mentioned but potentially implied) ranges of collections. There is no support for ranges right now because of the great duplication of the `get` code in order to take advantage of proper errors added in #1362. In principle, adding ranges should be really easy. 
This is because it's an extension of the `DownloadKind` and would simply need calculating the missing ranges not based on the difference between what we have and the whole range, but the given range. I would prefer to find a way to deduplicate the get code before doing this extension. Also, as far as I can tell, there is no need for this yet. #### Prioritizing candidates per role: `Provider` and `Candidate` A nice extension, as discussed at some point, is to differentiate candidates we know have the data, from those that _might_ have the data. This has added benefit that when a peer is available to perform another download under the concurrency limits, a hash we know they have could be downloaded right away instead of waiting for the delay. At this point making this doesn't make sense because we will likely attempt a download before the peer has retrieved the data themselves. To implement this, we would need to add the notifications of fully downloaded hashes as available into gossip first. #### Leveraging the info from gossip When declaring that a hash `X` should be downloaded, it's also an option to query gossip for peers that are subscribed to the topic to which `X` belongs to use them as candidates. This could be done connecting the `ProviderMap` to `gossip`. For now I don't see the need to do this. ### Open questions about Future work - In line with the described work from above, the registry only allows querying for peer candidates to a hash since that's as good as it gets in terms of what we know from a remote right now. It's not clear to me if we would want to change this to have better availability information with #1413 in progress. - More future work: downloading a large data set/blob from multiple peers would most likely require us to do a three step process: 1. understanding the data layout/size. 2. splitting the download. 3. actually performing the separate downloads. Generally curious how this will end. 
My question here is whether we should do this for every download, or just on data that we expect to be big. Is there any way to obtain such hint without relying on a query every single time? ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. --- Cargo.lock | 93 +-- iroh-bytes/src/protocol/range_spec.rs | 4 +- iroh-gossip/src/net.rs | 2 +- iroh-gossip/src/net/util.rs | 26 +- iroh-sync/src/sync.rs | 2 +- iroh/Cargo.toml | 2 +- iroh/examples/sync.rs | 7 +- iroh/src/download.rs | 386 ---------- iroh/src/downloader.rs | 989 ++++++++++++++++++++++++++ iroh/src/downloader/get.rs | 556 +++++++++++++++ iroh/src/downloader/invariants.rs | 99 +++ iroh/src/downloader/test.rs | 199 ++++++ iroh/src/downloader/test/dialer.rs | 88 +++ iroh/src/downloader/test/getter.rs | 46 ++ iroh/src/get.rs | 8 +- iroh/src/lib.rs | 2 +- iroh/src/node.rs | 10 +- iroh/src/sync_engine.rs | 2 +- iroh/src/sync_engine/live.rs | 17 +- 19 files changed, 2082 insertions(+), 456 deletions(-) delete mode 100644 iroh/src/download.rs create mode 100644 iroh/src/downloader.rs create mode 100644 iroh/src/downloader/get.rs create mode 100644 iroh/src/downloader/invariants.rs create mode 100644 iroh/src/downloader/test.rs create mode 100644 iroh/src/downloader/test/dialer.rs create mode 100644 iroh/src/downloader/test/getter.rs diff --git a/Cargo.lock b/Cargo.lock index f41ced4d60..167aef8810 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -191,7 +191,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -514,7 +514,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -801,9 +801,9 @@ dependencies = [ [[package]] name = "crypto-bigint" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"cf4c2f4e1afd912bc40bfd6fed5d9dc1f288e0ba01bfcc835cc5bc3eb13efe15" +checksum = "740fe28e594155f10cfc383984cbefd529d7396050557148f79cb0f621204124" dependencies = [ "generic-array", "rand_core", @@ -879,7 +879,7 @@ checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -903,7 +903,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -914,7 +914,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1006,7 +1006,7 @@ checksum = "5fe87ce4529967e0ba1dcf8450bab64d97dfd5010a6256187ffe2e43e6f0e049" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1035,7 +1035,7 @@ checksum = "df541e0e2a8069352be228ce4b85a1da6f59bfd325e56f57e4b241babbc3f832" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "unicode-xid", ] @@ -1107,7 +1107,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1258,7 +1258,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1436,7 +1436,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2360,9 +2360,9 @@ checksum = "df39d232f5c40b0891c10216992c2f250c054105cb1e56f0fc9032db6203ecc1" [[package]] name = "memchr" -version = "2.6.2" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486aed0026218e61b8a01d5fbd5a0a134649abb71a0e53b7bc088529dced86e" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memoffset" @@ -2689,7 +2689,7 @@ 
dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2700,9 +2700,9 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "object" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ "memchr", ] @@ -2788,7 +2788,7 @@ dependencies = [ "proc-macro-error 1.0.4", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2912,7 +2912,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2943,7 +2943,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -3259,7 +3259,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -3560,18 +3560,18 @@ checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] name = "regex" -version = "1.9.4" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" +checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.7", + "regex-automata 0.3.8", "regex-syntax 0.7.5", ] @@ -3586,9 +3586,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" 
+checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", @@ -3988,7 +3988,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4059,7 +4059,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4311,7 +4311,7 @@ dependencies = [ "proc-macro2", "quote", "struct_iterable_internal", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4329,7 +4329,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4340,7 +4340,7 @@ checksum = "a60bcaff7397072dca0017d1db428e30d5002e00b6847703e2e42005c95fbe00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4419,9 +4419,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.29" +version = "2.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" dependencies = [ "proc-macro2", "quote", @@ -4514,7 +4514,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4533,22 +4533,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = 
"49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4641,7 +4641,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -4719,6 +4719,7 @@ dependencies = [ "futures-util", "hashbrown 0.12.3", "pin-project-lite", + "slab", "tokio", "tracing", ] @@ -4794,7 +4795,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -5075,7 +5076,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "wasm-bindgen-shared", ] @@ -5109,7 +5110,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5417,9 +5418,9 @@ dependencies = [ [[package]] name = "xml-rs" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47430998a7b5d499ccee752b41567bc3afc57e1327dc855b1a2aa44ce29b5fa1" +checksum = "1eee6bf5926be7cf998d7381a9a23d833fd493f6a8034658a9505a4dc4b20444" [[package]] name = "xmltree" diff --git a/iroh-bytes/src/protocol/range_spec.rs b/iroh-bytes/src/protocol/range_spec.rs index 661858425a..2409f23f70 100644 --- a/iroh-bytes/src/protocol/range_spec.rs +++ b/iroh-bytes/src/protocol/range_spec.rs @@ -39,7 +39,7 @@ use smallvec::{smallvec, SmallVec}; /// /// This is a SmallVec so we can avoid allocations for the very common case of a single /// chunk range. 
-#[derive(Deserialize, Serialize, PartialEq, Eq, Clone)] +#[derive(Deserialize, Serialize, PartialEq, Eq, Clone, Hash)] #[repr(transparent)] pub struct RangeSpec(SmallVec<[u64; 2]>); @@ -152,7 +152,7 @@ impl fmt::Debug for RangeSpec { /// /// This is a smallvec so that we can avoid allocations in the common case of a single child /// range. -#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)] +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, Hash)] #[repr(transparent)] pub struct RangeSpecSeq(SmallVec<[(u64, RangeSpec); 2]>); diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index a8035ed120..f4e91f41c6 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -374,7 +374,7 @@ impl Actor { let peer_data = postcard::to_stdvec(&info)?; self.handle_in_event(InEvent::UpdatePeerData(peer_data.into()), Instant::now()).await?; } - (peer_id, res) = self.dialer.next() => { + (peer_id, res) = self.dialer.next_conn() => { match res { Ok(conn) => { debug!(?me, peer = ?peer_id, "dial successfull"); diff --git a/iroh-gossip/src/net/util.rs b/iroh-gossip/src/net/util.rs index ff9d74bcfd..2f4dca4d9f 100644 --- a/iroh-gossip/src/net/util.rs +++ b/iroh-gossip/src/net/util.rs @@ -81,16 +81,17 @@ pub type DialFuture = BoxFuture<'static, (PublicKey, anyhow::Result, pending_peers: HashMap, } + impl Dialer { /// Create a new dialer for a [`MagicEndpoint`] pub fn new(endpoint: MagicEndpoint) -> Self { @@ -137,7 +138,7 @@ impl Dialer { } /// Wait for the next dial operation to complete - pub async fn next(&mut self) -> (PublicKey, anyhow::Result) { + pub async fn next_conn(&mut self) -> (PublicKey, anyhow::Result) { match self.pending_peers.is_empty() { false => { let (peer_id, res) = self.pending.next().await.unwrap(); @@ -147,6 +148,25 @@ impl Dialer { true => futures::future::pending().await, } } + + /// Number of pending connections to be opened. 
+ pub fn pending_count(&self) -> usize { + self.pending_peers.len() + } +} + +impl futures::Stream for Dialer { + type Item = (PublicKey, anyhow::Result); + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.pending.poll_next_unpin(cx) { + std::task::Poll::Ready(res) if res.is_some() => std::task::Poll::Ready(res), + _ => std::task::Poll::Pending, + } + } } /// A [`TimerMap`] with an async method to wait for the next timer expiration. diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index 433de6a755..8a5ea22507 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -283,7 +283,7 @@ fn validate_entry + PublicKeyStore>( // If an existing entry exists, make sure it's older than the new entry. let existing = store.get(entry.id()); if let Ok(Some(existing)) = existing { - if existing.timestamp() > entry.timestamp() { + if existing.timestamp() >= entry.timestamp() { return Err(ValidationFailure::OlderThanExisting); } } diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index bff8805f06..49248a4130 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -42,7 +42,7 @@ serde = { version = "1", features = ["derive"] } thiserror = "1" tokio = { version = "1", features = ["io-util", "rt"] } tokio-stream = "0.1" -tokio-util = { version = "0.7", features = ["codec", "io-util", "io"] } +tokio-util = { version = "0.7", features = ["codec", "io-util", "io", "time"] } tracing = "0.1" walkdir = "2" diff --git a/iroh/examples/sync.rs b/iroh/examples/sync.rs index 13196330a7..9a86995b72 100644 --- a/iroh/examples/sync.rs +++ b/iroh/examples/sync.rs @@ -18,7 +18,7 @@ use clap::{CommandFactory, FromArgMatches, Parser}; use futures::StreamExt; use indicatif::HumanBytes; use iroh::{ - download::Downloader, + downloader::Downloader, sync_engine::{LiveEvent, PeerSource, SyncEngine, SYNC_ALPN}, }; use iroh_bytes::util::runtime; @@ -230,8 +230,11 @@ async fn run(args: Args) -> anyhow::Result<()> { 
std::fs::create_dir_all(&blob_path)?; let db = iroh::baomap::flat::Store::load(&blob_path, &blob_path, &rt).await?; + let collection_parser = iroh::collection::IrohCollectionParser; + // create the live syncer - let downloader = Downloader::new(rt.clone(), endpoint.clone(), db.clone()); + let downloader = + Downloader::new(db.clone(), collection_parser, endpoint.clone(), rt.clone()).await; let live_sync = SyncEngine::spawn( rt.clone(), endpoint.clone(), diff --git a/iroh/src/download.rs b/iroh/src/download.rs deleted file mode 100644 index d7acb134b1..0000000000 --- a/iroh/src/download.rs +++ /dev/null @@ -1,386 +0,0 @@ -//! Download queue - -#[cfg(feature = "metrics")] -use std::time::Instant; -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, -}; - -use anyhow::anyhow; -use futures::{ - future::{BoxFuture, LocalBoxFuture, Shared}, - stream::FuturesUnordered, - FutureExt, -}; -use iroh_bytes::{ - baomap::{MapEntry, Store as BaoStore}, - util::{progress::IgnoreProgressSender, Hash}, -}; -use iroh_gossip::net::util::Dialer; -#[cfg(feature = "metrics")] -use iroh_metrics::{inc, inc_by}; -use iroh_net::{key::PublicKey, MagicEndpoint}; -use tokio::sync::{oneshot, Mutex}; -use tokio_stream::StreamExt; -use tracing::{debug, error, warn}; - -#[cfg(feature = "metrics")] -use crate::metrics::Metrics; - -/// Future for the completion of a download request -pub type DownloadFuture = Shared>>; - -/// A download queue for iroh-bytes -/// -/// Spawns a background task that handles connecting to peers and performing get requests. -/// -/// TODO: Support retries and backoff - become a proper queue... 
-/// TODO: Support collections, likely become generic over C: CollectionParser -#[derive(Debug, Clone)] -pub struct Downloader { - pending_downloads: Arc>>, - to_actor_tx: flume::Sender, -} - -impl Downloader { - /// Create a new downloader - pub fn new( - rt: iroh_bytes::util::runtime::Handle, - endpoint: MagicEndpoint, - db: B, - ) -> Self { - let (tx, rx) = flume::bounded(64); - // spawn the actor on a local pool - // the local pool is required because WritableFileDatabase::download_single - // returns a future that is !Send - rt.local_pool().spawn_pinned(move || async move { - let mut actor = DownloadActor::new(endpoint, db, rx); - if let Err(err) = actor.run().await { - error!("download actor failed with error {err:?}"); - } - }); - Self { - pending_downloads: Arc::new(Mutex::new(HashMap::new())), - to_actor_tx: tx, - } - } - - /// Add a new download request to the download queue. - /// - /// Note: This method takes only [`PublicKey`]s and will attempt to connect to those peers. For - /// this to succeed, you need to add addresses for these peers to the magic endpoint's - /// addressbook yourself. See [`MagicEndpoint::add_known_addrs`]. - pub async fn push(&self, hash: Hash, peers: Vec) { - let (reply, reply_rx) = oneshot::channel(); - let req = DownloadRequest { hash, peers, reply }; - - if let Err(err) = self.to_actor_tx.send_async(req).await { - warn!("download actor dropped: {err}"); - } - - if self.pending_downloads.lock().await.get(&hash).is_none() { - let pending_downloads = self.pending_downloads.clone(); - let fut = async move { - let res = reply_rx.await; - pending_downloads.lock().await.remove(&hash); - res.ok().flatten() - }; - self.pending_downloads - .lock() - .await - .insert(hash, fut.boxed().shared()); - } - } - - /// Returns a future that completes once the blob for `hash` has been downloaded, or all queued - /// requests for that blob have failed. - /// - /// NOTE: This does not start the download itself. Use [`Self::push`] for that. 
- pub async fn finished(&self, hash: &Hash) -> DownloadFuture { - match self.pending_downloads.lock().await.get(hash) { - Some(fut) => fut.clone(), - None => futures::future::ready(None).boxed().shared(), - } - } -} - -type DownloadReply = oneshot::Sender>; -type PendingDownloadsFutures = - FuturesUnordered>)>>; - -#[derive(Debug)] -struct DownloadRequest { - hash: Hash, - peers: Vec, - reply: DownloadReply, -} - -#[derive(Debug)] -struct DownloadActor { - dialer: Dialer, - db: B, - conns: HashMap, - replies: HashMap>, - pending_download_futs: PendingDownloadsFutures, - queue: DownloadQueue, - rx: flume::Receiver, -} -impl DownloadActor { - fn new(endpoint: MagicEndpoint, db: B, rx: flume::Receiver) -> Self { - Self { - rx, - db, - dialer: Dialer::new(endpoint), - replies: Default::default(), - conns: Default::default(), - pending_download_futs: Default::default(), - queue: Default::default(), - } - } - pub async fn run(&mut self) -> anyhow::Result<()> { - loop { - tokio::select! { - req = self.rx.recv_async() => match req { - Err(_) => return Ok(()), - Ok(req) => self.on_download_request(req).await - }, - (peer, conn) = self.dialer.next() => match conn { - Ok(conn) => { - debug!(peer = ?peer, "connection established"); - self.conns.insert(peer, conn); - self.on_peer_ready(peer); - }, - Err(err) => self.on_peer_fail(&peer, err), - }, - Some((peer, hash, res)) = self.pending_download_futs.next() => match res { - Ok(Some(size)) => { - self.queue.on_success(hash, peer); - self.reply(hash, Some((hash, size))); - self.on_peer_ready(peer); - } - Ok(None) => { - // TODO: This case is currently never reached, because iroh::get::get_blob - // doesn't return an option but only a result, with no way (AFAICS) to discern - // between connection error and not found. 
- // self.on_not_found(&peer, hash); - // self.on_peer_ready(peer); - unreachable!() - } - Err(_err) => { - self.on_not_found(&peer, hash); - self.on_peer_ready(peer); - // TODO: In case of connection errors or similar we want to call - // on_peer_fail to not continue downloading from this peer. - // Currently however a "not found" is also an error, thus calling - // on_peer_fail would stop trying to get other hashes from this peer. - // This likely needs fixing in iroh::get::get to have a meaningful error to - // see if the connection failed or if it's just a "not found". - // self.on_peer_fail(&peer, err), - } - } - } - } - } - - fn reply(&mut self, hash: Hash, res: Option<(Hash, u64)>) { - for reply in self.replies.remove(&hash).into_iter().flatten() { - reply.send(res).ok(); - } - } - - fn on_peer_fail(&mut self, peer: &PublicKey, err: anyhow::Error) { - warn!("download from {peer} failed: {err:?}"); - for hash in self.queue.on_peer_fail(peer) { - self.reply(hash, None); - } - self.conns.remove(peer); - } - - fn on_not_found(&mut self, peer: &PublicKey, hash: Hash) { - self.queue.on_not_found(hash, *peer); - if self.queue.has_no_candidates(&hash) { - self.reply(hash, None); - } - } - - fn on_peer_ready(&mut self, peer: PublicKey) { - if let Some(hash) = self.queue.try_next_for_peer(peer) { - debug!(peer = ?peer, hash = ?hash, "on_peer_ready: get next"); - self.start_download_unchecked(peer, hash); - } else { - debug!(peer = ?peer, "on_peer_ready: nothing left, disconnect"); - self.conns.remove(&peer); - } - } - - fn start_download_unchecked(&mut self, peer: PublicKey, hash: Hash) { - let conn = self.conns.get(&peer).unwrap().clone(); - let db = self.db.clone(); - let progress_sender = IgnoreProgressSender::default(); - - let fut = async move { - debug!(peer = ?peer, hash = ?hash, "start download"); - - #[cfg(feature = "metrics")] - let start = Instant::now(); - - // TODO: None for not found instead of error - let res = crate::get::get_blob(&db, conn, &hash, 
progress_sender).await; - let res = res.and_then(|_stats| { - db.get(&hash) - .ok_or_else(|| anyhow!("downloaded blob not found in store")) - .map(|entry| Some(entry.size())) - }); - debug!(peer = ?peer, hash = ?hash, "finish download: {res:?}"); - - // record metrics - #[cfg(feature = "metrics")] - { - let elapsed = start.elapsed().as_millis(); - match &res { - Ok(Some(len)) => { - inc!(Metrics, downloads_success); - inc_by!(Metrics, download_bytes_total, *len); - inc_by!(Metrics, download_time_total, elapsed as u64); - } - Ok(None) => inc!(Metrics, downloads_notfound), - Err(_) => inc!(Metrics, downloads_error), - } - } - - (peer, hash, res) - }; - self.pending_download_futs.push(fut.boxed_local()); - } - - async fn on_download_request(&mut self, req: DownloadRequest) { - let DownloadRequest { peers, hash, reply } = req; - if let Some(entry) = self.db.get(&hash) { - let size = entry.size(); - reply.send(Some((hash, size))).ok(); - return; - } - self.replies.entry(hash).or_default().push_back(reply); - for peer in peers { - debug!(peer = ?peer, hash = ?hash, "queue download"); - self.queue.push_candidate(hash, peer); - // TODO: Don't dial all peers instantly. 
- if self.conns.get(&peer).is_none() && !self.dialer.is_pending(&peer) { - self.dialer.queue_dial(peer, &iroh_bytes::protocol::ALPN); - } - } - } -} - -#[derive(Debug, Default)] -struct DownloadQueue { - candidates_by_hash: HashMap>, - candidates_by_peer: HashMap>, - running_by_hash: HashMap, - running_by_peer: HashMap, -} - -impl DownloadQueue { - pub fn push_candidate(&mut self, hash: Hash, peer: PublicKey) { - self.candidates_by_hash - .entry(hash) - .or_default() - .push_back(peer); - self.candidates_by_peer - .entry(peer) - .or_default() - .push_back(hash); - } - - pub fn try_next_for_peer(&mut self, peer: PublicKey) -> Option { - let mut next = None; - for (idx, hash) in self.candidates_by_peer.get(&peer)?.iter().enumerate() { - if !self.running_by_hash.contains_key(hash) { - next = Some((idx, *hash)); - break; - } - } - if let Some((idx, hash)) = next { - self.running_by_hash.insert(hash, peer); - self.running_by_peer.insert(peer, hash); - self.candidates_by_peer.get_mut(&peer).unwrap().remove(idx); - if let Some(peers) = self.candidates_by_hash.get_mut(&hash) { - peers.retain(|p| p != &peer); - } - self.ensure_no_empty(hash, peer); - Some(hash) - } else { - None - } - } - - pub fn has_no_candidates(&self, hash: &Hash) -> bool { - self.candidates_by_hash.get(hash).is_none() && self.running_by_hash.get(hash).is_none() - } - - /// Mark a download as successfull. - pub fn on_success(&mut self, hash: Hash, peer: PublicKey) { - let peer2 = self.running_by_hash.remove(&hash); - debug_assert_eq!(peer2, Some(peer)); - self.running_by_peer.remove(&peer); - self.candidates_by_hash.remove(&hash); - for hashes in self.candidates_by_peer.values_mut() { - hashes.retain(|h| h != &hash); - } - self.ensure_no_empty(hash, peer); - } - - /// To be called when a peer failed (i.e. disconnected). - /// - /// Returns a list of hashes that have no other peers queue. Those hashes should thus be - /// considered failed. 
- pub fn on_peer_fail(&mut self, peer: &PublicKey) -> Vec { - let mut failed = vec![]; - for hash in self - .candidates_by_peer - .remove(peer) - .map(|hashes| hashes.into_iter()) - .into_iter() - .flatten() - { - if let Some(peers) = self.candidates_by_hash.get_mut(&hash) { - peers.retain(|p| p != peer); - if peers.is_empty() && self.running_by_hash.get(&hash).is_none() { - failed.push(hash); - } - } - } - if let Some(hash) = self.running_by_peer.remove(peer) { - self.running_by_hash.remove(&hash); - if self.candidates_by_hash.get(&hash).is_none() { - failed.push(hash); - } - } - failed - } - - pub fn on_not_found(&mut self, hash: Hash, peer: PublicKey) { - let peer2 = self.running_by_hash.remove(&hash); - debug_assert_eq!(peer2, Some(peer)); - self.running_by_peer.remove(&peer); - self.ensure_no_empty(hash, peer); - } - - fn ensure_no_empty(&mut self, hash: Hash, peer: PublicKey) { - if self - .candidates_by_peer - .get(&peer) - .map_or(false, |hashes| hashes.is_empty()) - { - self.candidates_by_peer.remove(&peer); - } - if self - .candidates_by_hash - .get(&hash) - .map_or(false, |peers| peers.is_empty()) - { - self.candidates_by_hash.remove(&hash); - } - } -} diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs new file mode 100644 index 0000000000..df139a2266 --- /dev/null +++ b/iroh/src/downloader.rs @@ -0,0 +1,989 @@ +//! Handle downloading blobs and collections concurrently and from peers. +//! +//! The [`Downloader`] interacts with four main components to this end. +//! - [`Dialer`]: Used to queue opening connections to peers we need to perform downloads. +//! - [`ProviderMap`]: Where the downloader obtains information about peers that could be +//! used to perform a download. +//! - [`Store`]: Where data is stored. +//! - [`CollectionParser`]: Used by the Get state machine logic to identify blobs encoding +//! collections. +//! +//! Once a download request is received, the logic is as follows: +//! 1. The [`ProviderMap`] is queried for peers. 
From these peers some are selected +//! prioritizing connected peers with lower number of active requests. If no useful peer is +//! connected, or useful connected peers have no capacity to perform the request, a connection +//! attempt is started using the [`Dialer`]. +//! 2. The download is queued for processing at a later time. Downloads are not performed right +//! away. Instead, they are initially delayed to allow the peer to obtain the data itself, and +//! to wait for the new connection to be established if necessary. +//! 3. Once a request is ready to be sent after a delay (initial or for a retry), the preferred +//! peer is used if available. The request is now considered active. +//! +//! Concurrency is limited in different ways: +//! - *Total number of active request:* This is a way to prevent a self DoS by overwhelming our own +//! bandwidth capacity. This is a best effort heuristic since it doesn't take into account how +//! much data we are actually requesting or receiving. +//! - *Total number of connected peers:* Peer connections are kept for a longer time than they are +//! strictly needed since it's likely they will be useful soon again. +//! - *Requests per peer*: to avoid overwhelming peers with requests, the number of concurrent +//! requests to a single peer is also limited. + +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + num::NonZeroUsize, +}; + +use futures::{future::LocalBoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use iroh_bytes::{ + baomap::{range_collections::RangeSet2, Store}, + collection::CollectionParser, + protocol::RangeSpecSeq, + Hash, +}; +use iroh_net::{key::PublicKey, MagicEndpoint}; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::{sync::CancellationToken, time::delay_queue}; +use tracing::{debug, trace}; + +mod get; +mod invariants; +mod test; + +/// Delay added to a request when it's first received. 
+const INITIAL_REQUEST_DELAY: std::time::Duration = std::time::Duration::from_millis(500); +/// Number of retries initially assigned to a request. +const INITIAL_RETRY_COUNT: u8 = 4; +/// Duration for which we keep peers connected after they were last useful to us. +const IDLE_PEER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +/// Capacity of the channel used to comunicate between the [`Downloader`] and the [`Service`]. +const SERVICE_CHANNEL_CAPACITY: usize = 128; + +/// Download identifier. +// Mainly for readability. +pub type Id = u64; + +/// Trait modeling a dialer. This allows for IO-less testing. +pub trait Dialer: + futures::Stream)> + Unpin +{ + /// Type of connections returned by the Dialer. + type Connection: Clone; + /// Dial a peer. + fn queue_dial(&mut self, peer_id: PublicKey); + /// Get the number of dialing peers. + fn pending_count(&self) -> usize; + /// Check if a peer is being dialed. + fn is_pending(&self, peer: &PublicKey) -> bool; +} + +/// Signals what should be done with the request when it fails. +#[derive(Debug)] +pub enum FailureAction { + /// An error ocurred that prevents the request from being retried at all. + AbortRequest(anyhow::Error), + /// An error occurred that suggests the peer should not be used in general. + DropPeer(anyhow::Error), + /// An error occurred in which neither the peer nor the request are at fault. + RetryLater(anyhow::Error), +} + +/// Future of a get request. +type GetFut = LocalBoxFuture<'static, Result<(), FailureAction>>; + +/// Trait modelling performing a single request over a connection. This allows for IO-less testing. +pub trait Getter { + /// Type of connections the Getter requires to perform a download. + type Connection; + /// Return a future that performs the download using the given connection. + fn get(&mut self, kind: DownloadKind, conn: Self::Connection) -> GetFut; +} + +/// Concurrency limits for the [`Downloader`]. 
+#[derive(Debug)] +pub struct ConcurrencyLimits { + /// Maximum number of requests the service performs concurrently. + pub max_concurrent_requests: usize, + /// Maximum number of requests performed by a single peer concurrently. + pub max_concurrent_requests_per_peer: usize, + /// Maximum number of open connections the service maintains. + pub max_open_connections: usize, +} + +impl Default for ConcurrencyLimits { + fn default() -> Self { + // these numbers should be checked against a running node and might depend on platform + ConcurrencyLimits { + max_concurrent_requests: 50, + max_concurrent_requests_per_peer: 4, + max_open_connections: 25, + } + } +} + +impl ConcurrencyLimits { + /// Checks if the maximum number of concurrent requests has been reached. + fn at_requests_capacity(&self, active_requests: usize) -> bool { + active_requests >= self.max_concurrent_requests + } + + /// Checks if the maximum number of concurrent requests per peer has been reached. + fn peer_at_request_capacity(&self, active_peer_requests: usize) -> bool { + active_peer_requests >= self.max_concurrent_requests_per_peer + } + + /// Checks if the maximum number of connections has been reached. + fn at_connections_capacity(&self, active_connections: usize) -> bool { + active_connections >= self.max_open_connections + } +} + +/// Download requests the [`Downloader`] handles. +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub enum DownloadKind { + /// Download a single blob entirely. + Blob { + /// Blob to be downloaded. + hash: Hash, + }, + /// Download a collection entirely. + Collection { + /// Blob to be downloaded. + hash: Hash, + }, +} + +impl DownloadKind { + /// Get the requested hash. + const fn hash(&self) -> &Hash { + match self { + DownloadKind::Blob { hash } | DownloadKind::Collection { hash } => hash, + } + } + + /// Get the ranges this download is requesting. + // NOTE: necessary to extend downloads to support ranges of blobs ranges of collections. 
+    #[allow(dead_code)]
+    fn ranges(&self) -> RangeSpecSeq {
+        match self {
+            DownloadKind::Blob { .. } => RangeSpecSeq::from_ranges([RangeSet2::all()]),
+            DownloadKind::Collection { .. } => RangeSpecSeq::all(),
+        }
+    }
+}
+
+// For readability. In the future we might care about some data reporting on a successful download
+// or kind of failure in the error case.
+type DownloadResult = anyhow::Result<()>;
+
+/// Handle to interact with a download request.
+#[derive(Debug)]
+pub struct DownloadHandle {
+    /// Id used to identify the request in the [`Downloader`].
+    id: Id,
+    /// Kind of download.
+    kind: DownloadKind,
+    /// Receiver to retrieve the return value of this download.
+    receiver: oneshot::Receiver<DownloadResult>,
+}
+
+impl std::future::Future for DownloadHandle {
+    type Output = DownloadResult;
+
+    fn poll(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        use std::task::Poll::*;
+        // make it easier on holders of the handle to poll the result, removing the receiver error
+        // from the middle
+        match self.receiver.poll_unpin(cx) {
+            Ready(Ok(result)) => Ready(result),
+            Ready(Err(recv_err)) => Ready(Err(anyhow::anyhow!("oneshot error: {recv_err}"))),
+            Pending => Pending,
+        }
+    }
+}
+
+/// Handle for the download services.
+#[derive(Debug)]
+pub struct Downloader {
+    /// Next id to use for a download intent.
+    next_id: Id,
+    /// Channel to communicate with the service.
+    msg_tx: mpsc::Sender<Message>,
+}
+
+impl Downloader {
+    /// Create a new Downloader.
+ pub async fn new( + store: S, + collection_parser: C, + endpoint: MagicEndpoint, + rt: iroh_bytes::util::runtime::Handle, + ) -> Self + where + S: Store, + C: CollectionParser, + { + let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY); + let dialer = iroh_gossip::net::util::Dialer::new(endpoint); + + let create_future = move || { + let concurrency_limits = ConcurrencyLimits::default(); + let getter = get::IoGetter { + store, + collection_parser, + }; + + let service = Service::new(getter, dialer, concurrency_limits, msg_rx); + + service.run() + }; + rt.local_pool().spawn_pinned(create_future); + Self { next_id: 0, msg_tx } + } + + /// Queue a download. + pub async fn queue(&mut self, kind: DownloadKind, peers: Vec) -> DownloadHandle { + let id = self.next_id; + self.next_id = self.next_id.wrapping_add(1); + + let (sender, receiver) = oneshot::channel(); + let handle = DownloadHandle { + id, + kind: kind.clone(), + receiver, + }; + let msg = Message::Queue { + kind, + id, + sender, + peers, + }; + // if this fails polling the handle will fail as well since the sender side of the oneshot + // will be dropped + if let Err(send_err) = self.msg_tx.send(msg).await { + let msg = send_err.0; + debug!(?msg, "download not sent"); + } + handle + } + + /// Cancel a download. + // NOTE: receiving the handle ensures an intent can't be cancelled twice + pub async fn cancel(&mut self, handle: DownloadHandle) { + let DownloadHandle { + id, + kind, + receiver: _, + } = handle; + let msg = Message::Cancel { id, kind }; + if let Err(send_err) = self.msg_tx.send(msg).await { + let msg = send_err.0; + debug!(?msg, "cancel not sent"); + } + } + + /// Declare that certains peers can be used to download a hash. 
+    pub async fn peers_have(&mut self, hash: Hash, peers: Vec<PublicKey>) {
+        let msg = Message::PeersHave { hash, peers };
+        if let Err(send_err) = self.msg_tx.send(msg).await {
+            let msg = send_err.0;
+            debug!(?msg, "peers have not sent")
+        }
+    }
+}
+
+/// Messages the service can receive.
+#[derive(derive_more::Debug)]
+enum Message {
+    /// Queue a download intent.
+    Queue {
+        kind: DownloadKind,
+        id: Id,
+        #[debug(skip)]
+        sender: oneshot::Sender<DownloadResult>,
+        peers: Vec<PublicKey>,
+    },
+    /// Cancel an intent. The associated request will be cancelled when the last intent is
+    /// cancelled.
+    Cancel { id: Id, kind: DownloadKind },
+    /// Declare that peers have a certain hash and can be used for downloading. This feeds the [`ProviderMap`].
+    PeersHave { hash: Hash, peers: Vec<PublicKey> },
+}
+
+/// Information about a request being processed.
+#[derive(derive_more::Debug)]
+struct ActiveRequestInfo {
+    /// Ids of intents associated with this request.
+    #[debug("{:?}", intents.keys().collect::<Vec<_>>())]
+    intents: HashMap<Id, oneshot::Sender<DownloadResult>>,
+    /// How many times can this request be retried.
+    remaining_retries: u8,
+    /// Token used to cancel the future doing the request.
+    #[debug(skip)]
+    cancellation: CancellationToken,
+    /// Peer doing this request attempt.
+    peer: PublicKey,
+}
+
+/// Information about a request that has not started.
+#[derive(derive_more::Debug)]
+struct PendingRequestInfo {
+    /// Ids of intents associated with this request.
+    #[debug("{:?}", intents.keys().collect::<Vec<_>>())]
+    intents: HashMap<Id, oneshot::Sender<DownloadResult>>,
+    /// How many times can this request be retried.
+    remaining_retries: u8,
+    /// Key to manage the delay associated with this scheduled request.
+    #[debug(skip)]
+    delay_key: delay_queue::Key,
+    /// If this attempt was scheduled with a known potential peer, this is stored here to
+    /// prevent another query to the [`ProviderMap`].
+    next_peer: Option<PublicKey>,
+}
+
+/// State of the connection to this peer.
+#[derive(derive_more::Debug)]
+struct ConnectionInfo<Conn> {
+    /// Connection to this peer.
+    ///
+    /// If this peer was deemed unusable by a request, this will be set to `None`. As a
+    /// consequence, when evaluating peers for a download, this peer will not be considered.
+    /// Since peers are kept for a longer time than they are strictly necessary, this acts as a
+    /// temporary ban.
+    #[debug(skip)]
+    conn: Option<Conn>,
+    /// State of this peer.
+    state: PeerState,
+}
+
+impl<Conn> ConnectionInfo<Conn> {
+    /// Create a new idle peer.
+    fn new_idle(connection: Conn, drop_key: delay_queue::Key) -> Self {
+        ConnectionInfo {
+            conn: Some(connection),
+            state: PeerState::Idle { drop_key },
+        }
+    }
+
+    /// Count of active requests for the peer.
+    fn active_requests(&self) -> usize {
+        match self.state {
+            PeerState::Busy { active_requests } => active_requests.get(),
+            PeerState::Idle { .. } => 0,
+        }
+    }
+}
+
+/// State of a connected peer.
+#[derive(derive_more::Debug)]
+enum PeerState {
+    /// Peer is handling at least one request.
+    Busy {
+        #[debug("{}", active_requests.get())]
+        active_requests: NonZeroUsize,
+    },
+    /// Peer is idle.
+    Idle {
+        #[debug(skip)]
+        drop_key: delay_queue::Key,
+    },
+}
+
+/// Type of future that performs a download request.
+type DownloadFut = LocalBoxFuture<'static, (DownloadKind, Result<(), FailureAction>)>;
+
+#[derive(Debug)]
+struct Service<G: Getter<Connection = D::Connection>, D: Dialer> {
+    /// The getter performs individual requests.
+    getter: G,
+    /// Map to query for peers that we believe have the data we are looking for.
+    providers: ProviderMap,
+    /// Dialer to get connections for required peers.
+    dialer: D,
+    /// Limits to concurrent tasks handled by the service.
+    concurrency_limits: ConcurrencyLimits,
+    /// Channel to receive messages from the service's handle.
+    msg_rx: mpsc::Receiver<Message>,
+    /// Peers available to use and their relevant information.
+    peers: HashMap<PublicKey, ConnectionInfo<D::Connection>>,
+    /// Queue to manage dropping peers.
+    goodbye_peer_queue: delay_queue::DelayQueue<PublicKey>,
+    /// Requests performed for download intents. Two download requests can produce the same
+    /// request.
This map allows deduplication of efforts.
+    current_requests: HashMap<DownloadKind, ActiveRequestInfo>,
+    /// Downloads underway.
+    in_progress_downloads: FuturesUnordered<DownloadFut>,
+    /// Requests scheduled to be downloaded at a later time.
+    scheduled_requests: HashMap<DownloadKind, PendingRequestInfo>,
+    /// Queue of scheduled requests.
+    scheduled_request_queue: delay_queue::DelayQueue<DownloadKind>,
+}
+
+impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
+    fn new(
+        getter: G,
+        dialer: D,
+        concurrency_limits: ConcurrencyLimits,
+        msg_rx: mpsc::Receiver<Message>,
+    ) -> Self {
+        Service {
+            getter,
+            providers: ProviderMap::default(),
+            dialer,
+            concurrency_limits,
+            msg_rx,
+            peers: HashMap::default(),
+            goodbye_peer_queue: delay_queue::DelayQueue::default(),
+            current_requests: HashMap::default(),
+            in_progress_downloads: FuturesUnordered::default(),
+            scheduled_requests: HashMap::default(),
+            scheduled_request_queue: delay_queue::DelayQueue::default(),
+        }
+    }
+
+    /// Main loop for the service.
+    async fn run(mut self) {
+        loop {
+            // check if we have capacity to dequeue another scheduled request
+            let at_capacity = self
+                .concurrency_limits
+                .at_requests_capacity(self.in_progress_downloads.len());
+
+            tokio::select!
{ + Some((peer, conn_result)) = self.dialer.next() => { + trace!("tick: connection ready"); + self.on_connection_ready(peer, conn_result); + } + maybe_msg = self.msg_rx.recv() => { + trace!(msg=?maybe_msg, "tick: message received"); + match maybe_msg { + Some(msg) => self.handle_message(msg), + None => return self.shutdown().await, + } + } + Some((kind, result)) = self.in_progress_downloads.next() => { + trace!("tick: download completed"); + self.on_download_completed(kind, result); + } + Some(expired) = self.scheduled_request_queue.next(), if !at_capacity => { + trace!("tick: scheduled request ready"); + let kind = expired.into_inner(); + let request_info = self.scheduled_requests.remove(&kind).expect("is registered"); + self.on_scheduled_request_ready(kind, request_info); + } + Some(expired) = self.goodbye_peer_queue.next() => { + let peer = expired.into_inner(); + self.peers.remove(&peer); + trace!(%peer, "tick: goodbye peer"); + } + } + #[cfg(any(test, debug_assertions))] + self.check_invariants(); + } + } + + /// Handle receiving a [`Message`]. + fn handle_message(&mut self, msg: Message) { + match msg { + Message::Queue { + kind, + id, + sender, + peers, + } => self.handle_queue_new_download(kind, id, sender, peers), + Message::Cancel { id, kind } => self.handle_cancel_download(id, kind), + Message::PeersHave { hash, peers } => self.handle_peers_have(hash, peers), + } + } + + /// Handle a [`Message::Queue`]. + /// + /// If this intent maps to a request that already exists, it will be registered with it. If the + /// request is new it will be scheduled. 
+ fn handle_queue_new_download( + &mut self, + kind: DownloadKind, + id: Id, + sender: oneshot::Sender, + peers: Vec, + ) { + self.providers.add_peers(*kind.hash(), &peers); + if let Some(info) = self.current_requests.get_mut(&kind) { + // this intent maps to a download that already exists, simply register it + info.intents.insert(id, sender); + // increasing the retries by one accounts for multiple intents for the same request in + // a conservative way + info.remaining_retries += 1; + return trace!(?kind, ?info, "intent registered with active request"); + } + + let needs_peer = self + .scheduled_requests + .get(&kind) + .map(|info| info.next_peer.is_none()) + .unwrap_or(true); + + let next_peer = needs_peer + .then(|| self.get_best_candidate(kind.hash())) + .flatten(); + + // if we are here this request is not active, check if it needs to be scheduled + match self.scheduled_requests.get_mut(&kind) { + Some(info) => { + info.intents.insert(id, sender); + // pre-emptively get a peer if we don't already have one + if info.next_peer.is_none() { + info.next_peer = next_peer + } + // increasing the retries by one accounts for multiple intents for the same request in + // a conservative way + info.remaining_retries += 1; + trace!(?kind, ?info, "intent registered with scheduled request"); + } + None => { + let intents = HashMap::from([(id, sender)]); + self.schedule_request(kind, INITIAL_RETRY_COUNT, next_peer, intents) + } + } + } + + /// Gets the best candidate for a download. + /// + /// Peers are selected prioritizing those with an open connection and with capacity for another + /// request, followed by peers we are currently dialing with capacity for another request. + /// Lastly, peers not connected and not dialing are considered. + /// + /// If the selected candidate is not connected and we have capacity for another connection, a + /// dial is queued. 
+ fn get_best_candidate(&mut self, hash: &Hash) -> Option { + /// Model the state of peers found in the candidates + #[derive(PartialEq, Eq, Clone, Copy)] + enum ConnState { + Dialing, + Connected(usize), + NotConnected, + } + + impl Ord for ConnState { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // define the order of preference between candidates as follows: + // - prefer connected peers to dialing ones + // - prefer dialing peers to not connected ones + // - prefer peers with less active requests when connected + use std::cmp::Ordering::*; + match (self, other) { + (ConnState::Dialing, ConnState::Dialing) => Equal, + (ConnState::Dialing, ConnState::Connected(_)) => Less, + (ConnState::Dialing, ConnState::NotConnected) => Greater, + (ConnState::NotConnected, ConnState::Dialing) => Less, + (ConnState::NotConnected, ConnState::Connected(_)) => Less, + (ConnState::NotConnected, ConnState::NotConnected) => Equal, + (ConnState::Connected(_), ConnState::Dialing) => Greater, + (ConnState::Connected(_), ConnState::NotConnected) => Greater, + (ConnState::Connected(a), ConnState::Connected(b)) => match a.cmp(b) { + Less => Greater, // less preferable if greater number of requests + Equal => Equal, // no preference + Greater => Less, // more preferable if less number of requests + }, + } + } + } + + impl PartialOrd for ConnState { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + // first collect suitable candidates + let mut candidates = self + .providers + .get_candidates(hash) + .filter_map(|peer| { + if let Some(info) = self.peers.get(peer) { + info.conn.as_ref()?; + let req_count = info.active_requests(); + // filter out peers at capacity + let has_capacity = !self.concurrency_limits.peer_at_request_capacity(req_count); + has_capacity.then_some((peer, ConnState::Connected(req_count))) + } else if self.dialer.is_pending(peer) { + Some((peer, ConnState::Dialing)) + } else { + Some((peer, ConnState::NotConnected)) + } + }) + 
.collect::>(); + + candidates.sort_unstable_by_key(|peer_and_state| peer_and_state.1 /* state */); + + // this is our best peer, check if we need to dial it + let (peer, state) = candidates.pop()?; + + if let ConnState::NotConnected = state { + if !self.at_connections_capacity() { + // peer is not connected, not dialing and concurrency limits allow another connection + debug!(%peer, "dialing peer"); + self.dialer.queue_dial(*peer); + Some(*peer) + } else { + trace!(%peer, "required peer not dialed to maintain concurrency limits"); + None + } + } else { + Some(*peer) + } + } + + /// Cancels the download request. + /// + /// This removes the registered download intent and, depending on its state, it will either + /// remove it from the scheduled requests, or cancel the future. + fn handle_cancel_download(&mut self, id: Id, kind: DownloadKind) { + if let Entry::Occupied(mut occupied_entry) = self.current_requests.entry(kind.clone()) { + // remove the intent from the associated request + let intents = &mut occupied_entry.get_mut().intents; + intents.remove(&id); + // if this was the last intent associated with the request cancel it + if intents.is_empty() { + occupied_entry.remove().cancellation.cancel(); + } + } else if let Entry::Occupied(mut occupied_entry) = self.scheduled_requests.entry(kind) { + // remove the intent from the associated request + let intents = &mut occupied_entry.get_mut().intents; + intents.remove(&id); + // if this was the last intent associated with the request remove it from the schedule + // queue + if intents.is_empty() { + let delay_key = occupied_entry.remove().delay_key; + self.scheduled_request_queue.remove(&delay_key); + } + } + } + + /// Handle a [`Message::PeersHave`]. + fn handle_peers_have(&mut self, hash: Hash, peers: Vec) { + // check if this still needed + if self.is_needed(hash) { + self.providers.add_peers(hash, &peers); + } + } + + /// Checks if this hash is needed. 
+ fn is_needed(&self, hash: Hash) -> bool { + let as_blob = DownloadKind::Blob { hash }; + let as_collection = DownloadKind::Collection { hash }; + self.current_requests.contains_key(&as_blob) + || self.scheduled_requests.contains_key(&as_blob) + || self.current_requests.contains_key(&as_collection) + || self.scheduled_requests.contains_key(&as_collection) + } + + /// Handle receiving a new connection. + fn on_connection_ready(&mut self, peer: PublicKey, result: anyhow::Result) { + match result { + Ok(connection) => { + trace!(%peer, "connected to peer"); + let drop_key = self.goodbye_peer_queue.insert(peer, IDLE_PEER_TIMEOUT); + self.peers + .insert(peer, ConnectionInfo::new_idle(connection, drop_key)); + } + Err(err) => { + debug!(%peer, %err, "connection to peer failed") + } + } + } + + fn on_download_completed(&mut self, kind: DownloadKind, result: Result<(), FailureAction>) { + // first remove the request + let info = self + .current_requests + .remove(&kind) + .expect("request was active"); + + // update the active requests for this peer + let ActiveRequestInfo { + intents, + peer, + mut remaining_retries, + .. + } = info; + + let peer_info = self + .peers + .get_mut(&peer) + .expect("peer exists in the mapping"); + peer_info.state = match &peer_info.state { + PeerState::Busy { active_requests } => { + match NonZeroUsize::new(active_requests.get() - 1) { + Some(active_requests) => PeerState::Busy { active_requests }, + None => { + // last request of the peer was this one + let drop_key = self.goodbye_peer_queue.insert(peer, IDLE_PEER_TIMEOUT); + PeerState::Idle { drop_key } + } + } + } + PeerState::Idle { .. 
} => unreachable!("peer was busy"), + }; + + let hash = *kind.hash(); + + match result { + Ok(()) => { + debug!(%peer, ?kind, "download completed"); + for sender in intents.into_values() { + let _ = sender.send(Ok(())); + } + } + Err(FailureAction::AbortRequest(reason)) => { + debug!(%peer, ?kind, %reason, "aborting request"); + for sender in intents.into_values() { + let _ = sender.send(Err(anyhow::anyhow!("request aborted"))); + } + } + Err(FailureAction::DropPeer(reason)) => { + debug!(%peer, ?kind, %reason, "peer will be dropped"); + if let Some(_connection) = peer_info.conn.take() { + // TODO(@divma): this will fail open streams, do we want this? + // connection.close(..) + } + } + Err(FailureAction::RetryLater(reason)) => { + // check if the download can be retried + if remaining_retries > 0 { + debug!(%peer, ?kind, %reason, "download attempt failed"); + remaining_retries -= 1; + let next_peer = self.get_best_candidate(kind.hash()); + self.schedule_request(kind, remaining_retries, next_peer, intents); + } else { + debug!(%peer, ?kind, %reason, "download failed"); + for sender in intents.into_values() { + let _ = sender.send(Err(anyhow::anyhow!("download ran out of attempts"))); + } + } + } + } + + if !self.is_needed(hash) { + self.providers.remove(hash) + } + } + + /// A scheduled request is ready to be processed. + /// + /// The peer that was initially selected is used if possible. Otherwise we try to get a new + /// peer + fn on_scheduled_request_ready(&mut self, kind: DownloadKind, info: PendingRequestInfo) { + let PendingRequestInfo { + intents, + mut remaining_retries, + next_peer, + .. + } = info; + + // first try with the peer that was initially assigned + if let Some((peer, conn)) = next_peer.and_then(|peer| { + self.get_peer_connection_for_download(&peer) + .map(|conn| (peer, conn)) + }) { + return self.start_download(kind, peer, conn, remaining_retries, intents); + } + + // we either didn't have a peer or the peer is busy or dialing. 
In any case try to get + // another peer + let next_peer = match self.get_best_candidate(kind.hash()) { + None => None, + Some(peer) => { + // optimistically check if the peer could do the request right away + match self.get_peer_connection_for_download(&peer) { + Some(conn) => { + return self.start_download(kind, peer, conn, remaining_retries, intents) + } + None => Some(peer), + } + } + }; + + // we tried to get a peer to perform this request but didn't get one, so now this attempt + // is failed + if remaining_retries > 0 { + remaining_retries -= 1; + self.schedule_request(kind, remaining_retries, next_peer, intents); + } else { + // request can't be retried + for sender in intents.into_values() { + let _ = sender.send(Err(anyhow::anyhow!("download ran out of attempts"))); + } + debug!(?kind, "download ran out of attempts") + } + } + + /// Start downloading from the given peer. + fn start_download( + &mut self, + kind: DownloadKind, + peer: PublicKey, + conn: D::Connection, + remaining_retries: u8, + intents: HashMap>, + ) { + debug!(%peer, ?kind, "starting download"); + let cancellation = CancellationToken::new(); + let info = ActiveRequestInfo { + intents, + remaining_retries, + cancellation, + peer, + }; + let cancellation = info.cancellation.clone(); + self.current_requests.insert(kind.clone(), info); + + let get = self.getter.get(kind.clone(), conn); + let fut = async move { + // NOTE: it's an open question if we should do timeouts at this point. Considerations from @Frando: + // > at this stage we do not know the size of the download, so the timeout would have + // > to be so large that it won't be useful for non-huge downloads. At the same time, + // > this means that a super slow peer would block a download from succeeding for a long + // > time, while faster peers could be readily available. + // As a conclusion, timeouts should be added only after downloads are known to be bounded + let res = tokio::select! 
{ + _ = cancellation.cancelled() => Err(FailureAction::AbortRequest(anyhow::anyhow!("cancelled"))), + res = get => res + }; + + (kind, res) + }; + + self.in_progress_downloads.push(fut.boxed_local()); + } + + /// Schedule a request for later processing. + fn schedule_request( + &mut self, + kind: DownloadKind, + remaining_retries: u8, + next_peer: Option, + intents: HashMap>, + ) { + // this is simply INITIAL_REQUEST_DELAY * attempt_num where attempt_num (as an ordinal + // number) is maxed at INITIAL_RETRY_COUNT + let delay = INITIAL_REQUEST_DELAY + * (INITIAL_RETRY_COUNT.saturating_sub(remaining_retries) as u32 + 1); + let delay_key = self.scheduled_request_queue.insert(kind.clone(), delay); + + let info = PendingRequestInfo { + intents, + remaining_retries, + delay_key, + next_peer, + }; + debug!(?kind, ?info, "request scheduled"); + self.scheduled_requests.insert(kind, info); + } + + /// Gets the [`Dialer::Connection`] for a peer if it's connected and has capacity for another + /// request. In this case, the count of active requests for the peer is incremented. + fn get_peer_connection_for_download(&mut self, peer: &PublicKey) -> Option { + let info = self.peers.get_mut(peer)?; + let connection = info.conn.as_ref()?; + // check if the peer can be sent another request + match &mut info.state { + PeerState::Busy { active_requests } => { + if !self + .concurrency_limits + .peer_at_request_capacity(active_requests.get()) + { + *active_requests = active_requests.saturating_add(1); + Some(connection.clone()) + } else { + None + } + } + PeerState::Idle { drop_key } => { + // peer is no longer idle + self.goodbye_peer_queue.remove(drop_key); + info.state = PeerState::Busy { + active_requests: NonZeroUsize::new(1).expect("clearly non zero"), + }; + Some(connection.clone()) + } + } + } + + /// Check if we have maxed our connection capacity. 
+ fn at_connections_capacity(&self) -> bool { + self.concurrency_limits + .at_connections_capacity(self.connections_count()) + } + + /// Get the total number of connected and dialing peers. + fn connections_count(&self) -> usize { + let connected_peers = self + .peers + .values() + .filter(|info| info.conn.is_some()) + .count(); + let dialing_peers = self.dialer.pending_count(); + connected_peers + dialing_peers + } + + async fn shutdown(self) { + debug!("shutting down"); + // TODO(@divma): how to make sure the download futures end gracefully? + } +} + +/// Map of potential providers for a hash. +#[derive(Default, Debug)] +pub struct ProviderMap { + /// Candidates to download a hash. + candidates: HashMap>, +} + +struct ProviderIter<'a> { + inner: Option>, +} + +impl<'a> Iterator for ProviderIter<'a> { + type Item = &'a PublicKey; + + fn next(&mut self) -> Option { + self.inner.as_mut().and_then(|iter| iter.next()) + } +} + +impl ProviderMap { + /// Get candidates to download this hash. + fn get_candidates(&self, hash: &Hash) -> impl Iterator { + let inner = self.candidates.get(hash).map(|peer_set| peer_set.iter()); + ProviderIter { inner } + } + + /// Register peers for a hash. Should only be done for hashes we care to download. + fn add_peers(&mut self, hash: Hash, peers: &[PublicKey]) { + self.candidates.entry(hash).or_default().extend(peers) + } + + /// Signal the registry that this hash is no longer of interest. 
+ fn remove(&mut self, hash: Hash) { + self.candidates.remove(&hash); + } +} + +impl Dialer for iroh_gossip::net::util::Dialer { + type Connection = quinn::Connection; + + fn queue_dial(&mut self, peer_id: PublicKey) { + self.queue_dial(peer_id, &iroh_bytes::protocol::ALPN) + } + + fn pending_count(&self) -> usize { + self.pending_count() + } + + fn is_pending(&self, peer: &PublicKey) -> bool { + self.is_pending(peer) + } +} diff --git a/iroh/src/downloader/get.rs b/iroh/src/downloader/get.rs new file mode 100644 index 0000000000..858102ebed --- /dev/null +++ b/iroh/src/downloader/get.rs @@ -0,0 +1,556 @@ +//! [`Getter`] implementation that performs requests over [`quinn::Connection`]s. + +use anyhow::Context; +use bao_tree::io::fsm::OutboardMut; +use futures::FutureExt; +use iroh_bytes::baomap::range_collections::RangeSet2; +use iroh_bytes::{ + baomap::{MapEntry, PartialMapEntry, Store}, + collection::CollectionParser, + get::{ + self, + fsm::{AtBlobHeader, AtEndBlob, ConnectedNext, EndBlobNext}, + Stats, + }, + protocol::{GetRequest, RangeSpecSeq}, + util::Hash, + IROH_BLOCK_SIZE, +}; +#[cfg(feature = "metrics")] +use iroh_metrics::{inc, inc_by}; +use tracing::trace; + +use crate::get::{get_missing_ranges_blob, get_missing_ranges_collection, BlobInfo}; +#[cfg(feature = "metrics")] +use crate::metrics::Metrics; +use crate::util::progress::ProgressSliceWriter2; + +use super::{DownloadKind, FailureAction, GetFut, Getter}; + +/// [`Getter`] implementation that performs requests over [`quinn::Connection`]s. 
+pub(crate) struct IoGetter { + pub store: S, + pub collection_parser: C, +} + +impl Getter for IoGetter { + type Connection = quinn::Connection; + + fn get(&mut self, kind: DownloadKind, conn: Self::Connection) -> GetFut { + let store = self.store.clone(); + let collection_parser = self.collection_parser.clone(); + let fut = async move { + let get = match kind { + DownloadKind::Blob { hash } => get(&store, &collection_parser, conn, hash, false), + DownloadKind::Collection { hash } => { + get(&store, &collection_parser, conn, hash, true) + } + }; + + let res = get.await; + match res { + Ok(_stats) => { + #[cfg(feature = "metrics")] + { + let Stats { + bytes_written, + bytes_read: _, + elapsed, + } = _stats; + + inc!(Metrics, downloads_success); + inc_by!(Metrics, download_bytes_total, bytes_written); + inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64); + } + Ok(()) + } + Err(e) => { + // record metrics according to the error + #[cfg(feature = "metrics")] + { + match &e { + FailureAction::RetryLater(_) => inc!(Metrics, downloads_notfound), + _ => inc!(Metrics, downloads_error), + } + } + Err(e) + } + } + }; + fut.boxed_local() + } +} + +impl From for FailureAction { + fn from(value: quinn::ConnectionError) -> Self { + // explicit match just to be sure we are taking everything into account + match value { + e @ quinn::ConnectionError::VersionMismatch => { + // > The peer doesn't implement any supported version + // unsupported version is likely a long time error, so this peer is not usable + FailureAction::DropPeer(e.into()) + } + e @ quinn::ConnectionError::TransportError(_) => { + // > The peer violated the QUIC specification as understood by this implementation + // bad peer we don't want to keep around + FailureAction::DropPeer(e.into()) + } + e @ quinn::ConnectionError::ConnectionClosed(_) => { + // > The peer's QUIC stack aborted the connection automatically + // peer might be disconnecting or otherwise unavailable, drop it + 
FailureAction::DropPeer(e.into()) + } + e @ quinn::ConnectionError::ApplicationClosed(_) => { + // > The peer closed the connection + // peer might be disconnecting or otherwise unavailable, drop it + FailureAction::DropPeer(e.into()) + } + e @ quinn::ConnectionError::Reset => { + // > The peer is unable to continue processing this connection, usually due to having restarted + FailureAction::RetryLater(e.into()) + } + e @ quinn::ConnectionError::TimedOut => { + // > Communication with the peer has lapsed for longer than the negotiated idle timeout + FailureAction::RetryLater(e.into()) + } + e @ quinn::ConnectionError::LocallyClosed => { + // > The local application closed the connection + // TODO(@divma): don't see how this is reachable but let's just not use the peer + FailureAction::DropPeer(e.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: quinn::ReadError) -> Self { + match value { + e @ quinn::ReadError::Reset(_) => FailureAction::RetryLater(e.into()), + quinn::ReadError::ConnectionLost(conn_error) => conn_error.into(), + quinn::ReadError::UnknownStream + | quinn::ReadError::IllegalOrderedRead + | quinn::ReadError::ZeroRttRejected => { + // all these errors indicate the peer is not usable at this moment + FailureAction::DropPeer(value.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: quinn::WriteError) -> Self { + match value { + e @ quinn::WriteError::Stopped(_) => FailureAction::RetryLater(e.into()), + quinn::WriteError::ConnectionLost(conn_error) => conn_error.into(), + quinn::WriteError::UnknownStream | quinn::WriteError::ZeroRttRejected => { + // all these errors indicate the peer is not usable at this moment + FailureAction::DropPeer(value.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: iroh_bytes::get::fsm::ConnectedNextError) -> Self { + use iroh_bytes::get::fsm::ConnectedNextError::*; + match value { + e @ PostcardSer(_) => { + // serialization errors indicate something wrong 
with the request itself + FailureAction::AbortRequest(e.into()) + } + e @ RequestTooBig => { + // request will never be sent, drop it + FailureAction::AbortRequest(e.into()) + } + Write(e) => e.into(), + Read(e) => e.into(), + e @ CustomRequestTooBig => { + // something wrong with the request itself + FailureAction::AbortRequest(e.into()) + } + e @ Eof => { + // TODO(@divma): unsure about this based on docs + FailureAction::RetryLater(e.into()) + } + e @ PostcardDe(_) => { + // serialization errors can't be recovered + FailureAction::AbortRequest(e.into()) + } + e @ Io(_) => { + // io errors are likely recoverable + FailureAction::RetryLater(e.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: iroh_bytes::get::fsm::AtBlobHeaderNextError) -> Self { + use iroh_bytes::get::fsm::AtBlobHeaderNextError::*; + match value { + e @ NotFound => { + // > This indicates that the provider does not have the requested data. + // peer might have the data later, simply retry it + FailureAction::RetryLater(e.into()) + } + e @ InvalidQueryRange => { + // we are doing something wrong with this request, drop it + FailureAction::AbortRequest(e.into()) + } + Read(e) => e.into(), + e @ Io(_) => { + // io errors are likely recoverable + FailureAction::RetryLater(e.into()) + } + } + } +} + +impl From for FailureAction { + fn from(value: iroh_bytes::get::fsm::DecodeError) -> Self { + use get::fsm::DecodeError::*; + + match value { + e @ NotFound => FailureAction::RetryLater(e.into()), + e @ ParentNotFound(_) => FailureAction::RetryLater(e.into()), + e @ LeafNotFound(_) => FailureAction::RetryLater(e.into()), + e @ ParentHashMismatch(_) => { + // TODO(@divma): did the peer sent wrong data? is it corrupted? did we sent a wrong + // request? + FailureAction::AbortRequest(e.into()) + } + e @ LeafHashMismatch(_) => { + // TODO(@divma): did the peer sent wrong data? is it corrupted? did we sent a wrong + // request? 
+ FailureAction::AbortRequest(e.into()) + } + e @ InvalidQueryRange => FailureAction::AbortRequest(e.into()), + Read(e) => e.into(), + Io(e) => e.into(), + } + } +} + +impl From for FailureAction { + fn from(value: std::io::Error) -> Self { + // generally consider io errors recoverable + // we might want to revisit this at some point + FailureAction::RetryLater(value.into()) + } +} + +/// Get a blob or collection +pub async fn get( + db: &D, + collection_parser: &C, + conn: quinn::Connection, + hash: Hash, + recursive: bool, +) -> Result { + let res = if recursive { + get_collection(db, collection_parser, conn, &hash).await + } else { + get_blob(db, conn, &hash).await + }; + if let Err(e) = res.as_ref() { + tracing::error!("get failed: {e:?}"); + } + res +} + +/// Get a blob that was requested completely. +/// +/// We need to create our own files and handle the case where an outboard +/// is not needed. +pub async fn get_blob( + db: &D, + conn: quinn::Connection, + hash: &Hash, +) -> Result { + let end = if let Some(entry) = db.get_partial(hash) { + trace!("got partial data for {}", hash,); + + let required_ranges = get_missing_ranges_blob::(&entry) + .await + .ok() + .unwrap_or_else(RangeSet2::all); + let request = GetRequest::new(*hash, RangeSpecSeq::from_ranges([required_ranges])); + // full request + let request = get::fsm::start(conn, iroh_bytes::protocol::Request::Get(request)); + // create a new bidi stream + let connected = request.next().await?; + // next step. we have requested a single hash, so this must be StartRoot + let ConnectedNext::StartRoot(start) = connected.next().await? else { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "expected `StartRoot` in single blob request" + ))); + }; + // move to the header + let header = start.next(); + // do the ceremony of getting the blob and adding it to the database + + get_blob_inner_partial(db, header, entry).await? 
+ } else { + // full request + let request = get::fsm::start( + conn, + iroh_bytes::protocol::Request::Get(GetRequest::single(*hash)), + ); + // create a new bidi stream + let connected = request.next().await?; + // next step. we have requested a single hash, so this must be StartRoot + let ConnectedNext::StartRoot(start) = connected.next().await? else { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "expected `StartRoot` in single blob request" + ))); + }; + // move to the header + let header = start.next(); + // do the ceremony of getting the blob and adding it to the database + get_blob_inner(db, header).await? + }; + + // we have requested a single hash, so we must be at closing + let EndBlobNext::Closing(end) = end.next() else { + // TODO(@divma): I think this is a codign error and not a peer error + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "peer sent extra data in single blob request" + ))); + }; + // this closes the bidi stream. Do something with the stats? + let stats = end.next().await?; + Ok(stats) +} + +/// Get a blob that was requested completely. +/// +/// We need to create our own files and handle the case where an outboard +/// is not needed. +async fn get_blob_inner( + db: &D, + header: AtBlobHeader, +) -> Result { + use iroh_io::AsyncSliceWriter; + + let hash = header.hash(); + // read the size + let (content, size) = header.next().await?; + // create the temp file pair + let entry = db.get_or_create_partial(hash, size)?; + // open the data file in any case + let df = entry.data_writer().await?; + let mut of: Option = if needs_outboard(size) { + Some(entry.outboard_mut().await?) 
+ } else { + None + }; + let on_write = move |_offset: u64, _length: usize| Ok(()); + let mut pw = ProgressSliceWriter2::new(df, on_write); + // use the convenience method to write all to the two vfs objects + let end = content + .write_all_with_outboard(of.as_mut(), &mut pw) + .await?; + // TODO(@divma): what does this failure mean + // sync the data file + pw.sync().await?; + // sync the outboard file, if we wrote one + if let Some(mut of) = of { + of.sync().await?; + } + db.insert_complete(entry).await?; + Ok(end) +} + +fn needs_outboard(size: u64) -> bool { + size > (IROH_BLOCK_SIZE.bytes() as u64) +} + +/// Get a blob that was requested partially. +/// +/// We get passed the data and outboard ids. Partial downloads are only done +/// for large blobs where the outboard is present. +async fn get_blob_inner_partial( + db: &D, + header: AtBlobHeader, + entry: D::PartialEntry, +) -> Result { + // TODO: the data we get is validated at this point, but we need to check + // that it actually contains the requested ranges. Or DO WE? + use iroh_io::AsyncSliceWriter; + + // read the size + let (content, size) = header.next().await?; + // open the data file in any case + let df = entry.data_writer().await?; + let mut of = if needs_outboard(size) { + Some(entry.outboard_mut().await?) + } else { + None + }; + let on_write = move |_offset: u64, _length: usize| Ok(()); + let mut pw = ProgressSliceWriter2::new(df, on_write); + // use the convenience method to write all to the two vfs objects + let end = content + .write_all_with_outboard(of.as_mut(), &mut pw) + .await?; + + // TODO(@divma): what does this failure mean + // sync the data file + pw.sync().await?; + // sync the outboard file + if let Some(mut of) = of { + of.sync().await?; + } + // actually store the data. it is up to the db to decide if it wants to + // rename the files or not. 
+ db.insert_complete(entry).await?; + Ok(end) +} + +/// Get a collection +pub async fn get_collection( + db: &D, + collection_parser: &C, + conn: quinn::Connection, + root_hash: &Hash, +) -> Result { + use tracing::info as log; + let finishing = if let Some(entry) = db.get(root_hash) { + log!("already got collection - doing partial download"); + // got the collection + let reader = entry.data_reader().await?; + let (mut collection, _) = collection_parser.parse(0, reader).await.map_err(|e| { + FailureAction::DropPeer(anyhow::anyhow!( + "peer sent data that can't be parsed as collection : {e}" + )) + })?; + let mut children: Vec = vec![]; + while let Some(hash) = collection.next().await.map_err(|e| { + FailureAction::DropPeer(anyhow::anyhow!( + "received collection data can't be iterated: {e}" + )) + })? { + children.push(hash); + } + let missing_info = get_missing_ranges_collection(db, &children).await?; + if missing_info.iter().all(|x| matches!(x, BlobInfo::Complete)) { + log!("nothing to do"); + return Ok(Stats::default()); + } + let missing_iter = std::iter::once(RangeSet2::empty()) + .chain(missing_info.iter().map(|x| x.missing_chunks())) + .collect::>(); + log!("requesting chunks {:?}", missing_iter); + let request = GetRequest::new(*root_hash, RangeSpecSeq::from_ranges(missing_iter)); + let request = get::fsm::start(conn, request.into()); + // create a new bidi stream + let connected = request.next().await?; + log!("connected"); + // we have not requested the root, so this must be StartChild + let ConnectedNext::StartChild(start) = connected.next().await? 
else { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "peer sent data that does not match requested info" + ))); + }; + let mut next = EndBlobNext::MoreChildren(start); + // read all the children + loop { + let start = match next { + EndBlobNext::MoreChildren(start) => start, + EndBlobNext::Closing(finish) => break finish, + }; + let child_offset = usize::try_from(start.child_offset()) + .context("child offset too large") + .map_err(|_| { + FailureAction::AbortRequest(anyhow::anyhow!( + "requested offsets surpasses platform's usize" + )) + })?; + let (child_hash, info) = + match (children.get(child_offset), missing_info.get(child_offset)) { + (Some(blob), Some(info)) => (*blob, info), + _ => break start.finish(), + }; + tracing::info!( + "requesting child {} {:?}", + child_hash, + info.missing_chunks() + ); + let header = start.next(child_hash); + let end_blob = match info { + BlobInfo::Missing => get_blob_inner(db, header).await?, + BlobInfo::Partial { entry, .. } => { + get_blob_inner_partial(db, header, entry.clone()).await? + } + BlobInfo::Complete => { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "peer sent data we did't request" + ))) + } + }; + next = end_blob.next(); + } + } else { + tracing::info!("don't have collection - doing full download"); + // don't have the collection, so probably got nothing + let request = get::fsm::start( + conn, + iroh_bytes::protocol::Request::Get(GetRequest::all(*root_hash)), + ); + // create a new bidi stream + let connected = request.next().await?; + // next step. we have requested a single hash, so this must be StartRoot + let ConnectedNext::StartRoot(start) = connected.next().await? 
else { + return Err(FailureAction::DropPeer(anyhow::anyhow!( + "expected StartRoot" + ))); + }; + // move to the header + let header = start.next(); + // read the blob and add it to the database + let end_root = get_blob_inner(db, header).await?; + // read the collection fully for now + let entry = db.get(root_hash).context("just downloaded").map_err(|_| { + FailureAction::RetryLater(anyhow::anyhow!("data just downloaded was not found")) + })?; + let reader = entry.data_reader().await?; + let (mut collection, _) = collection_parser.parse(0, reader).await.map_err(|_| { + FailureAction::DropPeer(anyhow::anyhow!( + "peer sent data that can't be parsed as collection" + )) + })?; + let mut children = vec![]; + while let Some(hash) = collection.next().await.map_err(|e| { + FailureAction::DropPeer(anyhow::anyhow!( + "received collection data can't be iterated: {e}" + )) + })? { + children.push(hash); + } + let mut next = end_root.next(); + // read all the children + loop { + let start = match next { + EndBlobNext::MoreChildren(start) => start, + EndBlobNext::Closing(finish) => break finish, + }; + let child_offset = usize::try_from(start.child_offset()) + .context("child offset too large") + .map_err(|_| { + FailureAction::AbortRequest(anyhow::anyhow!( + "requested offsets surpasses platform's usize" + )) + })?; + let child_hash = match children.get(child_offset) { + Some(blob) => *blob, + None => break start.finish(), + }; + let header = start.next(child_hash); + let end_blob = get_blob_inner(db, header).await?; + next = end_blob.next(); + } + }; + // this closes the bidi stream. Do something with the stats? + let stats = finishing.next().await?; + Ok(stats) +} diff --git a/iroh/src/downloader/invariants.rs b/iroh/src/downloader/invariants.rs new file mode 100644 index 0000000000..24383a4c15 --- /dev/null +++ b/iroh/src/downloader/invariants.rs @@ -0,0 +1,99 @@ +//! Invariants for the service. 
+ +#![cfg(any(test, debug_assertions))] + +use super::*; + +/// invariants for the service. +impl, D: Dialer> Service { + /// Checks the various invariants the service must maintain + #[track_caller] + pub(in crate::downloader) fn check_invariants(&self) { + self.check_active_request_count(); + self.check_scheduled_requests_consistency(); + self.check_idle_peer_consistency(); + self.chech_concurrency_limits(); + } + + /// Checks concurrency limits are maintained. + #[track_caller] + fn chech_concurrency_limits(&self) { + let ConcurrencyLimits { + max_concurrent_requests, + max_concurrent_requests_per_peer, + max_open_connections, + } = &self.concurrency_limits; + + // check the total number of active requests to ensure it stays within the limit + assert!( + self.in_progress_downloads.len() <= *max_concurrent_requests, + "max_concurrent_requests exceeded" + ); + + // check that the open and dialing peers don't exceed the connection capacity + assert!( + self.connections_count() <= *max_open_connections, + "max_open_connections exceeded" + ); + + // check the active requests per peer don't exceed the limit + for (peer, info) in self.peers.iter() { + assert!( + info.active_requests() <= *max_concurrent_requests_per_peer, + "max_concurrent_requests_per_peer exceeded for {peer}" + ) + } + } + + /// Checks that the count of active requests per peer is consistent with the active requests, + /// and that active request are consistent with download futures + #[track_caller] + fn check_active_request_count(&self) { + // check that the count of futures we are polling for downloads is consistent with the + // number of requests + assert_eq!( + self.in_progress_downloads.len(), + self.current_requests.len(), + "current_requests and in_progress_downloads are out of sync" + ); + // check that the count of requests per peer matches the number of requests that have that + // peer as active + let mut real_count: HashMap = HashMap::with_capacity(self.peers.len()); + for req_info in 
self.current_requests.values() { + // nothing like some classic word count + *real_count.entry(req_info.peer).or_default() += 1; + } + for (peer, info) in self.peers.iter() { + assert_eq!( + info.active_requests(), + real_count.get(peer).copied().unwrap_or_default(), + "missmatched count of active requests for {peer}" + ) + } + } + + /// Checks that the scheduled requests match the queue that handles their delays. + #[track_caller] + fn check_scheduled_requests_consistency(&self) { + assert_eq!( + self.scheduled_requests.len(), + self.scheduled_request_queue.len(), + "scheduled_request_queue and scheduled_requests are out of sync" + ); + } + + /// Check that peers queued to be disconnected are consistent with peers considered idle. + #[track_caller] + fn check_idle_peer_consistency(&self) { + let idle_peers = self + .peers + .values() + .filter(|info| info.active_requests() == 0) + .count(); + assert_eq!( + self.goodbye_peer_queue.len(), + idle_peers, + "inconsistent count of idle peers" + ); + } +} diff --git a/iroh/src/downloader/test.rs b/iroh/src/downloader/test.rs new file mode 100644 index 0000000000..80454226fc --- /dev/null +++ b/iroh/src/downloader/test.rs @@ -0,0 +1,199 @@ +#![cfg(test)] +use std::time::Duration; + +use iroh_net::key::SecretKey; + +use super::*; + +mod dialer; +mod getter; + +impl Downloader { + fn spawn_for_test( + dialer: dialer::TestingDialer, + getter: getter::TestingGetter, + concurrency_limits: ConcurrencyLimits, + ) -> Self { + let (msg_tx, msg_rx) = mpsc::channel(super::SERVICE_CHANNEL_CAPACITY); + + iroh_bytes::util::runtime::Handle::from_current(1) + .unwrap() + .local_pool() + .spawn_pinned(move || async move { + // we want to see the logs of the service + let _guard = iroh_test::logging::setup(); + + let service = Service::new(getter, dialer, concurrency_limits, msg_rx); + service.run().await + }); + + Downloader { next_id: 0, msg_tx } + } +} + +/// Tests that receiving a download request and performing it doesn't explode. 
+#[tokio::test] +async fn smoke_test() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + let concurrency_limits = ConcurrencyLimits::default(); + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + // send a request and make sure the peer is requested the corresponding download + let peer = SecretKey::generate().public(); + let kind = DownloadKind::Blob { + hash: Hash::new([0u8; 32]), + }; + let handle = downloader.queue(kind.clone(), vec![peer]).await; + // wait for the download result to be reported + handle.await.expect("should report success"); + // verify that the peer was dialed + dialer.assert_history(&[peer]); + // verify that the request was sent + getter.assert_history(&[(kind, peer)]); +} + +/// Tests that multiple intents produce a single request. +#[tokio::test] +async fn deduplication() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + // make request take some time to ensure the intents are received before completion + getter.set_request_duration(Duration::from_secs(1)); + let concurrency_limits = ConcurrencyLimits::default(); + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + let peer = SecretKey::generate().public(); + let kind = DownloadKind::Blob { + hash: Hash::new([0u8; 32]), + }; + let mut handles = Vec::with_capacity(10); + for _ in 0..10 { + let h = downloader.queue(kind.clone(), vec![peer]).await; + handles.push(h); + } + assert!( + futures::future::join_all(handles) + .await + .into_iter() + .all(|r| r.is_ok()), + "all downloads should succeed" + ); + // verify that the request was sent just once + getter.assert_history(&[(kind, peer)]); +} + +/// Tests that the request is cancelled only when all intents are cancelled. 
+#[tokio::test] +async fn cancellation() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + // make request take some time to ensure cancellations are received on time + getter.set_request_duration(Duration::from_millis(500)); + let concurrency_limits = ConcurrencyLimits::default(); + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + let peer = SecretKey::generate().public(); + let kind_1 = DownloadKind::Blob { + hash: Hash::new([0u8; 32]), + }; + let handle_a = downloader.queue(kind_1.clone(), vec![peer]).await; + let handle_b = downloader.queue(kind_1.clone(), vec![peer]).await; + downloader.cancel(handle_a).await; + + // create a request with two intents and cancel them both + let kind_2 = DownloadKind::Blob { + hash: Hash::new([1u8; 32]), + }; + let handle_c = downloader.queue(kind_2.clone(), vec![peer]).await; + let handle_d = downloader.queue(kind_2.clone(), vec![peer]).await; + downloader.cancel(handle_c).await; + downloader.cancel(handle_d).await; + + // wait for the download result to be reported, a was cancelled but b should continue + handle_b.await.expect("should report success"); + // verify that the request was sent just once, and that the second request was never sent + getter.assert_history(&[(kind_1, peer)]); +} + +/// Test that when the downloader receives a flood of requests, they are scheduled so that the +/// maximum number of concurrent requests is not exceed. +/// NOTE: This is internally tested by [`Service::check_invariants`]. 
+#[tokio::test] +async fn max_concurrent_requests() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + // make request take some time to ensure concurreny limits are hit + getter.set_request_duration(Duration::from_millis(500)); + // set the concurreny limit very low to ensure it's hit + let concurrency_limits = ConcurrencyLimits { + max_concurrent_requests: 2, + ..Default::default() + }; + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + // send the downloads + let peer = SecretKey::generate().public(); + let mut handles = Vec::with_capacity(5); + let mut expected_history = Vec::with_capacity(5); + for i in 0..5 { + let kind = DownloadKind::Blob { + hash: Hash::new([i; 32]), + }; + let h = downloader.queue(kind.clone(), vec![peer]).await; + expected_history.push((kind, peer)); + handles.push(h); + } + + assert!( + futures::future::join_all(handles) + .await + .into_iter() + .all(|r| r.is_ok()), + "all downloads should succeed" + ); + + // verify that the request was sent just once + getter.assert_history(&expected_history); +} + +/// Test that when the downloader receives a flood of requests, with only one peer to handle them, +/// the maximum number of requests per peer is still respected. +/// NOTE: This is internally tested by [`Service::check_invariants`]. 
+#[tokio::test] +async fn max_concurrent_requests_per_peer() { + let dialer = dialer::TestingDialer::default(); + let getter = getter::TestingGetter::default(); + // make request take some time to ensure concurreny limits are hit + getter.set_request_duration(Duration::from_millis(500)); + // set the concurreny limit very low to ensure it's hit + let concurrency_limits = ConcurrencyLimits { + max_concurrent_requests_per_peer: 1, + max_concurrent_requests: 10000, // all requests can be performed at the same time + ..Default::default() + }; + + let mut downloader = + Downloader::spawn_for_test(dialer.clone(), getter.clone(), concurrency_limits); + + // send the downloads + let peer = SecretKey::generate().public(); + let mut handles = Vec::with_capacity(5); + for i in 0..5 { + let kind = DownloadKind::Blob { + hash: Hash::new([i; 32]), + }; + let h = downloader.queue(kind.clone(), vec![peer]).await; + handles.push(h); + } + + futures::future::join_all(handles).await; +} diff --git a/iroh/src/downloader/test/dialer.rs b/iroh/src/downloader/test/dialer.rs new file mode 100644 index 0000000000..7f40dd4d01 --- /dev/null +++ b/iroh/src/downloader/test/dialer.rs @@ -0,0 +1,88 @@ +//! Implementation of [`super::Dialer`] used for testing. + +use std::{ + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; + +use parking_lot::RwLock; + +use super::*; + +/// Dialer for testing that keeps track of the dialing history. +#[derive(Default, Clone)] +pub(super) struct TestingDialer(Arc>); + +struct TestingDialerInner { + /// Peers that are being dialed. + dialing: HashSet, + /// Queue of dials. + dial_futs: delay_queue::DelayQueue, + /// History of attempted dials. + dial_history: Vec, + /// How long does a dial last. + dial_duration: Duration, + /// Fn deciding if a dial is successful. 
+ dial_outcome: Box bool>, +} + +impl Default for TestingDialerInner { + fn default() -> Self { + TestingDialerInner { + dialing: HashSet::default(), + dial_futs: delay_queue::DelayQueue::default(), + dial_history: Vec::default(), + dial_duration: Duration::ZERO, + dial_outcome: Box::new(|_| true), + } + } +} + +impl Dialer for TestingDialer { + type Connection = PublicKey; + + fn queue_dial(&mut self, peer_id: PublicKey) { + let mut inner = self.0.write(); + inner.dial_history.push(peer_id); + // for now assume every dial works + let dial_duration = inner.dial_duration; + if inner.dialing.insert(peer_id) { + inner.dial_futs.insert(peer_id, dial_duration); + } + } + + fn pending_count(&self) -> usize { + self.0.read().dialing.len() + } + + fn is_pending(&self, peer: &PublicKey) -> bool { + self.0.read().dialing.contains(peer) + } +} + +impl futures::Stream for TestingDialer { + type Item = (PublicKey, anyhow::Result); + + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.0.write(); + match inner.dial_futs.poll_expired(cx) { + Poll::Ready(Some(expired)) => { + let peer = expired.into_inner(); + let report_ok = (inner.dial_outcome)(&peer); + let result = report_ok + .then_some(peer) + .ok_or_else(|| anyhow::anyhow!("dialing test set to fail")); + Poll::Ready(Some((peer, result))) + } + _ => Poll::Pending, + } + } +} + +impl TestingDialer { + #[track_caller] + pub(super) fn assert_history(&self, history: &[PublicKey]) { + assert_eq!(self.0.read().dial_history, history) + } +} diff --git a/iroh/src/downloader/test/getter.rs b/iroh/src/downloader/test/getter.rs new file mode 100644 index 0000000000..7f546b2aa9 --- /dev/null +++ b/iroh/src/downloader/test/getter.rs @@ -0,0 +1,46 @@ +//! Implementation of [`super::Getter`] used for testing. 
+ +use std::{sync::Arc, time::Duration}; + +use parking_lot::RwLock; + +use super::*; + +#[derive(Default, Clone)] +pub(super) struct TestingGetter(Arc>); + +#[derive(Default)] +struct TestingGetterInner { + /// How long requests take. + request_duration: Duration, + /// History of requests performed by the [`Getter`] and if they were successful. + request_history: Vec<(DownloadKind, PublicKey)>, +} + +impl Getter for TestingGetter { + // since for testing we don't need a real connection, just keep track of what peer is the + // request being sent to + type Connection = PublicKey; + + fn get(&mut self, kind: DownloadKind, peer: PublicKey) -> GetFut { + let mut inner = self.0.write(); + inner.request_history.push((kind, peer)); + let request_duration = inner.request_duration; + async move { + tokio::time::sleep(request_duration).await; + Ok(()) + } + .boxed_local() + } +} + +impl TestingGetter { + pub(super) fn set_request_duration(&self, request_duration: Duration) { + self.0.write().request_duration = request_duration; + } + /// Verify that the request history is as expected + #[track_caller] + pub(super) fn assert_history(&self, history: &[(DownloadKind, PublicKey)]) { + assert_eq!(self.0.read().request_history, history); + } +} diff --git a/iroh/src/get.rs b/iroh/src/get.rs index 38857c33ab..c8a3fa650f 100644 --- a/iroh/src/get.rs +++ b/iroh/src/get.rs @@ -105,7 +105,7 @@ pub async fn get_blob( anyhow::Ok(stats) } -async fn get_missing_ranges_blob( +pub(crate) async fn get_missing_ranges_blob( entry: &D::PartialEntry, ) -> anyhow::Result> { use tracing::trace as log; @@ -249,7 +249,7 @@ async fn get_blob_inner_partial( } /// Given a collection of hashes, figure out what is missing -async fn get_missing_ranges_collection( +pub(crate) async fn get_missing_ranges_collection( db: &D, collection: &Vec, ) -> io::Result>> { @@ -408,7 +408,7 @@ pub async fn get_collection( } #[derive(Debug, Clone)] -enum BlobInfo { +pub(crate) enum BlobInfo { // we have the blob 
completely Complete, // we have the blob partially @@ -421,7 +421,7 @@ enum BlobInfo { } impl BlobInfo { - fn missing_chunks(&self) -> RangeSet2 { + pub fn missing_chunks(&self) -> RangeSet2 { match self { BlobInfo::Complete => RangeSet2::empty(), BlobInfo::Partial { missing_chunks, .. } => missing_chunks.clone(), diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 9f12a9fecf..1697d22598 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -10,7 +10,7 @@ pub mod client; #[cfg(feature = "iroh-collection")] pub mod collection; pub mod dial; -pub mod download; +pub mod downloader; pub mod get; pub mod node; pub mod rpc_protocol; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 08b7910f73..04c6e8362c 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -56,7 +56,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; use crate::dial::Ticket; -use crate::download::Downloader; +use crate::downloader::Downloader; use crate::rpc_protocol::{ BytesGetRequest, BytesGetResponse, ConnectionInfoRequest, ConnectionInfoResponse, ConnectionsRequest, ConnectionsResponse, ListBlobsRequest, ListBlobsResponse, @@ -374,7 +374,13 @@ where gossip_cell.set(gossip.clone()).unwrap(); // spawn the sync engine - let downloader = Downloader::new(rt.clone(), endpoint.clone(), self.db.clone()); + let downloader = Downloader::new( + self.db.clone(), + self.collection_parser.clone(), + endpoint.clone(), + rt.clone(), + ) + .await; let sync = SyncEngine::spawn( rt.clone(), endpoint.clone(), diff --git a/iroh/src/sync_engine.rs b/iroh/src/sync_engine.rs index 8646ce6f82..ac99004012 100644 --- a/iroh/src/sync_engine.rs +++ b/iroh/src/sync_engine.rs @@ -14,7 +14,7 @@ use iroh_sync::{ }; use parking_lot::RwLock; -use crate::download::Downloader; +use crate::downloader::Downloader; mod live; pub mod rpc; diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 617ac1d99b..7fbc7e9265 100644 --- a/iroh/src/sync_engine/live.rs +++ 
b/iroh/src/sync_engine/live.rs @@ -6,7 +6,7 @@ use std::{ sync::{atomic::AtomicU64, Arc}, }; -use crate::download::Downloader; +use crate::downloader::{DownloadKind, Downloader}; use anyhow::{anyhow, bail, Result}; use futures::{ future::{BoxFuture, Shared}, @@ -673,11 +673,16 @@ impl Actor { // content. let entry_status = self.bao_store.contains(&hash); if matches!(entry_status, EntryStatus::NotFound) { - self.downloader.push(hash, vec![from]).await; - let fut = self.downloader.finished(&hash).await; - let fut = fut - .map(move |res| res.map(move |(hash, _len)| (topic, hash))) - .boxed(); + let handle = self + .downloader + .queue(DownloadKind::Blob { hash }, vec![from]) + .await; + let fut = async move { + // NOTE: this ignores the result for now, simply keeping the option + let res = handle.await.ok(); + res.map(|_| (topic, hash)) + } + .boxed(); self.pending_downloads.push(fut); }