Wip range download #2015

Draft: wants to merge 6 commits into main
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -30,3 +30,6 @@ missing_debug_implementations = "warn"

[workspace.lints.clippy]
unused-async = "warn"

[patch.crates-io]
bao-tree = { git = "https://github.com/n0-computer/bao-tree", branch = "validate-data-ranges" }
134 changes: 134 additions & 0 deletions iroh-bytes/src/get/db.rs
@@ -30,6 +30,8 @@ use bao_tree::{ByteNum, ChunkRanges};
use iroh_io::{AsyncSliceReader, AsyncSliceWriter};
use tracing::trace;

use super::fsm::AtBlobContent;

/// Get a blob or collection into a store.
///
/// This considers data that is already in the store, and will only request
@@ -603,3 +605,135 @@ impl From<ExportProgress> for DownloadProgress {
}
}
}

/// Get selected ranges of a blob into a store.
///
/// We need to create our own files and handle the case where an outboard
/// is not needed.
pub async fn get_ranges_to_db<D: BaoStore>(
db: &D,
conn: quinn::Connection,
root: &Hash,
ranges: RangeSpecSeq,
progress: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
) -> Result<Stats, GetError> {
let end = match db.get_possibly_partial(root)? {
PossiblyPartialEntry::Complete(entry) => {
tracing::info!("already got entire blob");
progress
.send(DownloadProgress::FoundLocal {
child: 0,
hash: *root,
size: entry.size(),
valid_ranges: RangeSpec::all(),
})
.await?;
return Ok(Stats::default());
}
PossiblyPartialEntry::Partial(entry) => {
let request = GetRequest::new(*root, ranges);
// request just the selected ranges
let request = get::fsm::start(conn, request);
// create a new bidi stream
let connected = request.next().await?;
// next step: we have requested a single hash, so this must be StartRoot
let ConnectedNext::StartRoot(start) = connected.next().await? else {
return Err(GetError::NoncompliantNode(anyhow!("expected StartRoot")));
};
// move to the header
let at_header = start.next();
// read the size
let (at_content, size) = at_header.next().await?;
// do the ceremony of getting the blob and adding it to the database
get_blob_inner_keep_partial(size, at_content, entry, progress).await?
}
PossiblyPartialEntry::NotFound => {
let request = GetRequest::new(*root, ranges);
// request just the selected ranges
let request = get::fsm::start(conn, request);
// create a new bidi stream
let connected = request.next().await?;
// next step: we have requested a single hash, so this must be StartRoot
let ConnectedNext::StartRoot(start) = connected.next().await? else {
return Err(GetError::NoncompliantNode(anyhow!("expected StartRoot")));
};
// move to the header
let at_header = start.next();
// read the size
let (at_content, size) = at_header.next().await?;
// create the temp file pair
let entry = db.get_or_create_partial(*root, size)?;
// do the ceremony of getting the blob and adding it to the database
get_blob_inner_keep_partial(size, at_content, entry, progress).await?
}
};

// we have requested a single hash, so we must be at closing
let EndBlobNext::Closing(end) = end.next() else {
return Err(GetError::NoncompliantNode(anyhow!("expected AtClosing")));
};
// this closes the bidi stream. Do something with the stats?
let stats = end.next().await?;
Ok(stats)
}
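
For orientation, a minimal usage sketch of `get_ranges_to_db` (not part of this PR): download only the first 16 chunks of a blob. `db`, `conn`, `hash`, and `progress` are assumed to be set up elsewhere, and the `Stats` field names are assumptions based on the existing get API.

```rust
use bao_tree::{ChunkNum, ChunkRanges};
use iroh_bytes::protocol::RangeSpecSeq;

// Request only chunks 0..16 of the root blob. A single ChunkRanges entry
// means only the root is requested, no children.
let ranges = RangeSpecSeq::from_ranges([ChunkRanges::from(ChunkNum(0)..ChunkNum(16))]);
let stats = get_ranges_to_db(&db, conn, &hash, ranges, progress).await?;
println!("read {} bytes in {:?}", stats.bytes_read, stats.elapsed);
```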

/// Get blob content that was requested partially, keeping the entry partial.
///
/// We get passed a partial entry providing the data and outboard handles.
/// Unlike a full download, the entry is not converted to a complete entry
/// at the end, since we cannot know whether all of its ranges are present.
async fn get_blob_inner_keep_partial<D: BaoStore>(
size: u64,
at_content: AtBlobContent,
entry: impl PartialMapEntry<D>,
sender: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
) -> Result<AtEndBlob, GetError> {
// TODO: the data we get is validated at this point, but we need to check
// that it actually contains the requested ranges. Or DO WE?

// open the data file in any case
let df = entry.data_writer().await?;
let mut of = if needs_outboard(size) {
Some(entry.outboard_mut().await?)
} else {
None
};
// allocate a new id for progress reports for this transfer
let id = sender.new_id();
let hash = at_content.hash();
let child_offset = at_content.offset();
sender
.send(DownloadProgress::Found {
id,
hash,
size,
child: child_offset,
})
.await?;
let sender2 = sender.clone();
let on_write = move |offset: u64, _length: usize| {
// if try send fails it means that the receiver has been dropped.
// in that case we want to abort the write_all_with_outboard.
sender2
.try_send(DownloadProgress::Progress { id, offset })
.map_err(|e| {
tracing::info!("aborting download of {}", hash);
e
})?;
Ok(())
};
let mut pw = FallibleProgressSliceWriter::new(df, on_write);
// use the convenience method to write all to the two vfs objects
let at_end = at_content
.write_all_with_outboard(of.as_mut(), &mut pw)
.await?;
// sync the data file
pw.sync().await?;
// sync the outboard file
if let Some(mut of) = of {
of.sync().await?;
}
// KEY DIFFERENCE: we do not know if the file is final, so we do not convert it to a complete entry
// notify that we are done
sender.send(DownloadProgress::Done { id }).await?;
Ok(at_end)
}
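
`needs_outboard` is used here and in the flat store changes below, but its definition is not part of this diff. A plausible sketch of its contract, under the assumption that `IROH_BLOCK_SIZE` is the usual 16 KiB chunk group (a guess, not the crate's definition):

```rust
// Assumption: a block is a chunk group of 16 raw 1 KiB chunks, i.e. 16 KiB.
// A blob that fits into a single block has a trivial bao tree, so no
// outboard file is needed for it.
const BLOCK_SIZE_BYTES: u64 = 16 * 1024;

fn needs_outboard(size: u64) -> bool {
    size > BLOCK_SIZE_BYTES
}
```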
23 changes: 18 additions & 5 deletions iroh-bytes/src/store/flat.rs
@@ -255,11 +255,17 @@ impl MapEntry<Store> for PartialEntry {

fn outboard(&self) -> BoxFuture<'_, io::Result<<Store as Map>::Outboard>> {
async move {
let file = File::open(self.outboard_path.clone()).await?;
let data = if needs_outboard(self.size) {
let file = File::open(self.outboard_path.clone()).await?;
MemOrFile::File(file)
} else {
let fake = self.size.to_le_bytes().to_vec().into();
MemOrFile::Mem(fake)
};
Ok(PreOrderOutboard {
root: self.hash.into(),
tree: BaoTree::new(ByteNum(self.size), IROH_BLOCK_SIZE),
data: MemOrFile::File(file),
data,
})
}
.boxed()
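
This hunk and the one below exploit the same fact: when `needs_outboard` is false, the pre-order outboard degenerates to just the 8-byte little-endian size prefix, so it can be synthesized in memory instead of read from a file. A tiny sketch of that invariant:

```rust
// The "fake" in-memory outboard built above: for a blob of at most one
// block there are no hash pairs, only the size prefix.
let size: u64 = 4096;
let fake: Vec<u8> = size.to_le_bytes().to_vec();
assert_eq!(fake.len(), 8);
assert_eq!(u64::from_le_bytes(fake.as_slice().try_into().unwrap()), size);
```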
@@ -596,8 +602,15 @@ impl Map for Store {
Ok(if let Some(entry) = state.complete.get(hash) {
state.get_entry(hash, entry, &self.0.options)
} else if let Some(entry) = state.partial.get(hash) {
let data_path = self.0.options.partial_data_path(*hash, &entry.uuid);
let outboard_path = self.0.options.partial_outboard_path(*hash, &entry.uuid);
let data_path: PathBuf = self.0.options.partial_data_path(*hash, &entry.uuid);
let outboard = if needs_outboard(entry.size) {
let outboard_path = self.0.options.partial_outboard_path(*hash, &entry.uuid);
Either::Right(outboard_path)
} else {
let fake = entry.size.to_le_bytes().to_vec().into();
Either::Left(fake)
};

tracing::trace!(
"got partial: {} {} {}",
hash,
@@ -609,7 +622,7 @@
is_complete: false,
entry: EntryData {
data: Either::Right((data_path, entry.size)),
outboard: Either::Right(outboard_path),
outboard,
},
})
} else {
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
@@ -18,7 +18,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.9.1", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.9.1", features = ["tokio_fsm", "validate"], default-features = false }
bytes = "1"
data-encoding = "2.4.0"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] }
52 changes: 39 additions & 13 deletions iroh/src/client.rs
@@ -17,6 +17,7 @@ use futures::stream::BoxStream;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use iroh_bytes::export::ExportProgress;
use iroh_bytes::format::collection::Collection;
use iroh_bytes::protocol::{RangeSpec, RangeSpecSeq};
use iroh_bytes::provider::AddProgress;
use iroh_bytes::store::ValidateProgress;
use iroh_bytes::Hash;
@@ -36,19 +37,20 @@ use tracing::warn;

use crate::rpc_protocol::{
AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest,
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CounterStats,
CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocCloseRequest,
DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocExportFileRequest,
DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest,
DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest,
DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRangesRequest, BlobDownloadRequest,
BlobGetCollectionRequest, BlobGetCollectionResponse, BlobGetLocalRangesRequest,
BlobListCollectionsRequest, BlobListCollectionsResponse, BlobListIncompleteRequest,
BlobListIncompleteResponse, BlobListRequest, BlobListResponse, BlobReadAtRequest,
BlobReadAtResponse, BlobValidateRequest, CounterStats, CreateCollectionRequest,
CreateCollectionResponse, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest,
DocDelResponse, DocDropRequest, DocExportFileRequest, DocGetDownloadPolicyRequest,
DocGetExactRequest, DocGetManyRequest, DocImportFileRequest, DocImportProgress,
DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest,
DocSetHashRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, DocStatusRequest,
DocSubscribeRequest, DocTicket, DownloadProgress, ListTagsRequest, ListTagsResponse,
NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, NodeStatusResponse, ProviderService,
SetTagOption, ShareMode, WrapOption,
};
use crate::sync_engine::SyncEvent;

@@ -388,6 +390,19 @@
Ok(stream.map_err(anyhow::Error::from))
}

/// Get the ranges of a blob that are valid in the local store.
pub async fn get_valid_ranges(&self, hash: Hash) -> Result<RangeSpec> {
let res = self
.rpc
.rpc(BlobGetLocalRangesRequest {
hash,
ranges: RangeSpecSeq::new(Some(RangeSpec::all())),
})
.await??;
let first = res.ranges.iter().next().expect("infinite iterator");
Ok(first.clone())
}
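
A hedged usage sketch for the new method; the `client.blobs` accessor is an assumption about the surrounding client API:

```rust
// Check which ranges of `hash` are already valid locally before deciding
// whether to fetch anything at all.
let valid = client.blobs.get_valid_ranges(hash).await?;
if valid.is_all() {
    println!("blob is fully available locally");
} else {
    println!("partially available: {:?}", valid);
}
```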

/// Download a blob from another node and add it to the local database.
pub async fn download(&self, req: BlobDownloadRequest) -> Result<BlobDownloadProgress> {
let stream = self.rpc.server_streaming(req).await?;
@@ -396,6 +411,17 @@
))
}

/// Download ranges of a blob from another node and add it to the local database.
pub async fn download_ranges(
&self,
req: BlobDownloadRangesRequest,
) -> Result<BlobDownloadProgress> {
let stream = self.rpc.server_streaming(req).await?;
Ok(BlobDownloadProgress::new(
stream.map_err(anyhow::Error::from),
))
}
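
And a sketch of driving `download_ranges`. `BlobDownloadRangesRequest` is defined elsewhere in this PR, so the struct literal below is hypothetical; it is assumed to carry the hash, the source node, and a `RangeSpecSeq`:

```rust
use bao_tree::{ChunkNum, ChunkRanges};
use futures::StreamExt;
use iroh_bytes::protocol::RangeSpecSeq;

// Hypothetical field names; the real struct is not visible in this diff.
let req = BlobDownloadRangesRequest {
    hash,
    peer: node_addr,
    ranges: RangeSpecSeq::from_ranges([ChunkRanges::from(ChunkNum(0)..ChunkNum(16))]),
};
let mut progress = client.blobs.download_ranges(req).await?;
while let Some(event) = progress.next().await {
    println!("{:?}", event?);
}
```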

/// List all complete blobs.
pub async fn list(&self) -> Result<impl Stream<Item = Result<BlobListResponse>>> {
let stream = self.rpc.server_streaming(BlobListRequest).await?;