# refactor(iroh-bytes): Update bao-tree to 0.12 and adjust code (#2153)
## Description

refactor(iroh-bytes): Update bao-tree to 0.12 and adjust code

bao-tree 0.12 brings some API changes that our code has to adapt to. Outboard creation
is easier, and the `ByteNum` newtype is gone, replaced by plain `u64`.
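
For a concrete sense of the `ByteNum` removal, here is a small before/after sketch. It is illustrative only and sticks to 0.12 calls that actually appear in this diff (`BlockSize::from_chunk_log`, `ChunkNum::full_chunks`); the 0.11 forms are kept in comments:

```rust
use bao_tree::{BlockSize, ChunkNum, ChunkRanges};

// 0.11: pub const IROH_BLOCK_SIZE: BlockSize = BlockSize(4);
// 0.12: the tuple-struct constructor is replaced by a const fn.
pub const IROH_BLOCK_SIZE: BlockSize = BlockSize::from_chunk_log(4);

/// Chunk ranges fully covered by `size` bytes of data.
fn covered_ranges(size: u64) -> ChunkRanges {
    // 0.11: ChunkRanges::from(..ByteNum(size).full_chunks())
    // 0.12: sizes are plain u64; full_chunks is an associated fn on ChunkNum.
    ChunkRanges::from(..ChunkNum::full_chunks(size))
}
```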

## Notes & open questions

~~There is still a test failure. Hopefully nothing major, but hence the draft status.~~

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
rklaehn committed Apr 8, 2024
1 parent 13e83f3 commit bfb7560
Showing 19 changed files with 136 additions and 187 deletions.
19 changes: 16 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion iroh-base/Cargo.toml
```diff
@@ -16,7 +16,7 @@ workspace = true
 
 [dependencies]
 anyhow = { version = "1" }
-bao-tree = { version = "0.11.1", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
+bao-tree = { version = "0.12", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
 data-encoding = { version = "2.3.3", optional = true }
 hex = "0.4.3"
 multibase = { version = "0.9.1", optional = true }
```
2 changes: 1 addition & 1 deletion iroh-bytes/Cargo.toml
```diff
@@ -17,7 +17,7 @@ workspace = true
 
 [dependencies]
 anyhow = { version = "1" }
-bao-tree = { version = "0.11.1", features = ["tokio_fsm"], default-features = false }
+bao-tree = { version = "0.12", features = ["tokio_fsm"], default-features = false }
 bytes = { version = "1.4", features = ["serde"] }
 chrono = "0.4.31"
 data-encoding = "2.3.3"
```
102 changes: 46 additions & 56 deletions iroh-bytes/src/get.rs
```diff
@@ -66,18 +66,12 @@ pub mod fsm {
     use super::*;
 
     use bao_tree::{
-        io::{
-            fsm::{
-                OutboardMut, ResponseDecoderReading, ResponseDecoderReadingNext,
-                ResponseDecoderStart,
-            },
-            StartDecodeError,
-        },
-        ChunkRanges, TreeNode,
+        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
+        BaoTree, ChunkRanges, TreeNode,
     };
     use derive_more::From;
     use iroh_io::AsyncSliceWriter;
-    use tokio::io::AsyncWriteExt;
+    use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
     self_cell::self_cell! {
         struct RangesIterInner {
@@ -331,15 +325,11 @@ pub mod fsm {
         ///
         /// This requires passing in the hash of the child for validation
         pub fn next(self, hash: Hash) -> AtBlobHeader {
-            let stream = ResponseDecoderStart::<TrackingReader<RecvStream>>::new(
-                hash.into(),
-                self.ranges,
-                IROH_BLOCK_SIZE,
-                self.reader,
-            );
             AtBlobHeader {
-                stream,
+                reader: self.reader,
+                ranges: self.ranges,
                 misc: self.misc,
+                hash,
             }
         }
 
@@ -368,14 +358,10 @@ pub mod fsm {
         ///
         /// For the collection we already know the hash, since it was part of the request
         pub fn next(self) -> AtBlobHeader {
-            let stream = ResponseDecoderStart::new(
-                self.hash.into(),
-                self.ranges,
-                IROH_BLOCK_SIZE,
-                self.reader,
-            );
             AtBlobHeader {
-                stream,
+                reader: self.reader,
+                ranges: self.ranges,
+                hash: self.hash,
                 misc: self.misc,
             }
         }
@@ -389,8 +375,10 @@
     /// State before reading a size header
     #[derive(Debug)]
     pub struct AtBlobHeader {
-        stream: ResponseDecoderStart<TrackingReader<RecvStream>>,
+        ranges: ChunkRanges,
+        reader: TrackingReader<quinn::RecvStream>,
         misc: Box<Misc>,
+        hash: Hash,
     }
 
     /// Error that you can get from [`AtBlobHeader::next`]
@@ -423,30 +411,32 @@
 
     impl AtBlobHeader {
         /// Read the size header, returning it and going into the `Content` state.
-        pub async fn next(self) -> Result<(AtBlobContent, u64), AtBlobHeaderNextError> {
-            match self.stream.next().await {
-                Ok((stream, size)) => Ok((
-                    AtBlobContent {
-                        stream,
-                        misc: self.misc,
-                    },
-                    size,
-                )),
-                Err(cause) => Err(match cause {
-                    StartDecodeError::NotFound => AtBlobHeaderNextError::NotFound,
-                    StartDecodeError::Io(cause) => {
-                        if let Some(inner) = cause.get_ref() {
-                            if let Some(e) = inner.downcast_ref::<quinn::ReadError>() {
-                                AtBlobHeaderNextError::Read(e.clone())
-                            } else {
-                                AtBlobHeaderNextError::Io(cause)
-                            }
-                        } else {
-                            AtBlobHeaderNextError::Io(cause)
-                        }
-                    }
-                }),
-            }
+        pub async fn next(mut self) -> Result<(AtBlobContent, u64), AtBlobHeaderNextError> {
+            let size = self.reader.read_u64_le().await.map_err(|cause| {
+                if cause.kind() == io::ErrorKind::UnexpectedEof {
+                    AtBlobHeaderNextError::NotFound
+                } else if let Some(e) = cause
+                    .get_ref()
+                    .and_then(|x| x.downcast_ref::<quinn::ReadError>())
+                {
+                    AtBlobHeaderNextError::Read(e.clone())
+                } else {
+                    AtBlobHeaderNextError::Io(cause)
+                }
+            })?;
+            let stream = ResponseDecoder::new(
+                self.hash.into(),
+                self.ranges,
+                BaoTree::new(size, IROH_BLOCK_SIZE),
+                self.reader,
+            );
+            Ok((
+                AtBlobContent {
+                    stream,
+                    misc: self.misc,
+                },
+                size,
+            ))
         }
 
         /// Drain the response and throw away the result
@@ -506,12 +496,12 @@
 
         /// The hash of the blob we are reading.
         pub fn hash(&self) -> Hash {
-            (*self.stream.hash()).into()
+            self.hash
         }
 
         /// The ranges we have requested for the current hash.
         pub fn ranges(&self) -> &ChunkRanges {
-            self.stream.ranges()
+            &self.ranges
         }
 
         /// The current offset of the blob we are reading.
@@ -523,7 +513,7 @@
     /// State while we are reading content
     #[derive(Debug)]
     pub struct AtBlobContent {
-        stream: ResponseDecoderReading<TrackingReader<RecvStream>>,
+        stream: ResponseDecoder<TrackingReader<RecvStream>>,
         misc: Box<Misc>,
     }
 
@@ -634,12 +624,12 @@
         /// Read the next item, either content, an error, or the end of the blob
         pub async fn next(self) -> BlobContentNext {
             match self.stream.next().await {
-                ResponseDecoderReadingNext::More((stream, res)) => {
+                ResponseDecoderNext::More((stream, res)) => {
                     let next = Self { stream, ..self };
                     let res = res.map_err(DecodeError::from);
                     BlobContentNext::More((next, res))
                 }
-                ResponseDecoderReadingNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
+                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
                     stream,
                     misc: self.misc,
                 }),
@@ -708,7 +698,7 @@
             let mut writer = writer;
             let mut buf = Vec::new();
             let mut content = self;
-            let size = content.tree().size().0;
+            let size = content.tree().size();
             loop {
                 match content.next().await {
                     BlobContentNext::More((next, item)) => {
@@ -758,7 +748,7 @@
                         }
                     }
                     BaoContentItem::Leaf(leaf) => {
-                        data.write_bytes_at(leaf.offset.0, leaf.data).await?;
+                        data.write_bytes_at(leaf.offset, leaf.data).await?;
                     }
                 }
             }
@@ -782,7 +772,7 @@
             match item? {
                 BaoContentItem::Parent(_) => {}
                 BaoContentItem::Leaf(leaf) => {
-                    data.write_bytes_at(leaf.offset.0, leaf.data).await?;
+                    data.write_bytes_at(leaf.offset, leaf.data).await?;
                 }
             }
         }
```
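
One behavioral detail of the new `AtBlobHeader::next` above: `ResponseDecoderStart` used to signal a missing blob via `StartDecodeError::NotFound`; now the caller reads the size header itself and maps a clean EOF to `NotFound`. A self-contained sketch of just that mapping (`HeaderError` is a hypothetical stand-in for `AtBlobHeaderNextError`, omitting the `quinn::ReadError` special case):

```rust
use std::io;

use tokio::io::{AsyncRead, AsyncReadExt};

// Hypothetical stand-in for AtBlobHeaderNextError.
#[derive(Debug)]
enum HeaderError {
    /// The provider closed the stream instead of sending a size header.
    NotFound,
    /// Any other I/O failure.
    Io(io::Error),
}

// Read the little-endian u64 size header, treating EOF as "blob not found".
async fn read_size_header<R: AsyncRead + Unpin>(reader: &mut R) -> Result<u64, HeaderError> {
    reader.read_u64_le().await.map_err(|cause| {
        if cause.kind() == io::ErrorKind::UnexpectedEof {
            HeaderError::NotFound
        } else {
            HeaderError::Io(cause)
        }
    })
}

#[tokio::main]
async fn main() {
    // In-memory stand-ins for the network stream.
    let mut present: &[u8] = &42u64.to_le_bytes();
    assert_eq!(read_size_header(&mut present).await.unwrap(), 42);

    let mut missing: &[u8] = &[]; // peer closed the stream immediately
    assert!(matches!(
        read_size_header(&mut missing).await,
        Err(HeaderError::NotFound)
    ));
}
```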
6 changes: 3 additions & 3 deletions iroh-bytes/src/get/db.rs
```diff
@@ -1,5 +1,5 @@
 //! Functions that use the iroh-bytes protocol in conjunction with a bao store.
 
+use bao_tree::ChunkNum;
 use futures::{Future, StreamExt};
 use iroh_base::hash::Hash;
 use iroh_base::rpc::RpcError;
@@ -26,7 +26,7 @@ use crate::{
     BlobFormat, HashAndFormat,
 };
 use anyhow::anyhow;
-use bao_tree::{ByteNum, ChunkRanges};
+use bao_tree::ChunkRanges;
 use iroh_io::AsyncSliceReader;
 use tracing::trace;
 
@@ -146,7 +146,7 @@ pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<Chun
     // compute the valid range from just looking at the data file
     let mut data_reader = entry.data_reader().await?;
     let data_size = data_reader.len().await?;
-    let valid_from_data = ChunkRanges::from(..ByteNum(data_size).full_chunks());
+    let valid_from_data = ChunkRanges::from(..ChunkNum::full_chunks(data_size));
     // compute the valid range from just looking at the outboard file
     let mut outboard = entry.outboard().await?;
     let all = ChunkRanges::all();
```
6 changes: 3 additions & 3 deletions iroh-bytes/src/get/request.rs
```diff
@@ -6,7 +6,7 @@ use crate::{
     protocol::{GetRequest, RangeSpecSeq},
     Hash, HashAndFormat,
 };
-use bao_tree::{ByteNum, ChunkNum, ChunkRanges};
+use bao_tree::{ChunkNum, ChunkRanges};
 use bytes::Bytes;
 use rand::Rng;
 
@@ -179,14 +179,14 @@ pub async fn get_chunk_probe(
 pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq {
     let total_chunks = sizes
         .iter()
-        .map(|size| ByteNum(*size).full_chunks().0)
+        .map(|size| ChunkNum::full_chunks(*size).0)
         .sum::<u64>();
     let random_chunk = rng.gen_range(0..total_chunks);
     let mut remaining = random_chunk;
     let mut ranges = vec![];
     ranges.push(ChunkRanges::empty());
     for size in sizes.iter() {
-        let chunks = ByteNum(*size).full_chunks().0;
+        let chunks = ChunkNum::full_chunks(*size).0;
         if remaining < chunks {
             ranges.push(ChunkRanges::from(
                 ChunkNum(remaining)..ChunkNum(remaining + 1),
```
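
For intuition on the chunk accounting in `random_hash_seq_ranges` above: `full_chunks` counts only complete 1024-byte chunks, so a partial trailing chunk is never picked as a probe target. A standalone illustration in plain `u64` arithmetic (the local `full_chunks` is assumed to mirror `ChunkNum::full_chunks`, i.e. floor division by the chunk size):

```rust
/// Complete 1024-byte chunks in `size` bytes (assumed to mirror
/// ChunkNum::full_chunks: floor division by the 1024-byte chunk size).
fn full_chunks(size: u64) -> u64 {
    size / 1024
}

fn main() {
    let sizes = [2048u64, 3000];
    let total: u64 = sizes.iter().map(|s| full_chunks(*s)).sum();
    // 2048 bytes -> 2 full chunks; 3000 bytes -> 2 full chunks, because the
    // 952-byte tail is not a complete chunk. A random probe picks 1 of 4.
    assert_eq!(total, 4);
}
```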
2 changes: 1 addition & 1 deletion iroh-bytes/src/lib.rs
```diff
@@ -22,4 +22,4 @@ pub use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
 use bao_tree::BlockSize;
 
 /// Block size used by iroh, 2^4*1024 = 16KiB
-pub const IROH_BLOCK_SIZE: BlockSize = BlockSize(4);
+pub const IROH_BLOCK_SIZE: BlockSize = BlockSize::from_chunk_log(4);
```
2 changes: 1 addition & 1 deletion iroh-bytes/src/provider.rs
```diff
@@ -495,7 +495,7 @@ pub async fn send_blob<D: Map, W: AsyncStreamWriter>(
     match db.get(&name).await? {
         Some(entry) => {
             let outboard = entry.outboard().await?;
-            let size = outboard.tree().size().0;
+            let size = outboard.tree().size();
             let mut file_reader = TrackingSliceReader::new(entry.data_reader().await?);
             let res = encode_ranges_validated(
                 &mut file_reader,
```