Skip to content

Commit f00cf15

Browse files
authored
Merge 51f5707 into 2337e46
2 parents 2337e46 + 51f5707 commit f00cf15

File tree

1 file changed

+105
-2
lines changed

1 file changed

+105
-2
lines changed

src/store/traits.rs

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@ use std::{collections::BTreeSet, future::Future, io, path::PathBuf, time::Durati
33

44
pub use bao_tree;
55
use bao_tree::{
6-
io::fsm::{BaoContentItem, Outboard},
6+
io::{
7+
fsm::{
8+
encode_ranges_validated, BaoContentItem, Outboard, ResponseDecoder, ResponseDecoderNext,
9+
},
10+
DecodeError,
11+
},
712
BaoTree, ChunkRanges,
813
};
914
use bytes::Bytes;
1015
use futures_lite::{Stream, StreamExt};
1116
use genawaiter::rc::{Co, Gen};
12-
use iroh_io::AsyncSliceReader;
17+
use iroh_io::{AsyncSliceReader, AsyncStreamReader, AsyncStreamWriter};
1318
pub use range_collections;
1419
use serde::{Deserialize, Serialize};
1520
use tokio::io::AsyncRead;
@@ -90,6 +95,31 @@ pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
9095
fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
9196
/// A future that resolves to a reader that can be used to read the data
9297
fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;
98+
99+
/// Encodes data and outboard into a [`AsyncStreamWriter`].
100+
///
101+
/// Data and outboard parts will be interleaved.
102+
///
103+
/// `offset` is the byte offset in the blob to start the stream from. It will be rounded down to
104+
/// the next chunk group.
105+
///
106+
/// Returns immediately without error if `start` is equal or larger than the entry's size.
107+
fn write_verifiable_stream<'a>(
108+
&'a self,
109+
offset: u64,
110+
writer: impl AsyncStreamWriter + 'a,
111+
) -> impl Future<Output = io::Result<()>> + 'a {
112+
async move {
113+
let size = self.size().value();
114+
if offset >= size {
115+
return Ok(());
116+
}
117+
let ranges = range_from_offset_and_length(offset, size - offset);
118+
let (outboard, data) = tokio::try_join!(self.outboard(), self.data_reader())?;
119+
encode_ranges_validated(data, outboard, &ranges, writer).await?;
120+
Ok(())
121+
}
122+
}
93123
}
94124

95125
/// A generic map from hashes to bao blobs (blobs with bao outboards).
@@ -341,6 +371,74 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
341371
self.import_stream(stream, format, progress)
342372
}
343373

374+
/// Import a blob from a verified stream, as emitted by [`MapEntry::write_verifiable_stream`];
375+
///
376+
/// `total_size` is the total size of the blob as reported by the remote.
377+
/// `offset` is the byte offset in the blob where the stream starts. It will be rounded
378+
/// to the next chunk group.
379+
fn import_verifiable_stream<'a>(
380+
&'a self,
381+
hash: Hash,
382+
total_size: u64,
383+
offset: u64,
384+
reader: impl AsyncStreamReader + 'a,
385+
) -> impl Future<Output = io::Result<()>> + 'a {
386+
async move {
387+
if offset >= total_size {
388+
return Err(io::Error::new(
389+
io::ErrorKind::InvalidInput,
390+
"offset must not be greater than total_size",
391+
));
392+
}
393+
let entry = self.get_or_create(hash, total_size).await?;
394+
let mut bw = entry.batch_writer().await?;
395+
396+
let ranges = range_from_offset_and_length(offset, total_size - offset);
397+
let mut decoder = ResponseDecoder::new(
398+
hash.into(),
399+
ranges,
400+
BaoTree::new(total_size, IROH_BLOCK_SIZE),
401+
reader,
402+
);
403+
let size = decoder.tree().size();
404+
let mut buf = Vec::new();
405+
let is_complete = loop {
406+
decoder = match decoder.next().await {
407+
ResponseDecoderNext::More((decoder, item)) => {
408+
let item = match item {
409+
Err(DecodeError::LeafNotFound(_) | DecodeError::ParentNotFound(_)) => {
410+
break false
411+
}
412+
Err(err) => return Err(err.into()),
413+
Ok(item) => item,
414+
};
415+
match &item {
416+
BaoContentItem::Parent(_) => {
417+
buf.push(item);
418+
}
419+
BaoContentItem::Leaf(_) => {
420+
buf.push(item);
421+
let batch = std::mem::take(&mut buf);
422+
bw.write_batch(size, batch).await?;
423+
}
424+
}
425+
decoder
426+
}
427+
ResponseDecoderNext::Done(_reader) => {
428+
debug_assert!(buf.is_empty(), "last node of bao tree must be leaf node");
429+
break true;
430+
}
431+
};
432+
};
433+
bw.sync().await?;
434+
drop(bw);
435+
if is_complete {
436+
self.insert_complete(entry).await?;
437+
}
438+
Ok(())
439+
}
440+
}
441+
344442
/// Set a tag
345443
fn set_tag(
346444
&self,
@@ -386,6 +484,11 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
386484
}
387485
}
388486

487+
/// Chunk ranges covering `length` bytes starting at `offset`, with the byte
/// range rounded out to whole chunk groups.
fn range_from_offset_and_length(offset: u64, length: u64) -> bao_tree::ChunkRanges {
    let byte_ranges = bao_tree::ByteRanges::from(offset..offset + length);
    bao_tree::io::round_up_to_chunks(&byte_ranges)
}
491+
389492
async fn validate_impl(
390493
store: &impl Store,
391494
repair: bool,

0 commit comments

Comments
 (0)