Skip to content

Commit

Permalink
Implement it using a branch of bao-tree
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Feb 9, 2024
1 parent 7aeab36 commit 0d22f70
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 132 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ missing_debug_implementations = "warn"

[workspace.lints.clippy]
unused-async = "warn"

[patch.crates-io]
bao-tree = { git = "https://github.com/n0-computer/bao-tree", branch = "validate-data-ranges" }
1 change: 0 additions & 1 deletion iroh-bytes/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime};

use crate::{BlobFormat, Hash, HashAndFormat};

pub mod bao;
pub mod io;
pub mod progress;

Expand Down
97 changes: 0 additions & 97 deletions iroh-bytes/src/util/bao.rs

This file was deleted.

2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.9.1", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.9.1", features = ["tokio_fsm", "validate"], default-features = false }
bytes = "1"
data-encoding = "2.4.0"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] }
Expand Down
30 changes: 22 additions & 8 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use futures::stream::BoxStream;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use iroh_bytes::export::ExportProgress;
use iroh_bytes::format::collection::Collection;
use iroh_bytes::protocol::{RangeSpec, RangeSpecSeq};
use iroh_bytes::provider::AddProgress;
use iroh_bytes::store::ValidateProgress;
use iroh_bytes::Hash;
Expand All @@ -37,14 +38,14 @@ use tracing::warn;
use crate::rpc_protocol::{
AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest,
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CounterStats,
CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocCloseRequest,
DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocExportFileRequest,
DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest,
DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest,
DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
BlobGetCollectionResponse, BlobGetLocalRangesRequest, BlobListCollectionsRequest,
BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CounterStats, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest,
DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest,
DocExportFileRequest, DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest,
DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest,
DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
Expand Down Expand Up @@ -388,6 +389,19 @@ where
Ok(stream.map_err(anyhow::Error::from))
}

/// Get the local ranges of a blob.
pub async fn get_valid_ranges(&self, hash: Hash) -> Result<RangeSpec> {
let res = self
.rpc
.rpc(BlobGetLocalRangesRequest {
hash,
ranges: RangeSpecSeq::new(Some(RangeSpec::all())),
})
.await??;
let first = res.ranges.iter().next().expect("infinite iterator");
Ok(first.clone())
}

/// Download a blob from another node and add it to the local database.
pub async fn download(&self, req: BlobDownloadRequest) -> Result<BlobDownloadProgress> {
let stream = self.rpc.server_streaming(req).await?;
Expand Down
14 changes: 14 additions & 0 deletions iroh/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ pub enum BlobCommands {
#[clap(long, default_value_t = false)]
repair: bool,
},
/// Get local ranges for a blob.
LocalRanges {
/// Hash of the blob to get ranges for.
#[clap(long)]
hash: Hash,
},
/// Delete content on the node.
#[clap(subcommand)]
Delete(DeleteCommands),
Expand Down Expand Up @@ -295,6 +301,14 @@ impl BlobCommands {
}
Ok(())
}
Self::LocalRanges { hash } => {
let ranges = iroh.blobs.get_valid_ranges(hash).await?;
let chunk_ranges = ranges.to_chunk_ranges();
for range in chunk_ranges.iter() {
println!("{range:?}");
}
Ok(())
}
}
}
}
Expand Down
44 changes: 22 additions & 22 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::fmt::Debug;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::ops::Range;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
Expand All @@ -18,8 +17,7 @@ use std::time::Duration;

use anyhow::{anyhow, bail, Context, Result};
use bao_tree::io::fsm::Outboard;
use bao_tree::io::outboard;
use bao_tree::{ChunkNum, ChunkRanges};
use bao_tree::ChunkRanges;
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use genawaiter::sync::{Co, Gen};
Expand All @@ -34,7 +32,6 @@ use iroh_bytes::store::{
ExportMode, GcMarkEvent, GcSweepEvent, ImportProgress, Map, MapEntry, PossiblyPartialEntry,
ReadableStore, Store as BaoStore, ValidateProgress,
};
use iroh_bytes::util::bao::compute_valid_ranges;
use iroh_bytes::util::progress::{FlumeProgressSender, IdGenerator, ProgressSender};
use iroh_bytes::{protocol::Closed, provider::AddProgress, BlobFormat, Hash, HashAndFormat};
use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
Expand Down Expand Up @@ -792,8 +789,6 @@ impl<D: BaoStore> RpcHandler<D> {
}

async fn blob_list_impl(self, co: &Co<RpcResult<BlobListResponse>>) -> io::Result<()> {
use bao_tree::io::fsm::Outboard;

let db = self.inner.db.clone();
for blob in db.blobs()? {
let blob = blob?;
Expand Down Expand Up @@ -1316,21 +1311,26 @@ impl<D: BaoStore> RpcHandler<D> {
self,
msg: BlobGetLocalRangesRequest,
) -> RpcResult<BlobGetLocalRangesResponse> {
let mut result = Vec::new();
let mut iter = msg.ranges.iter();
let Some(root_query_ranges) = iter.next() else {
return Ok(BlobGetLocalRangesResponse {
ranges: RangeSpecSeq::empty(),
});
};
let root_result_ranges =
Self::get_local_ranges(&self.inner.db, msg.hash, root_query_ranges).await?;
result.push(root_result_ranges);
if !iter.is_at_end() {
return Err(anyhow::anyhow!("only root range query supported for now").into());
}
let ranges = RangeSpecSeq::new(result);
Ok(BlobGetLocalRangesResponse { ranges })
self.rt()
.spawn_pinned(move || async move {
let mut result = Vec::new();
let mut iter = msg.ranges.iter();
let Some(root_query_ranges) = iter.next() else {
return Ok(BlobGetLocalRangesResponse {
ranges: RangeSpecSeq::empty(),
});
};
let root_result_ranges =
Self::get_local_ranges(&self.inner.db, msg.hash, root_query_ranges).await?;
result.push(root_result_ranges);
if !iter.is_at_end() {
return Err(anyhow::anyhow!("only root range query supported for now").into());
}
let ranges = RangeSpecSeq::new(result);
Ok(BlobGetLocalRangesResponse { ranges })
})
.await
.expect("local task failed")
}
async fn get_local_ranges(
Expand All @@ -1345,7 +1345,7 @@ impl<D: BaoStore> RpcHandler<D> {
let outboard = entry.outboard().await?;
let mut res = ChunkRanges::empty();
let query_ranges = query_ranges.to_chunk_ranges();
let mut stream = compute_valid_ranges(data_reader, outboard, &query_ranges);
let mut stream = bao_tree::io::fsm::valid_file_ranges(outboard, data_reader, &query_ranges);
while let Some(range) = stream.next().await {
res |= ChunkRanges::from(range?);
}
Expand Down

0 comments on commit 0d22f70

Please sign in to comment.