Skip to content

Commit

Permalink
fix(iroh): do not download if content already exists
Browse files Browse the repository at this point in the history
Avoid downloading content in `blobs.download` if content is already locally available
  • Loading branch information
dignifiedquire committed Jan 19, 2024
1 parent c3abd8f commit 02f04d9
Showing 1 changed file with 35 additions and 26 deletions.
61 changes: 35 additions & 26 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use iroh_base::rpc::RpcResult;
use iroh_bytes::format::collection::Collection;
use iroh_bytes::get::db::DownloadProgress;
use iroh_bytes::get::{db::DownloadProgress, Stats};
use iroh_bytes::hashseq::parse_hash_seq;
use iroh_bytes::store::{
ExportMode, GcMarkEvent, GcSweepEvent, ImportProgress, Map, MapEntry, PossiblyPartialEntry,
ReadableStore, Store as BaoStore, ValidateProgress,
EntryStatus, ExportMode, GcMarkEvent, GcSweepEvent, ImportProgress, Map, MapEntry,
PossiblyPartialEntry, ReadableStore, Store as BaoStore, ValidateProgress,
};
use iroh_bytes::util::progress::{FlumeProgressSender, IdGenerator, ProgressSender};
use iroh_bytes::{protocol::Closed, provider::AddProgress, BlobFormat, Hash, HashAndFormat};
Expand Down Expand Up @@ -1116,33 +1116,42 @@ impl<D: BaoStore> RpcHandler<D> {
let format = msg.format;
let db = self.inner.db.clone();
let haf = HashAndFormat { hash, format };
let temp_pin = db.temp_tag(haf);
let conn = self
.inner
.endpoint
.connect(msg.peer, iroh_bytes::protocol::ALPN)
.await?;
progress.send(DownloadProgress::Connected).await?;
let progress2 = progress.clone();

let progress3 = progress.clone();
let db = self.inner.db.clone();
let db2 = db.clone();
let download = local.spawn_pinned(move || async move {
iroh_bytes::get::db::get_to_db(
&db2,
conn,
&HashAndFormat {
hash: msg.hash,
format: msg.format,
},
progress2,
)
.await
});
let (stats, temp_pin) = if EntryStatus::Complete != self.inner.db.entry_status(&hash) {
// only download if this entry doesn't already exists
let temp_pin = db.temp_tag(haf);
let conn = self
.inner
.endpoint
.connect(msg.peer, iroh_bytes::protocol::ALPN)
.await?;
progress.send(DownloadProgress::Connected).await?;
let progress2 = progress.clone();

let db = self.inner.db.clone();
let db2 = db.clone();
let stats = local
.spawn_pinned(move || async move {
iroh_bytes::get::db::get_to_db(
&db2,
conn,
&HashAndFormat {
hash: msg.hash,
format: msg.format,
},
progress2,
)
.await
})
.await??;
(stats, Some(temp_pin))
} else {
(Stats::default(), None)
};

let this = self.clone();
let _export = local.spawn_pinned(move || async move {
let stats = download.await.unwrap()?;
progress
.send(DownloadProgress::NetworkDone {
bytes_written: stats.bytes_written,
Expand Down

0 comments on commit 02f04d9

Please sign in to comment.