From 22d810565e240b58c1c697c8a6eb6c668608d57f Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 9 Feb 2024 11:37:55 +0200 Subject: [PATCH] Implement it using a branch of bao-tree --- Cargo.lock | 6 +-- Cargo.toml | 3 ++ iroh-bytes/src/util.rs | 1 - iroh-bytes/src/util/bao.rs | 97 -------------------------------------- iroh/Cargo.toml | 2 +- iroh/src/client.rs | 23 ++++----- iroh/src/commands/blob.rs | 14 ++++++ iroh/src/node.rs | 44 ++++++++--------- 8 files changed, 52 insertions(+), 138 deletions(-) delete mode 100644 iroh-bytes/src/util/bao.rs diff --git a/Cargo.lock b/Cargo.lock index 3725d65b50..af469fa59b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -312,11 +312,11 @@ dependencies = [ [[package]] name = "bao-tree" version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "155e7e0c896695a9049badd7bf2b915d29230e24dc82a7c7ef065eded072404f" +source = "git+https://github.com/n0-computer/bao-tree?branch=validate-data-ranges#8ed97d41ee47467a4d7ecc17886a33aaf024a1cb" dependencies = [ "bytes", "futures", + "genawaiter", "iroh-blake3", "iroh-io", "positioned-io", @@ -2029,7 +2029,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core 0.51.1", + "windows-core 0.52.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 751f092c90..89103dd8b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,3 +30,6 @@ missing_debug_implementations = "warn" [workspace.lints.clippy] unused-async = "warn" + +[patch.crates-io] +bao-tree = { git = "https://github.com/n0-computer/bao-tree", branch = "validate-data-ranges" } diff --git a/iroh-bytes/src/util.rs b/iroh-bytes/src/util.rs index 716d763258..b44796607a 100644 --- a/iroh-bytes/src/util.rs +++ b/iroh-bytes/src/util.rs @@ -8,7 +8,6 @@ use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime}; use crate::{BlobFormat, Hash, HashAndFormat}; -pub mod bao; pub mod io; pub mod progress; diff --git a/iroh-bytes/src/util/bao.rs b/iroh-bytes/src/util/bao.rs deleted file mode 100644 index 8f5a7bd436..0000000000 --- a/iroh-bytes/src/util/bao.rs +++ /dev/null @@ -1,97 +0,0 @@ -//! Utilities that really belong into bao-tree but have not made it there yet. -use std::{io, ops::Range}; - -use bao_tree::{io::fsm::Outboard, ChunkNum, ChunkRangesRef}; -use futures::Stream; -use iroh_io::AsyncSliceReader; - -/// Given a data file and an outboard, compute the valid ranges of the data file. -pub fn compute_valid_ranges( - mut data: D, - mut outboard: O, - ranges: &ChunkRangesRef, -) -> impl Stream>> -where - D: AsyncSliceReader, - O: Outboard, -{ - futures::stream::empty() - // // buffer for writing incomplete subtrees. - // // for queries that don't have incomplete subtrees, this will never be used. - // let mut out_buf = Vec::new(); - // let mut stack = SmallVec::<[blake3::Hash; 10]>::new(); - // stack.push(outboard.root()); - // let mut encoded = encoded; - // let tree = outboard.tree(); - // let ranges = truncate_ranges(ranges, tree.size()); - // // write header - // encoded.write(tree.size.0.to_le_bytes().as_slice()).await?; - // for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) { - // match item { - // BaoChunk::Parent { - // is_root, - // left, - // right, - // node, - // .. - // } => { - // let (l_hash, r_hash) = outboard.load(node).await?.unwrap(); - // let actual = parent_cv(&l_hash, &r_hash, is_root); - // let expected = stack.pop().unwrap(); - // if actual != expected { - // return Err(EncodeError::ParentHashMismatch(node)); - // } - // if right { - // stack.push(r_hash); - // } - // if left { - // stack.push(l_hash); - // } - // let pair = combine_hash_pair(&l_hash, &r_hash); - // encoded - // .write(&pair) - // .await - // .map_err(|e| EncodeError::maybe_parent_write(e, node))?; - // } - // BaoChunk::Leaf { - // start_chunk, - // size, - // is_root, - // ranges, - // .. - // } => { - // let expected = stack.pop().unwrap(); - // let start = start_chunk.to_bytes(); - // let bytes = data.read_at(start.0, size).await?; - // let (actual, to_write) = if !ranges.is_all() { - // // we need to encode just a part of the data - // // - // // write into an out buffer to ensure we detect mismatches - // // before writing to the output. - // out_buf.clear(); - // let actual = encode_selected_rec( - // start_chunk, - // &bytes, - // is_root, - // ranges, - // tree.block_size.to_u32(), - // true, - // &mut out_buf, - // ); - // (actual, &out_buf[..]) - // } else { - // let actual = hash_subtree(start_chunk.0, &bytes, is_root); - // (actual, &bytes[..]) - // }; - // if actual != expected { - // return Err(EncodeError::LeafHashMismatch(start_chunk)); - // } - // encoded - // .write(to_write) - // .await - // .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?; - // } - // } - // } - // Ok(()) -} diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 3c75d45431..ea0e028152 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -18,7 +18,7 @@ workspace = true [dependencies] anyhow = { version = "1" } -bao-tree = { version = "0.9.1", features = ["tokio_fsm"], default-features = false } +bao-tree = { version = "0.9.1", features = ["tokio_fsm", "validate"], default-features = false } bytes = "1" data-encoding = "2.4.0" derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 661dd8c62c..9106986863 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -17,6 +17,7 @@ use futures::stream::BoxStream; use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; use iroh_bytes::export::ExportProgress; use iroh_bytes::format::collection::Collection; +use iroh_bytes::protocol::{RangeSpec, RangeSpecSeq}; use iroh_bytes::provider::AddProgress; use iroh_bytes::store::ValidateProgress; use iroh_bytes::Hash; @@ -35,20 +36,7 @@ use tokio_util::io::{ReaderStream, StreamReader}; use tracing::warn; use crate::rpc_protocol::{ - AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest, - BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest, - BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse, - BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse, - BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CounterStats, - CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocCloseRequest, - DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocExportFileRequest, - DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest, - DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest, - DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest, - DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress, - ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse, - NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, - NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption, + AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest, BlobGetCollectionResponse, BlobGetLocalRangesRequest, BlobListCollectionsRequest, BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CounterStats, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocExportFileRequest, DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption }; use crate::sync_engine::SyncEvent; @@ -388,6 +376,13 @@ where Ok(stream.map_err(anyhow::Error::from)) } + /// Get the local ranges of a blob. + pub async fn get_valid_ranges(&self, hash: Hash) -> Result { + let res = self.rpc.rpc(BlobGetLocalRangesRequest { hash, ranges: RangeSpecSeq::new(Some(RangeSpec::all())) }).await??; + let first = res.ranges.iter().next().expect("infinite iterator"); + Ok(first.clone()) + } + /// Download a blob from another node and add it to the local database. pub async fn download(&self, req: BlobDownloadRequest) -> Result { let stream = self.rpc.server_streaming(req).await?; diff --git a/iroh/src/commands/blob.rs b/iroh/src/commands/blob.rs index 8a4a34326a..fca5dcfc49 100644 --- a/iroh/src/commands/blob.rs +++ b/iroh/src/commands/blob.rs @@ -91,6 +91,12 @@ pub enum BlobCommands { #[clap(long, default_value_t = false)] repair: bool, }, + /// Get local ranges for a blob. + LocalRanges { + /// Hash of the blob to get ranges for. + #[clap(long)] + hash: Hash, + }, /// Delete content on the node. #[clap(subcommand)] Delete(DeleteCommands), @@ -295,6 +301,14 @@ impl BlobCommands { } Ok(()) } + Self::LocalRanges { hash } => { + let ranges = iroh.blobs.get_valid_ranges(hash).await?; + let chunk_ranges = ranges.to_chunk_ranges(); + for range in chunk_ranges.iter() { + println!("{range:?}"); + } + Ok(()) + } } } } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index b32f41a102..6371364c92 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -9,7 +9,6 @@ use std::fmt::Debug; use std::future::Future; use std::io; use std::net::SocketAddr; -use std::ops::Range; use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, Mutex}; @@ -18,8 +17,7 @@ use std::time::Duration; use anyhow::{anyhow, bail, Context, Result}; use bao_tree::io::fsm::Outboard; -use bao_tree::io::outboard; -use bao_tree::{ChunkNum, ChunkRanges}; +use bao_tree::ChunkRanges; use futures::future::{BoxFuture, Shared}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; use genawaiter::sync::{Co, Gen}; @@ -34,7 +32,6 @@ use iroh_bytes::store::{ ExportMode, GcMarkEvent, GcSweepEvent, ImportProgress, Map, MapEntry, PossiblyPartialEntry, ReadableStore, Store as BaoStore, ValidateProgress, }; -use iroh_bytes::util::bao::compute_valid_ranges; use iroh_bytes::util::progress::{FlumeProgressSender, IdGenerator, ProgressSender}; use iroh_bytes::{protocol::Closed, provider::AddProgress, BlobFormat, Hash, HashAndFormat}; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; @@ -792,8 +789,6 @@ impl RpcHandler { } async fn blob_list_impl(self, co: &Co>) -> io::Result<()> { - use bao_tree::io::fsm::Outboard; - let db = self.inner.db.clone(); for blob in db.blobs()? { let blob = blob?; @@ -1316,21 +1311,26 @@ impl RpcHandler { self, msg: BlobGetLocalRangesRequest, ) -> RpcResult { - let mut result = Vec::new(); - let mut iter = msg.ranges.iter(); - let Some(root_query_ranges) = iter.next() else { - return Ok(BlobGetLocalRangesResponse { - ranges: RangeSpecSeq::empty(), - }); - }; - let root_result_ranges = - Self::get_local_ranges(&self.inner.db, msg.hash, root_query_ranges).await?; - result.push(root_result_ranges); - if !iter.is_at_end() { - return Err(anyhow::anyhow!("only root range query supported for now").into()); - } - let ranges = RangeSpecSeq::new(result); - Ok(BlobGetLocalRangesResponse { ranges }) + self.rt() + .spawn_pinned(move || async move { + let mut result = Vec::new(); + let mut iter = msg.ranges.iter(); + let Some(root_query_ranges) = iter.next() else { + return Ok(BlobGetLocalRangesResponse { + ranges: RangeSpecSeq::empty(), + }); + }; + let root_result_ranges = + Self::get_local_ranges(&self.inner.db, msg.hash, root_query_ranges).await?; + result.push(root_result_ranges); + if !iter.is_at_end() { + return Err(anyhow::anyhow!("only root range query supported for now").into()); + } + let ranges = RangeSpecSeq::new(result); + Ok(BlobGetLocalRangesResponse { ranges }) + }) + .await + .expect("local task failed") } async fn get_local_ranges( @@ -1345,7 +1345,7 @@ impl RpcHandler { let outboard = entry.outboard().await?; let mut res = ChunkRanges::empty(); let query_ranges = query_ranges.to_chunk_ranges(); - let mut stream = compute_valid_ranges(data_reader, outboard, &query_ranges); + let mut stream = bao_tree::io::fsm::valid_file_ranges(outboard, data_reader, &query_ranges); while let Some(range) = stream.next().await { res |= ChunkRanges::from(range?); }