diff --git a/Cargo.lock b/Cargo.lock index 511959d..c704d5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,29 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "abao" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2daa0989489b05a455a9707adbbbc17443edf7bbc902ce499cd3b84148d68a40" -dependencies = [ - "arrayref", - "arrayvec", - "blake3", - "futures", - "tokio", -] - -[[package]] -name = "abao" -version = "0.2.0" -source = "git+https://github.com/n0-computer/abao?branch=post-order-outboard#9675e0426badc06fe728aea72d02892c91f5caa1" -dependencies = [ - "arrayref", - "arrayvec", - "blake3", -] - [[package]] name = "addr2line" version = "0.21.0" @@ -177,7 +154,6 @@ dependencies = [ name = "bao-tree" version = "0.11.1" dependencies = [ - "abao 0.2.0 (git+https://github.com/n0-computer/abao?branch=post-order-outboard)", "anyhow", "bao", "bytes", @@ -395,7 +371,6 @@ checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" name = "cli" version = "0.1.0" dependencies = [ - "abao 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "anyhow", "bao", "bao-tree", diff --git a/Cargo.toml b/Cargo.toml index 722def5..9c1994b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,6 @@ default = ["tokio_fsm", "validate"] hex = "0.4.3" bao = "0.12.1" tokio = { version = "1", features = ["full"] } -# abao with chunk group size 16 (abao default) -abao = { git = "https://github.com/n0-computer/abao", branch = "post-order-outboard", features = ["group_size_1k"], default_features = false } proptest = "1.0.0" rand = "0.8.5" criterion = "0.4.0" @@ -55,6 +53,9 @@ harness = false [workspace] members = ["cli"] +[package.metadata.docs.rs] +all-features = true + [[example]] name = "cli" required-features = ["tokio_fsm"] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 76f4a2b..18932a7 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -10,7 +10,6 @@ bao-tree = { path = "../" } clap = { version = "4", features = ["derive"] } anyhow = "1.0.72" bao = "0.12.1" -abao = "0.2.0" [[bin]] name = "bao-tree" diff --git a/cli/main.rs b/cli/main.rs index 0fc1191..b3ca7dd 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -1,7 +1,7 @@ use anyhow::Context; -use bao_tree::BlockSize; +use bao_tree::{BaoTree, BlockSize, ByteNum}; use clap::{Parser, Subcommand}; -use std::path::PathBuf; +use std::{io::Write, path::PathBuf}; #[derive(Parser, Debug, Clone)] #[clap(version)] @@ -47,9 +47,11 @@ fn main() -> anyhow::Result<()> { let source = std::fs::File::open(&path)?; let target = std::fs::File::create(out)?; let source = std::io::BufReader::with_capacity(1024 * 1024 * 16, source); - let target = std::io::BufWriter::with_capacity(1024 * 1024 * 16, target); + let mut target = std::io::BufWriter::with_capacity(1024 * 1024 * 16, target); let t0 = std::time::Instant::now(); - let hash = bao_tree::io::sync::outboard_post_order(source, size, bs, target)?; + let tree = BaoTree::new(ByteNum(size), bs); + let hash = bao_tree::io::sync::outboard_post_order(source, tree, &mut target)?; + target.write_all(size.to_le_bytes().as_ref())?; let dt = t0.elapsed(); let rate = size as f64 / dt.as_secs_f64(); println!("{}", hash); diff --git a/examples/cli.rs b/examples/cli.rs index 49baf6c..b020c42 100644 --- a/examples/cli.rs +++ b/examples/cli.rs @@ -179,16 +179,16 @@ fn parse_ranges(ranges: Vec) -> anyhow::Result> { } mod sync { - use std::io::{self, Cursor, Write}; + use std::io::{self, Cursor, Read, Write}; use bao_tree::{ io::{ outboard::PreOrderMemOutboard, round_up_to_chunks, - sync::{encode_ranges_validated, DecodeResponseItem, DecodeResponseIter, Outboard}, - Header, Leaf, Parent, + sync::{encode_ranges_validated, DecodeResponseIter, Outboard}, + BaoContentItem, Leaf, Parent, }, - BlockSize, ChunkRanges, + BaoTree, BlockSize, ByteNum, ChunkRanges, }; use positioned_io::WriteAt; @@ -211,16 +211,17 @@ mod sync { block_size: BlockSize, v: bool, ) -> io::Result<()> { - let iter = - DecodeResponseIter::new(msg.hash, block_size, Cursor::new(&msg.encoded), &msg.ranges); + let mut reader = Cursor::new(&msg.encoded); + let mut size = [0; 8]; + reader.read_exact(&mut size)?; + let size = ByteNum(u64::from_le_bytes(size)); + let tree = BaoTree::new(size, block_size); + let iter = DecodeResponseIter::new(msg.hash, tree, Cursor::new(&msg.encoded), &msg.ranges); let mut indent = 0; + target.set_len(size.0)?; for response in iter { match response? { - DecodeResponseItem::Header(Header { size }) => { - log!(v, "got header claiming a size of {}", size); - target.set_len(size.0)?; - } - DecodeResponseItem::Parent(Parent { node, pair: (l, r) }) => { + BaoContentItem::Parent(Parent { node, pair: (l, r) }) => { indent = indent.max(node.level() + 1); let prefix = " ".repeat((indent - node.level()) as usize); log!( @@ -233,7 +234,7 @@ mod sync { r.to_hex() ); } - DecodeResponseItem::Leaf(Leaf { offset, data }) => { + BaoContentItem::Leaf(Leaf { offset, data }) => { let prefix = " ".repeat(indent as usize); log!( v, @@ -250,15 +251,16 @@ mod sync { } fn decode_to_stdout(msg: &Message, block_size: BlockSize, v: bool) -> io::Result<()> { - let iter = - DecodeResponseIter::new(msg.hash, block_size, Cursor::new(&msg.encoded), &msg.ranges); + let mut reader = Cursor::new(&msg.encoded); + let mut size = [0; 8]; + reader.read_exact(&mut size)?; + let size = ByteNum(u64::from_le_bytes(size)); + let tree = BaoTree::new(size, block_size); + let iter = DecodeResponseIter::new(msg.hash, tree, Cursor::new(&msg.encoded), &msg.ranges); let mut indent = 0; for response in iter { match response? { - DecodeResponseItem::Header(Header { size }) => { - log!(v, "got header claiming a size of {}", size); - } - DecodeResponseItem::Parent(Parent { node, pair: (l, r) }) => { + BaoContentItem::Parent(Parent { node, pair: (l, r) }) => { indent = indent.max(node.level() + 1); let prefix = " ".repeat((indent - node.level()) as usize); log!( @@ -271,7 +273,7 @@ mod sync { r.to_hex() ); } - DecodeResponseItem::Leaf(Leaf { offset, data }) => { + BaoContentItem::Leaf(Leaf { offset, data }) => { let prefix = " ".repeat(indent as usize); log!( v, @@ -340,9 +342,11 @@ mod sync { } mod fsm { - use bao_tree::io::fsm::{ - encode_ranges_validated, BaoContentItem, Outboard, ResponseDecoderReadingNext, - ResponseDecoderStart, + use bao_tree::io::{ + fsm::{ + encode_ranges_validated, Outboard, ResponseDecoderReadingNext, ResponseDecoderStart, + }, + BaoContentItem, }; use iroh_io::AsyncSliceWriter; use tokio::io::AsyncWriteExt; diff --git a/src/io/fsm.rs b/src/io/fsm.rs index 99e65d7..7d1e2f3 100644 --- a/src/io/fsm.rs +++ b/src/io/fsm.rs @@ -5,6 +5,9 @@ //! //! This makes them occasionally a bit verbose to use, but allows being generic //! without having to box the futures. +//! +//! The traits to perform async positioned io are re-exported from +//! [iroh-io](https://crates.io/crates/iroh-io). use std::{future::Future, io, result}; use crate::{ @@ -17,7 +20,7 @@ use blake3::guts::parent_cv; use bytes::{Bytes, BytesMut}; use iroh_io::AsyncStreamWriter; use smallvec::SmallVec; -use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use crate::{ io::{ @@ -30,30 +33,7 @@ use crate::{ }; pub use iroh_io::{AsyncSliceReader, AsyncSliceWriter}; -use super::{combine_hash_pair, DecodeError, StartDecodeError}; - -/// An item of bao content -/// -/// We know that we are not going to get headers after the first item. -#[derive(Debug)] -pub enum BaoContentItem { - /// a parent node, to update the outboard - Parent(Parent), - /// a leaf node, to write to the file - Leaf(Leaf), -} - -impl From for BaoContentItem { - fn from(p: Parent) -> Self { - Self::Parent(p) - } -} - -impl From for BaoContentItem { - fn from(l: Leaf) -> Self { - Self::Leaf(l) - } -} +use super::{combine_hash_pair, BaoContentItem, DecodeError, StartDecodeError}; /// A binary merkle tree for blake3 hashes of a blob. /// @@ -96,7 +76,7 @@ pub trait Outboard { /// A mutable outboard. /// -/// This trait extends [Outboard] with methods to save a hash pair for a node and to set the +/// This trait provides a way to save a hash pair for a node and to set the /// length of the data file. /// /// This trait can be used to incrementally save an outboard when receiving data. @@ -141,7 +121,7 @@ impl Outboard for PreOrderOutboard { let Some(offset) = self.tree.pre_order_offset(node) else { return Ok(None); }; - let offset = offset * 64 + 8; + let offset = offset * 64; let content = self.data.read_at(offset, 64).await?; Ok(Some(if content.len() != 64 { (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32])) @@ -174,7 +154,7 @@ impl OutboardMut for PreOrderOutboard { let Some(offset) = self.tree.pre_order_offset(node) else { return Ok(()); }; - let offset = offset * 64 + 8; + let offset = offset * 64; let mut buf = [0u8; 64]; buf[..32].copy_from_slice(hash_pair.0.as_bytes()); buf[32..].copy_from_slice(hash_pair.1.as_bytes()); @@ -575,25 +555,19 @@ where /// /// If you do not want to update an outboard, use [super::outboard::EmptyOutboard] as /// the outboard. -pub async fn decode_response_into( - root: blake3::Hash, - block_size: BlockSize, - ranges: ChunkRanges, +pub async fn decode_ranges( encoded: R, - create: F, + ranges: ChunkRanges, mut target: W, -) -> io::Result> + mut outboard: O, +) -> io::Result<()> where - O: OutboardMut, + O: OutboardMut + Outboard, R: AsyncRead + Unpin, W: AsyncSliceWriter, - F: FnOnce(blake3::Hash, BaoTree) -> Fut, - Fut: Future>, { - let start = ResponseDecoderStart::new(root, ranges, block_size, encoded); - let (mut reading, _size) = start.next().await?; - let mut outboard = None; - let mut create = Some(create); + let mut reading = + ResponseDecoderReading::new(outboard.root(), ranges, outboard.tree(), encoded); loop { let item = match reading.next().await { ResponseDecoderReadingNext::Done(_reader) => break, @@ -604,15 +578,6 @@ where }; match item { BaoContentItem::Parent(Parent { node, pair }) => { - let outboard = if let Some(outboard) = outboard.as_mut() { - outboard - } else { - let tree = reading.tree(); - let create = create.take().unwrap(); - let new = create(root, tree).await?; - outboard = Some(new); - outboard.as_mut().unwrap() - }; outboard.save(node, &pair).await?; } BaoContentItem::Leaf(Leaf { offset, data }) => { @@ -620,7 +585,7 @@ where } } } - Ok(outboard) + Ok(()) } fn read_parent(buf: &[u8]) -> (blake3::Hash, blake3::Hash) { let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap()); @@ -628,18 +593,136 @@ fn read_parent(buf: &[u8]) -> (blake3::Hash, blake3::Hash) { (l_hash, r_hash) } +/// Compute the outboard for the given data. +/// +/// Unlike [outboard_post_order], this will work with any outboard +/// implementation, but it is not guaranteed that writes are sequential. +pub async fn outboard( + data: impl AsyncRead + Unpin, + tree: BaoTree, + mut outboard: impl OutboardMut, +) -> io::Result { + let mut buffer = vec![0u8; tree.chunk_group_bytes().to_usize()]; + let hash = outboard_impl(tree, data, &mut outboard, &mut buffer).await?; + Ok(hash) +} + +/// Internal helper for [outboard_post_order]. This takes a buffer of the chunk group size. +async fn outboard_impl( + tree: BaoTree, + mut data: impl AsyncRead + Unpin, + mut outboard: impl OutboardMut, + buffer: &mut [u8], +) -> io::Result { + // do not allocate for small trees + let mut stack = SmallVec::<[blake3::Hash; 10]>::new(); + debug_assert!(buffer.len() == tree.chunk_group_bytes().to_usize()); + for item in tree.post_order_chunks_iter() { + match item { + BaoChunk::Parent { is_root, node, .. } => { + let right_hash = stack.pop().unwrap(); + let left_hash = stack.pop().unwrap(); + outboard.save(node, &(left_hash, right_hash)).await?; + let parent = parent_cv(&left_hash, &right_hash, is_root); + stack.push(parent); + } + BaoChunk::Leaf { + size, + is_root, + start_chunk, + .. + } => { + let buf = &mut buffer[..size]; + data.read_exact(buf).await?; + let hash = hash_subtree(start_chunk.0, buf, is_root); + stack.push(hash); + } + } + } + debug_assert_eq!(stack.len(), 1); + let hash = stack.pop().unwrap(); + Ok(hash) +} + +/// Compute the post order outboard for the given data, writing into a io::Write +/// +/// For the post order outboard, writes to the target are sequential. +/// +/// This will not add the size to the output. You need to store it somewhere else +/// or append it yourself. +pub async fn outboard_post_order( + data: impl AsyncRead + Unpin, + tree: BaoTree, + mut outboard: impl AsyncWrite + Unpin, +) -> io::Result { + let mut buffer = vec![0u8; tree.chunk_group_bytes().to_usize()]; + let hash = outboard_post_order_impl(tree, data, &mut outboard, &mut buffer).await?; + Ok(hash) +} + +/// Internal helper for [outboard_post_order]. This takes a buffer of the chunk group size. +async fn outboard_post_order_impl( + tree: BaoTree, + mut data: impl AsyncRead + Unpin, + mut outboard: impl AsyncWrite + Unpin, + buffer: &mut [u8], +) -> io::Result { + // do not allocate for small trees + let mut stack = SmallVec::<[blake3::Hash; 10]>::new(); + debug_assert!(buffer.len() == tree.chunk_group_bytes().to_usize()); + for item in tree.post_order_chunks_iter() { + match item { + BaoChunk::Parent { is_root, .. } => { + let right_hash = stack.pop().unwrap(); + let left_hash = stack.pop().unwrap(); + outboard.write_all(left_hash.as_bytes()).await?; + outboard.write_all(right_hash.as_bytes()).await?; + let parent = parent_cv(&left_hash, &right_hash, is_root); + stack.push(parent); + } + BaoChunk::Leaf { + size, + is_root, + start_chunk, + .. + } => { + let buf = &mut buffer[..size]; + data.read_exact(buf).await?; + let hash = hash_subtree(start_chunk.0, buf, is_root); + stack.push(hash); + } + } + } + debug_assert_eq!(stack.len(), 1); + let hash = stack.pop().unwrap(); + Ok(hash) +} + +/// Copy an outboard to another outboard. +/// +/// This can be used to persist an in memory outboard or to change from +/// pre-order to post-order. +pub async fn copy(mut from: impl Outboard, mut to: impl OutboardMut) -> io::Result<()> { + let tree = from.tree(); + for node in tree.pre_order_nodes_iter() { + if let Some(hash_pair) = from.load(node).await? { + to.save(node, &hash_pair).await?; + } + } + Ok(()) +} + #[cfg(feature = "validate")] mod validate { - use std::{future::Future, io, ops::Range, pin::Pin}; + use std::{io, ops::Range}; use futures_lite::{FutureExt, Stream}; use genawaiter::sync::{Co, Gen}; use iroh_io::AsyncSliceReader; - type LocalBoxFuture<'a, T> = Pin + 'a>>; use crate::{ - blake3, hash_subtree, rec::truncate_ranges, split, BaoTree, ChunkNum, ChunkRangesRef, - TreeNode, + blake3, hash_subtree, io::LocalBoxFuture, rec::truncate_ranges, split, BaoTree, ByteNum, + ChunkNum, ChunkRangesRef, TreeNode, }; use super::Outboard; @@ -709,11 +792,10 @@ mod validate { async fn yield_if_valid( &mut self, - node: TreeNode, + range: Range, hash: &blake3::Hash, is_root: bool, ) -> io::Result<()> { - let range = self.tree.byte_range(node); let len = (range.end - range.start).to_usize(); let data = self.data.read_at(range.start.0, len).await?; // is_root is always false because the case of a single chunk group is handled before calling this function @@ -740,8 +822,9 @@ mod validate { return Ok(()); } let node = shifted.subtract_block_size(self.tree.block_size.0); + let (l, m, r) = self.tree.leaf_byte_ranges3(node); if !self.tree.is_relevant_for_outboard(node) { - self.yield_if_valid(node, parent_hash, is_root).await?; + self.yield_if_valid(l..r, parent_hash, is_root).await?; return Ok(()); } let Some((l_hash, r_hash)) = self.outboard.load(node).await? else { @@ -753,26 +836,20 @@ mod validate { // hash mismatch, we can't validate return Ok(()); }; - if node.is_leaf() { - self.yield_if_valid(node, parent_hash, is_root).await?; - } else { - let (l_ranges, r_ranges) = split(ranges, node); - if shifted.is_leaf() { - if !l_ranges.is_empty() { - let l: TreeNode = node.left_child().unwrap(); - self.yield_if_valid(l, &l_hash, false).await?; - } - if !r_ranges.is_empty() { - let r = node.right_descendant(self.tree.filled_size()).unwrap(); - self.yield_if_valid(r, &r_hash, false).await?; - } - } else { - // recurse (we are in the domain of the shifted tree) - let left = shifted.left_child().unwrap(); - self.validate_rec(&l_hash, left, false, l_ranges).await?; - let right = shifted.right_descendant(self.shifted_filled_size).unwrap(); - self.validate_rec(&r_hash, right, false, r_ranges).await?; + let (l_ranges, r_ranges) = split(ranges, node); + if shifted.is_leaf() { + if !l_ranges.is_empty() { + self.yield_if_valid(l..m, &l_hash, false).await?; + } + if !r_ranges.is_empty() { + self.yield_if_valid(m..r, &r_hash, false).await?; } + } else { + // recurse (we are in the domain of the shifted tree) + let left = shifted.left_child().unwrap(); + self.validate_rec(&l_hash, left, false, l_ranges).await?; + let right = shifted.right_descendant(self.shifted_filled_size).unwrap(); + self.validate_rec(&r_hash, right, false, r_ranges).await?; } Ok(()) } @@ -838,8 +915,7 @@ mod validate { ranges: &'b ChunkRangesRef, ) -> LocalBoxFuture<'b, io::Result<()>> { Box::pin(async move { - let yield_node_range = |node| { - let range = self.tree.byte_range(node); + let yield_node_range = |range: Range| { self.co .yield_(Ok(range.start.full_chunks()..range.end.chunks())) }; @@ -848,8 +924,9 @@ mod validate { return Ok(()); } let node = shifted.subtract_block_size(self.tree.block_size.0); + let (l, m, r) = self.tree.leaf_byte_ranges3(node); if !self.tree.is_relevant_for_outboard(node) { - yield_node_range(node).await; + yield_node_range(l..r).await; return Ok(()); } let Some((l_hash, r_hash)) = self.outboard.load(node).await? else { @@ -861,26 +938,20 @@ mod validate { // hash mismatch, we can't validate return Ok(()); }; - if node.is_leaf() { - yield_node_range(node).await; - } else { - let (l_ranges, r_ranges) = split(ranges, node); - if shifted.is_leaf() { - if !l_ranges.is_empty() { - let l = node.left_child().unwrap(); - yield_node_range(l).await; - } - if !r_ranges.is_empty() { - let r = node.right_descendant(self.tree.filled_size()).unwrap(); - yield_node_range(r).await; - } - } else { - // recurse (we are in the domain of the shifted tree) - let left = shifted.left_child().unwrap(); - self.validate_rec(&l_hash, left, false, l_ranges).await?; - let right = shifted.right_descendant(self.shifted_filled_size).unwrap(); - self.validate_rec(&r_hash, right, false, r_ranges).await?; + let (l_ranges, r_ranges) = split(ranges, node); + if shifted.is_leaf() { + if !l_ranges.is_empty() { + yield_node_range(l..m).await; } + if !r_ranges.is_empty() { + yield_node_range(m..r).await; + } + } else { + // recurse (we are in the domain of the shifted tree) + let left = shifted.left_child().unwrap(); + self.validate_rec(&l_hash, left, false, l_ranges).await?; + let right = shifted.right_descendant(self.shifted_filled_size).unwrap(); + self.validate_rec(&r_hash, right, false, r_ranges).await?; } Ok(()) }) diff --git a/src/io/mod.rs b/src/io/mod.rs index 84812d0..eed749b 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,10 +1,13 @@ //! Implementation of bao streaming for std io and tokio io +use std::pin::Pin; + use crate::{blake3, BaoTree, BlockSize, ByteNum, ChunkNum, ChunkRanges, TreeNode}; use bytes::Bytes; mod error; pub use error::*; use range_collections::{range_set::RangeSetRange, RangeSetRef}; +use std::future::Future; use self::outboard::PostOrderMemOutboard; #[cfg(feature = "tokio_fsm")] @@ -40,20 +43,41 @@ pub struct Leaf { pub data: Bytes, } -/// The outboard size of a file of size `size` with a block size of `block_size` -pub fn outboard_size(size: u64, block_size: BlockSize) -> u64 { - BaoTree::outboard_size(ByteNum(size), block_size).0 +/// A content item for the bao streaming protocol. +/// +/// After reading the initial header, the only possible items are `Parent` and +/// `Leaf`. +#[derive(Debug)] +pub enum BaoContentItem { + /// a parent node, to update the outboard + Parent(Parent), + /// a leaf node, to write to the file + Leaf(Leaf), } -/// The encoded size of a file of size `size` with a block size of `block_size` -pub fn encoded_size(size: u64, block_size: BlockSize) -> u64 { - outboard_size(size, block_size) + size +impl From for BaoContentItem { + fn from(p: Parent) -> Self { + Self::Parent(p) + } +} + +impl From for BaoContentItem { + fn from(l: Leaf) -> Self { + Self::Leaf(l) + } +} + +/// The outboard size of a file of size `size` with a block size of `block_size` +/// +/// This is the outboard size *without* the size prefix. +pub fn outboard_size(size: u64, block_size: BlockSize) -> u64 { + BaoTree::new(ByteNum(size), block_size).outboard_size().0 } /// Computes the pre order outboard of a file in memory. pub fn outboard(input: impl AsRef<[u8]>, block_size: BlockSize) -> (Vec, blake3::Hash) { let outboard = PostOrderMemOutboard::create(input, block_size).flip(); - let hash = *outboard.hash(); + let hash = outboard.root; (outboard.into_inner_with_prefix(), hash) } @@ -79,8 +103,7 @@ pub fn round_up_to_chunks(ranges: &RangeSetRef) -> ChunkRanges { res } -/// Given a range set of byte ranges, round it up to chunk groups -/// Given a range set of chunk ranges, return the full chunk groups. +/// Given a range set of byte ranges, round it up to chunk groups. /// /// If we store outboard data at a level of granularity of `block_size`, we can only /// share full chunk groups because we don't have proofs for anything below a chunk group. @@ -119,3 +142,5 @@ pub(crate) fn combine_hash_pair(l: &blake3::Hash, r: &blake3::Hash) -> [u8; 64] *rb = *r.as_bytes(); res } + +pub(crate) type LocalBoxFuture<'a, T> = Pin + 'a>>; diff --git a/src/io/outboard.rs b/src/io/outboard.rs index 142aaed..abcacfe 100644 --- a/src/io/outboard.rs +++ b/src/io/outboard.rs @@ -3,10 +3,8 @@ //! A number of implementations for the sync and async outboard traits are provided. //! Implementations for in-memory outboards, for outboards where the data resides on disk, //! and a special implementation [EmptyOutboard] that just ignores all writes. - -use super::{sync::write_outboard_from_mem, TreeNode}; -use crate::{blake3, BaoTree, BlockSize, ByteNum}; -use std::{fmt, io}; +use crate::{blake3, BaoTree, BlockSize, ByteNum, TreeNode}; +use std::io; /// An empty outboard, that just returns 0 hashes for all nodes. /// @@ -95,6 +93,9 @@ impl crate::io::fsm::OutboardMut for EmptyOutboard { } /// A generic outboard in pre order +/// +/// Caution: unlike the outboard implementation in the bao crate, this +/// implementation does not assume an 8 byte size prefix. #[derive(Debug, Clone)] pub struct PreOrderOutboard { /// root hash @@ -105,72 +106,48 @@ pub struct PreOrderOutboard { pub data: R, } -impl PreOrderOutboard { - /// Return the inner reader - pub fn into_inner(self) -> R { - self.data - } -} - /// A generic outboard in post order #[derive(Debug, Clone)] pub struct PostOrderOutboard { /// root hash - pub(crate) root: blake3::Hash, + pub root: blake3::Hash, /// tree defining the data - pub(crate) tree: BaoTree, + pub tree: BaoTree, /// hashes with length prefix - pub(crate) data: R, -} - -impl PostOrderOutboard { - /// Return the inner reader - pub fn into_inner(self) -> R { - self.data - } + pub data: R, } /// A post order outboard that is optimized for memory storage. -#[derive(Clone, PartialEq, Eq)] +/// +/// The traits are implemented for fixed size slices or mutable slices, so you +/// must make sure that the data is already the right size. +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PostOrderMemOutboard> { /// root hash - pub(crate) root: blake3::Hash, + pub root: blake3::Hash, /// tree defining the data - pub(crate) tree: BaoTree, + pub tree: BaoTree, /// hashes without length suffix pub data: T, } -impl> fmt::Debug for PostOrderMemOutboard { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let pairs = self - .data - .as_ref() - .chunks_exact(64) - .map(|chunk| parse_hash_pair(chunk.try_into().unwrap())) - .collect::>(); - f.debug_struct("PostOrderMemOutboard") - .field("root", &self.root) - .field("tree", &self.tree) - .field("data", &pairs) - .finish() - } -} - impl PostOrderMemOutboard { /// Create a new outboard from `data` and a `block_size`. /// - /// This will hash the data and create an outboard + /// This will hash the data and create an outboard. + /// + /// It is just a shortcut that calls [crate::io::sync::outboard_post_order]. pub fn create(data: impl AsRef<[u8]>, block_size: BlockSize) -> Self { let data = data.as_ref(); - let tree = BaoTree::new(ByteNum(data.len() as u64), block_size); - let outboard_len: usize = (tree.outboard_hash_pairs() * 64).try_into().unwrap(); - let outboard_data = vec![0; outboard_len]; - let root = blake3::Hash::from_bytes([0; 32]); - let mut outboard = Self::new(root, tree, outboard_data).unwrap(); - let root = write_outboard_from_mem(data, &mut outboard).unwrap(); - outboard.root = root; - outboard + let size = data.len() as u64; + let tree = BaoTree::new(ByteNum(size), block_size); + let mut outboard = Vec::with_capacity(tree.outboard_size().to_usize()); + let root = crate::io::sync::outboard_post_order(data, tree, &mut outboard).unwrap(); + Self { + root, + tree, + data: outboard, + } } /// returns the outboard data, with the length suffix. @@ -184,56 +161,39 @@ impl PostOrderMemOutboard { impl> PostOrderMemOutboard { /// Create a new outboard from a root hash, tree, and existing outboard data. /// - /// This will just do a check that the data is the right size, but not check - /// the actual hashes. - pub fn new( - root: blake3::Hash, - tree: BaoTree, - outboard_data: T, - ) -> std::result::Result { - if outboard_data.as_ref().len() as u64 == tree.outboard_hash_pairs() * 64 { - Ok(Self { - root, - tree, - data: outboard_data, - }) - } else { - Err("invalid outboard data size") + /// Note that when writing to a [PreOrderMemOutboard], you must make sure + /// that the data is already the right size. The size can be computed with + /// [BaoTree::outboard_size]. + pub fn new(root: blake3::Hash, tree: BaoTree, outboard_data: T) -> Self { + Self { + root, + tree, + data: outboard_data, } } - /// Get the inner data. - pub fn data(&self) -> &T { - &self.data - } - /// Map the outboard data to a new type. - pub fn map_data(self, f: F) -> std::result::Result, &'static str> + pub fn map_data(self, f: F) -> PostOrderMemOutboard where F: FnOnce(T) -> U, U: AsRef<[u8]>, { - let len = self.data.as_ref().len(); - let data = f(self.data); - if data.as_ref().len() == len { - Ok(PostOrderMemOutboard { - root: self.root, - tree: self.tree, - data, - }) - } else { - Err("invalid outboard data size") + PostOrderMemOutboard { + root: self.root, + tree: self.tree, + data: f(self.data), } } - /// The outboard data, without the length suffix. - pub fn outboard(&self) -> &[u8] { - self.data.as_ref() - } - /// Flip the outboard to pre order. pub fn flip(&self) -> PreOrderMemOutboard { - flip_post(self.root, self.tree, self.data.as_ref()) + let mut target = PreOrderMemOutboard::new( + self.root, + self.tree, + vec![0; self.tree.outboard_size().to_usize()], + ); + crate::io::sync::copy(self, &mut target).unwrap(); + target } } @@ -318,50 +278,20 @@ fn load_post(tree: &BaoTree, data: &[u8], node: TreeNode) -> Option<(blake3::Has load_raw_post_mem(tree, data, node).map(parse_hash_pair) } -fn flip_post(root: blake3::Hash, tree: BaoTree, data: &[u8]) -> PreOrderMemOutboard { - let mut out = vec![0; data.len()]; - for node in tree.post_order_nodes_iter() { - if let Some((l, r)) = load_post(&tree, data, node) { - let offset = tree.pre_order_offset(node).unwrap(); - let offset = (offset as usize) * 64; - out[offset..offset + 32].copy_from_slice(l.as_bytes()); - out[offset + 32..offset + 64].copy_from_slice(r.as_bytes()); - } - } - PreOrderMemOutboard { - root, - tree, - data: out, - } -} - /// A pre order outboard that is optimized for memory storage. -#[derive(Clone, PartialEq, Eq)] +/// +/// The traits are implemented for fixed size slices or mutable slices, so you +/// must make sure that the data is already the right size. +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PreOrderMemOutboard> { /// root hash - pub(crate) root: blake3::Hash, + pub root: blake3::Hash, /// tree defining the data - pub(crate) tree: BaoTree, + pub tree: BaoTree, /// hashes with length prefix pub data: T, } -impl> fmt::Debug for PreOrderMemOutboard { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let pairs = self - .data - .as_ref() - .chunks_exact(64) - .map(|chunk| parse_hash_pair(chunk.try_into().unwrap())) - .collect::>(); - f.debug_struct("PreOrderMemOutboard") - .field("root", &self.root) - .field("tree", &self.tree) - .field("data", &pairs) - .finish() - } -} - impl PreOrderMemOutboard { /// returns the outboard data, with the length prefix added. pub fn into_inner_with_prefix(self) -> Vec { @@ -375,77 +305,56 @@ impl PreOrderMemOutboard { /// This will hash the data and create an outboard pub fn create(data: impl AsRef<[u8]>, block_size: BlockSize) -> Self { let data = data.as_ref(); - let tree = BaoTree::new(ByteNum(data.len() as u64), block_size); - let outboard_len: usize = (tree.outboard_hash_pairs() * 64).try_into().unwrap(); - let outboard_data = vec![0u8; outboard_len]; - let root = blake3::Hash::from_bytes([0; 32]); - let mut outboard = Self::new(root, tree, outboard_data).unwrap(); - let root = write_outboard_from_mem(data, &mut outboard).unwrap(); - outboard.root = root; - outboard + let size = data.len() as u64; + let tree = BaoTree::new(ByteNum(size), block_size); + // the outboard impl for PreOrderMemOutboard requires just AsMut<[u8]>, + // so the data must already be the right size. + let outboard = vec![0u8; tree.outboard_size().to_usize()]; + let mut res = Self::new(blake3::Hash::from([0; 32]), tree, outboard); + let root = crate::io::sync::outboard(data, tree, &mut res).unwrap(); + res.root = root; + res } } impl> PreOrderMemOutboard { /// Create a new outboard from a root hash, tree, and existing outboard data. /// - /// This will just do a check that the data is the right size, but not check - /// the actual hashes. + /// Note that when writing to a [PreOrderMemOutboard], you must make sure + /// that the data is already the right size. The size can be computed with + /// [BaoTree::outboard_size]. /// /// Note that if you have data with a length prefix, you have to remove the prefix first. - pub fn new( - root: blake3::Hash, - tree: BaoTree, - outboard_data: T, - ) -> std::result::Result { - if outboard_data.as_ref().len() as u64 == tree.outboard_hash_pairs() * 64 { - Ok(Self { - root, - tree, - data: outboard_data, - }) - } else { - Err("invalid outboard data size") + pub fn new(root: blake3::Hash, tree: BaoTree, outboard_data: T) -> Self { + Self { + root, + tree, + data: outboard_data, } } /// Map the outboard data to a new type. - pub fn map_data(self, f: F) -> std::result::Result, &'static str> + pub fn map_data(self, f: F) -> PreOrderMemOutboard where F: FnOnce(T) -> U, U: AsRef<[u8]>, { - let len = self.data.as_ref().len(); - let data = f(self.data); - if data.as_ref().len() == len { - Ok(PreOrderMemOutboard { - root: self.root, - tree: self.tree, - data, - }) - } else { - Err("invalid outboard data size") + PreOrderMemOutboard { + root: self.root, + tree: self.tree, + data: f(self.data), } } - /// The outboard data, including the length prefix. - pub fn outboard(&self) -> &[u8] { - self.data.as_ref() - } - - /// The root hash. - pub fn hash(&self) -> &blake3::Hash { - &self.root - } - - /// Get the inner data. - pub fn into_inner(self) -> T { - self.data - } - /// Flip the outboard to a post order outboard. pub fn flip(&self) -> PostOrderMemOutboard { - flip_pre(self.root, self.tree, self.data.as_ref()) + let mut target = PostOrderMemOutboard::new( + self.root, + self.tree, + vec![0; self.tree.outboard_size().to_usize()], + ); + crate::io::sync::copy(self, &mut target).unwrap(); + target } } @@ -481,7 +390,7 @@ impl> crate::io::sync::OutboardMut for PreOrderMemOutboard { } #[cfg(feature = "tokio_fsm")] -impl + 'static> crate::io::fsm::Outboard for PreOrderMemOutboard { +impl> crate::io::fsm::Outboard for PreOrderMemOutboard { fn root(&self) -> blake3::Hash { self.root } @@ -536,23 +445,6 @@ fn load_pre(tree: &BaoTree, data: &[u8], node: TreeNode) -> Option<(blake3::Hash load_raw_pre_mem(tree, data, node).map(parse_hash_pair) } -fn flip_pre(root: blake3::Hash, tree: BaoTree, data: &[u8]) -> PostOrderMemOutboard { - let mut out = vec![0; data.len()]; - for node in tree.post_order_nodes_iter() { - if let Some((l, r)) = load_pre(&tree, data, node) { - let offset = tree.post_order_offset(node).unwrap().value(); - let offset = usize::try_from(offset * 64).unwrap(); - out[offset..offset + 32].copy_from_slice(l.as_bytes()); - out[offset + 32..offset + 64].copy_from_slice(r.as_bytes()); - } - } - PostOrderMemOutboard { - root, - tree, - data: out, - } -} - pub(crate) fn parse_hash_pair(buf: [u8; 64]) -> (blake3::Hash, blake3::Hash) { let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap()); let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..]).unwrap()); diff --git a/src/io/sync.rs b/src/io/sync.rs index c3838f5..c18b940 100644 --- a/src/io/sync.rs +++ b/src/io/sync.rs @@ -1,7 +1,9 @@ -//! Syncronous IO +//! Sync IO operations +//! +//! The traits to perform positioned io are re-exported from +//! [positioned-io](https://crates.io/crates/positioned-io). use std::{ io::{self, Read, Write}, - ops::Range, result, }; @@ -9,60 +11,21 @@ use crate::{ blake3, io::error::{AnyDecodeError, EncodeError}, io::{ - outboard::{parse_hash_pair, PostOrderMemOutboard, PostOrderOutboard, PreOrderOutboard}, - Header, Leaf, Parent, + outboard::{parse_hash_pair, PostOrderOutboard, PreOrderOutboard}, + Leaf, Parent, }, iter::BaoChunk, rec::{encode_selected_rec, truncate_ranges}, - BaoTree, BlockSize, ByteNum, ChunkRanges, ChunkRangesRef, TreeNode, + BaoTree, ChunkRangesRef, TreeNode, }; use blake3::guts::parent_cv; use bytes::BytesMut; pub use positioned_io::{ReadAt, Size, WriteAt}; -use range_collections::{range_set::RangeSetRange, RangeSetRef}; use smallvec::SmallVec; -use super::{combine_hash_pair, outboard::PreOrderMemOutboard, DecodeError, StartDecodeError}; +use super::{combine_hash_pair, BaoContentItem, DecodeError}; use crate::{hash_subtree, iter::ResponseIterRef}; -macro_rules! io_error { - ($($arg:tt)*) => { - return Err(io::Error::new(io::ErrorKind::InvalidInput, format!($($arg)*))) - }; -} - -/// An item of a decode response -#[derive(Debug)] -pub enum DecodeResponseItem { - /// We got the header and now know how big the overall size is - /// - /// Actually this is just how big the remote side *claims* the overall size is. - /// In an adversarial setting, this could be wrong. - Header(Header), - /// a parent node, to update the outboard - Parent(Parent), - /// a leaf node, to write to the file - Leaf(Leaf), -} - -impl From
for DecodeResponseItem { - fn from(h: Header) -> Self { - Self::Header(h) - } -} - -impl From for DecodeResponseItem { - fn from(p: Parent) -> Self { - Self::Parent(p) - } -} - -impl From for DecodeResponseItem { - fn from(l: Leaf) -> Self { - Self::Leaf(l) - } -} - /// A binary merkle tree for blake3 hashes of a blob. /// /// This trait contains information about the geometry of the tree, the root hash, @@ -91,7 +54,7 @@ pub trait Outboard { /// A mutable outboard. /// -/// This trait extends [Outboard] with methods to save a hash pair for a node and to set the +/// This trait provides a way to save a hash pair for a node and to set the /// length of the data file. /// /// This trait can be used to incrementally save an outboard when receiving data. @@ -134,22 +97,8 @@ impl Outboard for &mut O { impl PreOrderOutboard { /// Create a new outboard from a reader, root hash, and block size. - pub fn new(root: blake3::Hash, block_size: BlockSize, data: R) -> io::Result { - let mut content = [0u8; 8]; - data.read_exact_at(0, &mut content)?; - let len = ByteNum(u64::from_le_bytes(content[0..8].try_into().unwrap())); - let tree = BaoTree::new(len, block_size); - let expected_outboard_size = super::outboard_size(len.0, block_size); - let size = data.size()?; - if size != Some(expected_outboard_size) { - io_error!( - "Expected outboard size of {} bytes, but got {} bytes", - expected_outboard_size, - size.map(|s| s.to_string()).unwrap_or("unknown".to_string()) - ); - } - // zero pad the rest, if needed. - Ok(Self { root, tree, data }) + pub fn new(root: blake3::Hash, tree: BaoTree, data: R) -> Self { + Self { root, tree, data } } } @@ -166,7 +115,7 @@ impl Outboard for PreOrderOutboard { let Some(offset) = self.tree.pre_order_offset(node) else { return Ok(None); }; - let offset = offset * 64 + 8; + let offset = offset * 64; let mut content = [0u8; 64]; self.data.read_exact_at(offset, &mut content)?; Ok(Some(parse_hash_pair(content))) @@ -178,7 +127,7 @@ impl OutboardMut for PreOrderOutboard { let Some(offset) = self.tree.pre_order_offset(node) else { return Ok(()); }; - let offset = offset * 64 + 8; + let offset = offset * 64; let mut content = [0u8; 64]; content[0..32].copy_from_slice(hash_pair.0.as_bytes()); content[32..64].copy_from_slice(hash_pair.1.as_bytes()); @@ -189,27 +138,8 @@ impl OutboardMut for PreOrderOutboard { impl PostOrderOutboard { /// Create a new outboard from a reader, root hash, and block size. - pub fn new(root: blake3::Hash, block_size: BlockSize, data: R) -> io::Result { - // validate roughly that the outboard is correct - let Some(outboard_size) = data.size()? else { - io_error!("outboard must have a known size"); - }; - if outboard_size < 8 { - io_error!("outboard is too short"); - }; - let mut suffix = [0u8; 8]; - data.read_exact_at(outboard_size - 8, &mut suffix)?; - let len = u64::from_le_bytes(suffix); - let expected_outboard_size = super::outboard_size(len, block_size); - if outboard_size != expected_outboard_size { - io_error!( - "Expected outboard size of {} bytes, but got {} bytes", - expected_outboard_size, - outboard_size - ); - } - let tree = BaoTree::new(ByteNum(len), block_size); - Ok(Self { root, tree, data }) + pub fn new(root: blake3::Hash, tree: BaoTree, data: R) -> Self { + Self { root, tree, data } } } @@ -226,166 +156,17 @@ impl Outboard for PostOrderOutboard { let Some(offset) = self.tree.post_order_offset(node) else { return Ok(None); }; - let offset = offset.value() * 64 + 8; + let offset = offset.value() * 64; let mut content = [0u8; 64]; self.data.read_exact_at(offset, &mut content)?; Ok(Some(parse_hash_pair(content))) } } -impl PreOrderMemOutboard { - /// Load a pre-order outboard from a reader, root hash, and block size. - pub fn load( - root: blake3::Hash, - outboard_reader: impl ReadAt + Size, - block_size: BlockSize, - ) -> io::Result { - // validate roughly that the outboard is correct - let Some(size) = outboard_reader.size()? else { - io_error!("outboard must have a known size"); - }; - let Ok(size) = usize::try_from(size) else { - io_error!("outboard size must be less than usize::MAX"); - }; - let mut outboard = vec![0; size]; - outboard_reader.read_exact_at(0, &mut outboard)?; - if outboard.len() < 8 { - io_error!("outboard must be at least 8 bytes"); - }; - let prefix = &outboard[..8]; - let len = u64::from_le_bytes(prefix.try_into().unwrap()); - let expected_outboard_size = super::outboard_size(len, block_size); - let outboard_size = outboard.len() as u64; - if outboard_size != expected_outboard_size { - io_error!( - "outboard length does not match expected outboard length: {outboard_size} != {expected_outboard_size}" - ); - } - let tree = BaoTree::new(ByteNum(len), block_size); - outboard.splice(..8, []); - Ok(Self { - root, - tree, - data: outboard, - }) - } -} - -impl PostOrderMemOutboard { - /// Load a post-order outboard from a reader, root hash, and block size. - pub fn load( - root: blake3::Hash, - outboard_reader: impl ReadAt + Size, - block_size: BlockSize, - ) -> io::Result { - // validate roughly that the outboard is correct - let Some(size) = outboard_reader.size()? else { - io_error!("outboard must have a known size"); - }; - let Ok(size) = usize::try_from(size) else { - io_error!("outboard size must be less than usize::MAX"); - }; - let mut outboard = vec![0; size]; - outboard_reader.read_exact_at(0, &mut outboard)?; - if outboard.len() < 8 { - io_error!("outboard must be at least 8 bytes"); - }; - let suffix = &outboard[outboard.len() - 8..]; - let len = u64::from_le_bytes(suffix.try_into().unwrap()); - let expected_outboard_size = super::outboard_size(len, block_size); - let outboard_size = outboard.len() as u64; - if outboard_size != expected_outboard_size { - io_error!( - "outboard length does not match expected outboard length: {outboard_size} != {expected_outboard_size}" - ); - } - let tree = BaoTree::new(ByteNum(len), block_size); - outboard.truncate(outboard.len() - 8); - Ok(Self { - root, - tree, - data: outboard, - }) - } -} - -/// Given an outboard, return a range set of all valid ranges -pub fn valid_outboard_ranges(outboard: &O) -> io::Result -where - O: Outboard, -{ - struct RecursiveValidator<'a, O: Outboard> { - tree: BaoTree, - shifted_filled_size: TreeNode, - res: ChunkRanges, - outboard: &'a O, - } - - impl<'a, O: Outboard> RecursiveValidator<'a, O> { - fn validate_rec( - &mut self, - parent_hash: &blake3::Hash, - shifted: TreeNode, - is_root: bool, - ) -> io::Result<()> { - let node = shifted.subtract_block_size(self.tree.block_size.0); - let (l_hash, r_hash) = if let Some((l_hash, r_hash)) = self.outboard.load(node)? { - let actual = parent_cv(&l_hash, &r_hash, is_root); - if &actual != parent_hash { - // we got a validation error. Simply continue without adding the range - return Ok(()); - } - (l_hash, r_hash) - } else { - (*parent_hash, blake3::Hash::from([0; 32])) - }; - if shifted.is_leaf() { - let start = node.chunk_range().start; - let end = (start + self.tree.chunk_group_chunks() * 2).min(self.tree.chunks()); - self.res |= ChunkRanges::from(start..end); - } else { - // recurse - let left = shifted.left_child().unwrap(); - self.validate_rec(&l_hash, left, false)?; - let right = shifted.right_descendant(self.shifted_filled_size).unwrap(); - self.validate_rec(&r_hash, right, false)?; - } - Ok(()) - } - } - let tree = outboard.tree(); - let root_hash = outboard.root(); - let (shifted_root, shifted_filled_size) = tree.shifted(); - let mut validator = RecursiveValidator { - tree, - shifted_filled_size, - res: ChunkRanges::empty(), - outboard, - }; - validator.validate_rec(&root_hash, shifted_root, true)?; - Ok(validator.res) -} - -// When this enum is used it is in the Header variant for the first 8 bytes, then stays in -// the Content state for the remainder. Since the Content is the largest part that this -// size inbalance is fine, hence allow clippy::large_enum_variant. -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -enum Position<'a> { - /// currently reading the header, so don't know how big the tree is - /// so we need to store the ranges and the chunk group log - Header { - ranges: &'a ChunkRangesRef, - block_size: BlockSize, - }, - /// currently reading the tree, all the info we need is in the iter - Content { iter: ResponseIterRef<'a> }, -} - /// Iterator that can be used to decode a response to a range request #[derive(Debug)] pub struct DecodeResponseIter<'a, R> { - inner: Position<'a>, + inner: ResponseIterRef<'a>, stack: SmallVec<[blake3::Hash; 10]>, encoded: R, buf: BytesMut, @@ -396,32 +177,28 @@ impl<'a, R: Read> DecodeResponseIter<'a, R> { /// /// For decoding you need to know the root hash, block size, and the ranges that were requested. /// Additionally you need to provide a reader that can be used to read the encoded data. - pub fn new( - root: blake3::Hash, - block_size: BlockSize, - encoded: R, - ranges: &'a ChunkRangesRef, - ) -> Self { - let buf = BytesMut::with_capacity(block_size.bytes()); - Self::new_with_buffer(root, block_size, encoded, ranges, buf) + pub fn new(root: blake3::Hash, tree: BaoTree, encoded: R, ranges: &'a ChunkRangesRef) -> Self { + let buf = BytesMut::with_capacity(tree.block_size().bytes()); + Self::new_with_buffer(root, tree, encoded, ranges, buf) } /// Create a new iterator to decode a response. /// /// This is the same as [Self::new], but allows you to provide a buffer to use for decoding. - /// The buffer will be resized as needed, but it's capacity should be the [BlockSize::bytes]. + /// The buffer will be resized as needed, but it's capacity should be the [crate::BlockSize::bytes]. pub fn new_with_buffer( root: blake3::Hash, - block_size: BlockSize, + tree: BaoTree, encoded: R, ranges: &'a ChunkRangesRef, buf: BytesMut, ) -> Self { + let ranges = truncate_ranges(ranges, tree.size()); let mut stack = SmallVec::new(); stack.push(root); Self { stack, - inner: Position::Header { ranges, block_size }, + inner: ResponseIterRef::new(tree, ranges), encoded, buf, } @@ -435,29 +212,12 @@ impl<'a, R: Read> DecodeResponseIter<'a, R> { /// Get a reference to the tree used for decoding. /// /// This is only available after the first chunk has been decoded. - pub fn tree(&self) -> Option { - match &self.inner { - Position::Content { iter } => Some(iter.tree()), - Position::Header { .. } => None, - } + pub fn tree(&self) -> BaoTree { + self.inner.tree() } - fn next0(&mut self) -> result::Result, AnyDecodeError> { - let inner = match &mut self.inner { - Position::Content { ref mut iter } => iter, - Position::Header { block_size, ranges } => { - let size = - read_len(&mut self.encoded).map_err(StartDecodeError::maybe_not_found)?; - let tree = BaoTree::new(size, *block_size); - // now we know the size, so we can canonicalize the ranges - let ranges = truncate_ranges(ranges, tree.size()); - self.inner = Position::Content { - iter: ResponseIterRef::new(tree, ranges), - }; - return Ok(Some(Header { size }.into())); - } - }; - match inner.next() { + fn next0(&mut self) -> result::Result, AnyDecodeError> { + match self.inner.next() { Some(BaoChunk::Parent { is_root, left, @@ -509,7 +269,7 @@ impl<'a, R: Read> DecodeResponseIter<'a, R> { } impl<'a, R: Read> Iterator for DecodeResponseIter<'a, R> { - type Item = result::Result; + type Item = result::Result; fn next(&mut self) -> Option { self.next0().transpose() @@ -649,90 +409,50 @@ pub fn encode_ranges_validated( /// /// If you do not want to update an outboard, use [super::outboard::EmptyOutboard] as /// the outboard. -pub fn decode_response_into( - root: blake3::Hash, - block_size: BlockSize, +pub fn decode_ranges( ranges: &ChunkRangesRef, encoded: R, - create: impl FnOnce(BaoTree, blake3::Hash) -> io::Result, mut target: W, -) -> io::Result> + mut outboard: O, +) -> io::Result<()> where - O: OutboardMut, + O: OutboardMut + Outboard, R: Read, W: WriteAt, { - let iter = DecodeResponseIter::new(root, block_size, encoded, ranges); - let mut outboard = None; - let mut tree = None; - let mut create = Some(create); + let iter = DecodeResponseIter::new(outboard.root(), outboard.tree(), encoded, ranges); for item in iter { match item? { - DecodeResponseItem::Header(Header { size }) => { - tree = Some(BaoTree::new(size, block_size)); - } - DecodeResponseItem::Parent(Parent { node, pair }) => { - let outboard = if let Some(outboard) = outboard.as_mut() { - outboard - } else { - let create = create.take().unwrap(); - outboard = Some(create(tree.take().unwrap(), root)?); - outboard.as_mut().unwrap() - }; + BaoContentItem::Parent(Parent { node, pair }) => { outboard.save(node, &pair)?; } - DecodeResponseItem::Leaf(Leaf { offset, data }) => { + BaoContentItem::Leaf(Leaf { offset, data }) => { target.write_all_at(offset.0, &data)?; } } } - Ok(outboard) -} - -/// Write ranges from memory to disk -/// -/// This is useful for writing changes to outboards. -/// Note that it is up to you to call flush. -pub fn write_ranges( - from: impl AsRef<[u8]>, - mut to: impl WriteAt, - ranges: &RangeSetRef, -) -> io::Result<()> { - let from = from.as_ref(); - let end = from.len() as u64; - for range in ranges.iter() { - let range = match range { - RangeSetRange::RangeFrom(x) => *x.start..end, - RangeSetRange::Range(x) => *x.start..*x.end, - }; - let start = usize::try_from(range.start).unwrap(); - let end = usize::try_from(range.end).unwrap(); - to.write_all_at(range.start, &from[start..end])?; - } Ok(()) } -/// Compute the post order outboard for the given data, writing into a io::Write -pub fn outboard_post_order( +/// Compute the outboard for the given data. +/// +/// Unlike [outboard_post_order], this will work with any outboard +/// implementation, but it is not guaranteed that writes are sequential. +pub fn outboard( data: impl Read, - size: u64, - block_size: BlockSize, - mut outboard: impl Write, + tree: BaoTree, + mut outboard: impl OutboardMut, ) -> io::Result { - let tree = BaoTree::new(ByteNum(size), block_size); - let mut buffer = vec![0; tree.chunk_group_bytes().to_usize()]; - let hash = outboard_post_order_impl(tree, data, &mut outboard, &mut buffer)?; - outboard.write_all(&size.to_le_bytes())?; + let mut buffer = vec![0u8; tree.chunk_group_bytes().to_usize()]; + let hash = outboard_impl(tree, data, &mut outboard, &mut buffer)?; Ok(hash) } -/// Compute the post order outboard for the given data -/// -/// This is the internal version that takes a start chunk and does not append the size! -pub(crate) fn outboard_post_order_impl( +/// Internal helper for [outboard_post_order]. This takes a buffer of the chunk group size. +fn outboard_impl( tree: BaoTree, mut data: impl Read, - mut outboard: impl Write, + mut outboard: impl OutboardMut, buffer: &mut [u8], ) -> io::Result { // do not allocate for small trees @@ -740,11 +460,10 @@ pub(crate) fn outboard_post_order_impl( debug_assert!(buffer.len() == tree.chunk_group_bytes().to_usize()); for item in tree.post_order_chunks_iter() { match item { - BaoChunk::Parent { is_root, .. } => { + BaoChunk::Parent { is_root, node, .. } => { let right_hash = stack.pop().unwrap(); let left_hash = stack.pop().unwrap(); - outboard.write_all(left_hash.as_bytes())?; - outboard.write_all(right_hash.as_bytes())?; + outboard.save(node, &(left_hash, right_hash))?; let parent = parent_cv(&left_hash, &right_hash, is_root); stack.push(parent); } @@ -766,28 +485,39 @@ pub(crate) fn outboard_post_order_impl( Ok(hash) } -/// Fill a mutable outboard from the given in memory data -pub(crate) fn write_outboard_from_mem( - data: &[u8], - mut outboard: O, +/// Compute the post order outboard for the given data, writing into a io::Write +/// +/// For the post order outboard, writes to the target are sequential. +/// +/// This will not add the size to the output. You need to store it somewhere else +/// or append it yourself. +pub fn outboard_post_order( + data: impl Read, + tree: BaoTree, + mut outboard: impl Write, +) -> io::Result { + let mut buffer = vec![0u8; tree.chunk_group_bytes().to_usize()]; + let hash = outboard_post_order_impl(tree, data, &mut outboard, &mut buffer)?; + Ok(hash) +} + +/// Internal helper for [outboard_post_order]. This takes a buffer of the chunk group size. +fn outboard_post_order_impl( + tree: BaoTree, + mut data: impl Read, + mut outboard: impl Write, + buffer: &mut [u8], ) -> io::Result { - let tree = outboard.tree(); - if tree.size != ByteNum(data.len() as u64) { - io_error!( - "data size does not match outboard size: {} != {}", - data.len(), - tree.size - ); - } // do not allocate for small trees let mut stack = SmallVec::<[blake3::Hash; 10]>::new(); + debug_assert!(buffer.len() == tree.chunk_group_bytes().to_usize()); for item in tree.post_order_chunks_iter() { match item { - BaoChunk::Parent { is_root, node, .. } => { + BaoChunk::Parent { is_root, .. } => { let right_hash = stack.pop().unwrap(); let left_hash = stack.pop().unwrap(); - let pair = (left_hash, right_hash); - outboard.save(node, &pair)?; + outboard.write_all(left_hash.as_bytes())?; + outboard.write_all(right_hash.as_bytes())?; let parent = parent_cv(&left_hash, &right_hash, is_root); stack.push(parent); } @@ -797,9 +527,8 @@ pub(crate) fn write_outboard_from_mem( start_chunk, .. } => { - let start = start_chunk.to_bytes().to_usize(); - let end = start + size; - let buf = &data[start..end]; + let buf = &mut buffer[..size]; + data.read_exact(buf)?; let hash = hash_subtree(start_chunk.0, buf, is_root); stack.push(hash); } @@ -810,13 +539,6 @@ pub(crate) fn write_outboard_from_mem( Ok(hash) } -fn read_len(mut from: impl Read) -> std::io::Result { - let mut buf = [0; 8]; - from.read_exact(&mut buf)?; - let len = ByteNum(u64::from_le_bytes(buf)); - Ok(len) -} - fn read_parent(mut from: impl Read) -> std::io::Result<(blake3::Hash, blake3::Hash)> { let mut buf = [0; 64]; from.read_exact(&mut buf)?; @@ -825,83 +547,267 @@ fn read_parent(mut from: impl Read) -> std::io::Result<(blake3::Hash, blake3::Ha Ok((l_hash, r_hash)) } -/// seeks read the bytes for the range from the source -fn read_range(from: impl ReadAt, range: Range, buf: &mut [u8]) -> std::io::Result<&[u8]> { - let len = (range.end - range.start).to_usize(); - let buf = &mut buf[..len]; - from.read_exact_at(range.start.0, buf)?; - Ok(buf) +/// Copy an outboard to another outboard. +/// +/// This can be used to persist an in memory outboard or to change from +/// pre-order to post-order. +pub fn copy(from: impl Outboard, mut to: impl OutboardMut) -> io::Result<()> { + let tree = from.tree(); + for node in tree.pre_order_nodes_iter() { + if let Some(hash_pair) = from.load(node)? { + to.save(node, &hash_pair)?; + } + } + Ok(()) } -/// Given an outboard and a file, return all valid ranges -pub fn valid_file_ranges(outboard: &O, reader: R) -> io::Result -where - O: Outboard, - R: ReadAt, -{ - struct RecursiveValidator<'a, O: Outboard, R: ReadAt> { +#[cfg(feature = "validate")] +mod validate { + use std::{io, ops::Range}; + + use genawaiter::sync::{Co, Gen}; + use positioned_io::ReadAt; + + use crate::{ + blake3, hash_subtree, io::LocalBoxFuture, rec::truncate_ranges, split, BaoTree, ByteNum, + ChunkNum, ChunkRangesRef, TreeNode, + }; + + use super::Outboard; + + /// Given a data file and an outboard, compute all valid ranges. + /// + /// This is not cheap since it recomputes the hashes for all chunks. + /// + /// To reduce the amount of work, you can specify a range you are interested in. + pub fn valid_ranges<'a, O, D>( + outboard: O, + data: D, + ranges: &'a ChunkRangesRef, + ) -> impl IntoIterator>> + 'a + where + O: Outboard + 'a, + D: ReadAt + 'a, + { + Gen::new(move |co| async move { + if let Err(cause) = RecursiveDataValidator::validate(outboard, data, ranges, &co).await + { + co.yield_(Err(cause)).await; + } + }) + } + + struct RecursiveDataValidator<'a, O: Outboard, D: ReadAt> { tree: BaoTree, - valid_nodes: TreeNode, - res: ChunkRanges, - outboard: &'a O, - reader: R, + shifted_filled_size: TreeNode, + outboard: O, + data: D, buffer: Vec, + co: &'a Co>>, } - impl<'a, O: Outboard, R: ReadAt> RecursiveValidator<'a, O, R> { - fn validate_rec( + impl<'a, O: Outboard, D: ReadAt> RecursiveDataValidator<'a, O, D> { + async fn validate( + outboard: O, + data: D, + ranges: &ChunkRangesRef, + co: &Co>>, + ) -> io::Result<()> { + let tree = outboard.tree(); + let mut buffer = vec![0u8; tree.chunk_group_bytes().to_usize()]; + if tree.blocks().0 == 1 { + // special case for a tree that fits in one block / chunk group + let tmp = &mut buffer[..tree.size().to_usize()]; + data.read_exact_at(0, tmp)?; + let actual = hash_subtree(0, tmp, true); + if actual == outboard.root() { + co.yield_(Ok(ChunkNum(0)..tree.chunks())).await; + } + return Ok(()); + } + let ranges = truncate_ranges(ranges, tree.size()); + let root_hash = outboard.root(); + let (shifted_root, shifted_filled_size) = tree.shifted(); + let mut validator = RecursiveDataValidator { + tree, + shifted_filled_size, + outboard, + data, + buffer, + co, + }; + validator + .validate_rec(&root_hash, shifted_root, true, ranges) + .await + } + + async fn yield_if_valid( &mut self, - parent_hash: &blake3::Hash, - node: TreeNode, + range: Range, + hash: &blake3::Hash, is_root: bool, ) -> io::Result<()> { - if let Some((l_hash, r_hash)) = self.outboard.load(node)? { - let actual = parent_cv(&l_hash, &r_hash, is_root); - if &actual != parent_hash { - // we got a validation error. Simply continue without adding the range + let len = (range.end - range.start).to_usize(); + let tmp = &mut self.buffer[..len]; + self.data.read_exact_at(range.start.0, tmp)?; + // is_root is always false because the case of a single chunk group is handled before calling this function + let actual = hash_subtree(range.start.full_chunks().0, tmp, is_root); + if &actual == hash { + // yield the left range + self.co + .yield_(Ok(range.start.full_chunks()..range.end.chunks())) + .await; + } + io::Result::Ok(()) + } + + fn validate_rec<'b>( + &'b mut self, + parent_hash: &'b blake3::Hash, + shifted: TreeNode, + is_root: bool, + ranges: &'b ChunkRangesRef, + ) -> LocalBoxFuture<'b, io::Result<()>> { + Box::pin(async move { + if ranges.is_empty() { + // this part of the tree is not of interest, so we can skip it return Ok(()); } - if node.is_leaf() { - let (s, m, e) = self.tree.leaf_byte_ranges3(node); - let l_data = read_range(&mut self.reader, s..m, &mut self.buffer)?; - let actual = hash_subtree(s.chunks().0, l_data, false); - if actual == l_hash { - self.res |= ChunkRanges::from(s.chunks()..m.chunks()); + let node = shifted.subtract_block_size(self.tree.block_size.0); + let (l, m, r) = self.tree.leaf_byte_ranges3(node); + if !self.tree.is_relevant_for_outboard(node) { + self.yield_if_valid(l..r, parent_hash, is_root).await?; + return Ok(()); + } + let Some((l_hash, r_hash)) = self.outboard.load(node)? else { + // outboard is incomplete, we can't validate + return Ok(()); + }; + let actual = blake3::guts::parent_cv(&l_hash, &r_hash, is_root); + if &actual != parent_hash { + // hash mismatch, we can't validate + return Ok(()); + }; + let (l_ranges, r_ranges) = split(ranges, node); + if shifted.is_leaf() { + if !l_ranges.is_empty() { + self.yield_if_valid(l..m, &l_hash, false).await?; } - - let r_data = read_range(&mut self.reader, m..e, &mut self.buffer)?; - let actual = hash_subtree(m.chunks().0, r_data, false); - if actual == r_hash { - self.res |= ChunkRanges::from(m.chunks()..e.chunks()); + if !r_ranges.is_empty() { + self.yield_if_valid(m..r, &r_hash, false).await?; } } else { - // recurse - let left = node.left_child().unwrap(); - self.validate_rec(&l_hash, left, false)?; - let right = node.right_descendant(self.valid_nodes).unwrap(); - self.validate_rec(&r_hash, right, false)?; - } - } else if node.is_leaf() { - let (s, m, _) = self.tree.leaf_byte_ranges3(node); - let l_data = read_range(&mut self.reader, s..m, &mut self.buffer)?; - let actual = hash_subtree(s.chunks().0, l_data, is_root); - if actual == *parent_hash { - self.res |= ChunkRanges::from(s.chunks()..m.chunks()); + // recurse (we are in the domain of the shifted tree) + let left = shifted.left_child().unwrap(); + self.validate_rec(&l_hash, left, false, l_ranges).await?; + let right = shifted.right_descendant(self.shifted_filled_size).unwrap(); + self.validate_rec(&r_hash, right, false, r_ranges).await?; } + Ok(()) + }) + } + } + + /// Given just an outboard, compute all valid ranges. + /// + /// This is not cheap since it recomputes the hashes for all chunks. + pub fn valid_outboard_ranges<'a, O>( + outboard: O, + ranges: &'a ChunkRangesRef, + ) -> impl IntoIterator>> + 'a + where + O: Outboard + 'a, + { + Gen::new(move |co| async move { + if let Err(cause) = RecursiveOutboardValidator::validate(outboard, ranges, &co).await { + co.yield_(Err(cause)).await; + } + }) + } + + struct RecursiveOutboardValidator<'a, O: Outboard> { + tree: BaoTree, + shifted_filled_size: TreeNode, + outboard: O, + co: &'a Co>>, + } + + impl<'a, O: Outboard> RecursiveOutboardValidator<'a, O> { + async fn validate( + outboard: O, + ranges: &ChunkRangesRef, + co: &Co>>, + ) -> io::Result<()> { + let tree = outboard.tree(); + if tree.blocks().0 == 1 { + // special case for a tree that fits in one block / chunk group + co.yield_(Ok(ChunkNum(0)..tree.chunks())).await; + return Ok(()); + } + let ranges = truncate_ranges(ranges, tree.size()); + let root_hash = outboard.root(); + let (shifted_root, shifted_filled_size) = tree.shifted(); + let mut validator = RecursiveOutboardValidator { + tree, + shifted_filled_size, + outboard, + co, }; - Ok(()) + validator + .validate_rec(&root_hash, shifted_root, true, ranges) + .await + } + + fn validate_rec<'b>( + &'b mut self, + parent_hash: &'b blake3::Hash, + shifted: TreeNode, + is_root: bool, + ranges: &'b ChunkRangesRef, + ) -> LocalBoxFuture<'b, io::Result<()>> { + Box::pin(async move { + let yield_node_range = |range: Range| { + self.co + .yield_(Ok(range.start.full_chunks()..range.end.chunks())) + }; + if ranges.is_empty() { + // this part of the tree is not of interest, so we can skip it + return Ok(()); + } + let node = shifted.subtract_block_size(self.tree.block_size.0); + let (l, m, r) = self.tree.leaf_byte_ranges3(node); + if !self.tree.is_relevant_for_outboard(node) { + yield_node_range(l..r).await; + return Ok(()); + } + let Some((l_hash, r_hash)) = self.outboard.load(node)? else { + // outboard is incomplete, we can't validate + return Ok(()); + }; + let actual = blake3::guts::parent_cv(&l_hash, &r_hash, is_root); + if &actual != parent_hash { + // hash mismatch, we can't validate + return Ok(()); + }; + let (l_ranges, r_ranges) = split(ranges, node); + if shifted.is_leaf() { + if !l_ranges.is_empty() { + yield_node_range(l..m).await; + } + if !r_ranges.is_empty() { + yield_node_range(m..r).await; + } + } else { + // recurse (we are in the domain of the shifted tree) + let left = shifted.left_child().unwrap(); + self.validate_rec(&l_hash, left, false, l_ranges).await?; + let right = shifted.right_descendant(self.shifted_filled_size).unwrap(); + self.validate_rec(&r_hash, right, false, r_ranges).await?; + } + Ok(()) + }) } } - let tree = outboard.tree(); - let root_hash = outboard.root(); - let mut validator = RecursiveValidator { - tree, - valid_nodes: tree.filled_size(), - res: ChunkRanges::empty(), - outboard, - reader, - buffer: vec![0; tree.block_size.bytes()], - }; - validator.validate_rec(&root_hash, tree.root(), true)?; - Ok(validator.res) } +#[cfg(feature = "validate")] +pub use validate::{valid_outboard_ranges, valid_ranges}; diff --git a/src/lib.rs b/src/lib.rs index 5bda3e8..39758b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,12 @@ //! [TreeNode] provides various helpers to e.g. get the offset of a node in //! different traversal orders. //! -//! There are various newtypes for the different kinds of integers used in the -//! tree, e.g. [ByteNum] for number of bytes, [ChunkNum] for number of chunks. +//! There are newtypes for the different kinds of integers used in the +//! tree: +//! [ByteNum] is an u64 number of bytes, +//! [ChunkNum] is an u64 number of chunks, +//! [TreeNode] is an u64 tree node identifier, +//! and [BlockSize] is the log base 2 of the chunk group size. //! //! All this is then used in the [io] module to implement the actual io, both //! synchronous and asynchronous. @@ -42,7 +46,7 @@ pub type ChunkRanges = range_collections::RangeSet2; /// A referenceable set of chunk ranges /// -/// [ChunkRanges] implements [AsRef]. +/// [ChunkRanges] implements [`AsRef`]. pub type ChunkRangesRef = range_collections::RangeSetRef; fn hash_subtree(start_chunk: u64, data: &[u8], is_root: bool) -> blake3::Hash { @@ -204,7 +208,7 @@ impl BaoTree { /// Traverse the part of the tree that is relevant for a ranges querys /// in pre order as [NodeInfo]s /// - /// This is mostly used internally by the [PreOrderChunkIterRef] + /// This is mostly used internally. /// /// When `min_level` is set to a value greater than 0, the iterator will /// skip all branch nodes that are at a level < min_level if they are fully @@ -249,11 +253,14 @@ impl BaoTree { self.blocks().0 - 1 } - pub(crate) fn outboard_size(size: ByteNum, block_size: BlockSize) -> ByteNum { - let tree = Self::new(size, block_size); - ByteNum(tree.outboard_hash_pairs() * 64 + 8) + /// The outboard size for this tree. + /// + /// This is the outboard size *without* the size prefix. + pub fn outboard_size(&self) -> ByteNum { + ByteNum(self.outboard_hash_pairs() * 64) } + #[allow(dead_code)] fn filled_size(&self) -> TreeNode { let blocks = self.chunks(); let n = (blocks.0 + 1) / 2; diff --git a/src/tests.rs b/src/tests.rs index ae9bd37..7039765 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -10,12 +10,7 @@ use range_collections::RangeSet2; use crate::{ assert_tuple_eq, blake3, - io::{ - full_chunk_groups, - outboard::PreOrderMemOutboard, - sync::{DecodeResponseItem, Outboard}, - Leaf, - }, + io::{full_chunk_groups, outboard::PreOrderMemOutboard, sync::Outboard, BaoContentItem, Leaf}, iter::{PostOrderChunkIter, PreOrderPartialIterRef, ResponseIterRef}, prop_assert_tuple_eq, rec::{ @@ -34,6 +29,13 @@ use super::{ BaoTree, BlockSize, TreeNode, }; +fn read_len(mut from: impl std::io::Read) -> std::io::Result { + let mut buf = [0; 8]; + from.read_exact(&mut buf)?; + let len = ByteNum(u64::from_le_bytes(buf)); + Ok(len) +} + /// Compute the blake3 hash for the given data, /// /// using blake3_hash_inner which is used in hash_block. @@ -47,25 +49,6 @@ fn bao_tree_blake3_impl(data: Vec) -> (blake3::Hash, blake3::Hash) { (expected, actual) } -/// Computes a reference post order outboard using the abao crate (chunk_group_log = 0) and the non-standard finalize_post_order function. -fn post_order_outboard_reference_2(data: &[u8]) -> PostOrderMemOutboard { - let mut outboard = Vec::new(); - let cursor = std::io::Cursor::new(&mut outboard); - let mut encoder = abao::encode::Encoder::new_outboard(cursor); - encoder.write_all(data).unwrap(); - // requires non standard fn finalize_post_order - let hash = encoder.finalize_post_order().unwrap(); - // remove the length suffix - outboard.truncate(outboard.len() - 8); - let hash = blake3::Hash::from(*hash.as_bytes()); - PostOrderMemOutboard::new( - hash, - BaoTree::new(ByteNum(data.len() as u64), BlockSize::ZERO), - outboard, - ) - .unwrap() -} - /// Computes a reference pre order outboard using the bao crate (chunk_group_log = 0) and then flips it to a post-order outboard. fn post_order_outboard_reference(data: &[u8]) -> PostOrderMemOutboard { let mut outboard = Vec::new(); @@ -77,14 +60,14 @@ fn post_order_outboard_reference(data: &[u8]) -> PostOrderMemOutboard { let tree = BaoTree::new(ByteNum(data.len() as u64), BlockSize::ZERO); outboard.splice(..8, []); let pre = PreOrderMemOutboard::new(hash, tree, outboard); - pre.unwrap().flip() + pre.flip() } fn encode_slice_reference(data: &[u8], chunk_range: Range) -> (Vec, blake3::Hash) { - let (outboard, hash) = abao::encode::outboard(data); + let (outboard, hash) = bao::encode::outboard(data); let slice_start = chunk_range.start.to_bytes().0; let slice_len = (chunk_range.end - chunk_range.start).to_bytes().0; - let mut encoder = abao::encode::SliceExtractor::new_outboard( + let mut encoder = bao::encode::SliceExtractor::new_outboard( Cursor::new(&data), Cursor::new(&outboard), slice_start, @@ -103,8 +86,6 @@ fn bao_tree_encode_slice_comparison_impl(data: Vec, mut range: Range, mut range: Range, range: Range) { let expected = data; let ranges = ChunkRanges::from(range); let mut ec = Cursor::new(encoded); - for item in decode_ranges_into_chunks(root, BlockSize::ZERO, &mut ec, &ranges) { + for item in decode_ranges_into_chunks(root, BlockSize::ZERO, &mut ec, &ranges).unwrap() { let (pos, slice) = item.unwrap(); let pos = pos.to_usize(); assert_eq!(expected[pos..pos + slice.len()], *slice); @@ -230,7 +210,7 @@ fn bao_tree_outboard_levels() { assert_eq!(expected, hash); assert_eq!( ByteNum(outboard.len() as u64), - BaoTree::outboard_size(ByteNum(td.len() as u64), block_size) + BaoTree::new(ByteNum(td.len() as u64), block_size).outboard_size() + 8 ); } } @@ -247,7 +227,9 @@ fn bao_tree_slice_roundtrip_test(data: Vec, mut range: Range, bloc let expected = data.clone(); let mut all_ranges: range_collections::RangeSet<[ByteNum; 2]> = RangeSet2::empty(); let mut ec = Cursor::new(encoded); - for item in decode_ranges_into_chunks(root, block_size, &mut ec, &ChunkRanges::from(range)) { + for item in + decode_ranges_into_chunks(root, block_size, &mut ec, &ChunkRanges::from(range)).unwrap() + { let (pos, slice) = item.unwrap(); // compute all data ranges all_ranges |= RangeSet2::from(pos..pos + (slice.len() as u64)); @@ -521,20 +503,22 @@ fn test_pre_order_outboard_fast() { pub fn decode_ranges_into_chunks<'a>( root: blake3::Hash, block_size: BlockSize, - encoded: impl Read + 'a, + mut encoded: impl Read + 'a, ranges: &'a ChunkRangesRef, -) -> impl Iterator)>> + 'a { - let iter = DecodeResponseIter::new(root, block_size, encoded, ranges); - iter.filter_map(|item| match item { +) -> std::io::Result)>> + 'a> { + let size = read_len(&mut encoded)?; + let tree = BaoTree::new(size, block_size); + let iter = DecodeResponseIter::new(root, tree, encoded, ranges); + Ok(iter.filter_map(|item| match item { Ok(item) => { - if let DecodeResponseItem::Leaf(Leaf { offset, data }) = item { + if let BaoContentItem::Leaf(Leaf { offset, data }) = item { Some(Ok((offset, data.to_vec()))) } else { None } } Err(e) => Some(Err(e.into())), - }) + })) } /// iterate over all nodes in the tree in depth first, left to right, pre order @@ -1028,10 +1012,8 @@ proptest! { #[test] fn flip(len in 0usize..100000) { let data = make_test_data(len); - let post1 = post_order_outboard_reference(&data); - let post2 = post_order_outboard_reference_2(&data); - prop_assert_eq!(&post1, &post2); - prop_assert_eq!(&post1, &post1.flip().flip()); + let post = post_order_outboard_reference(&data); + prop_assert_eq!(&post, &post.flip().flip()); } diff --git a/src/tests2.rs b/src/tests2.rs index 66e92ee..84124ef 100644 --- a/src/tests2.rs +++ b/src/tests2.rs @@ -12,8 +12,10 @@ use range_collections::{RangeSet2, RangeSetRef}; use smallvec::SmallVec; use std::ops::Range; use test_strategy::proptest; +use tokio::io::AsyncReadExt; use crate::io::outboard::PreOrderMemOutboard; +use crate::io::BaoContentItem; use crate::rec::{ get_leaf_ranges, make_test_data, partial_chunk_iter_reference, range_union, response_iter_reference, truncate_ranges, ReferencePreOrderPartialChunkIterRef, @@ -22,16 +24,21 @@ use crate::{assert_tuple_eq, prop_assert_tuple_eq, ChunkRanges, ChunkRangesRef}; use crate::{ blake3, hash_subtree, io::{ - fsm::{BaoContentItem, ResponseDecoderReadingNext}, - outboard::PostOrderMemOutboard, - sync::{DecodeResponseItem, Outboard}, - Header, Leaf, Parent, + fsm::ResponseDecoderReadingNext, outboard::PostOrderMemOutboard, sync::Outboard, Leaf, + Parent, }, iter::{BaoChunk, PreOrderPartialChunkIterRef, ResponseIterRef}, rec::{encode_selected_rec, select_nodes_rec}, BaoTree, BlockSize, ByteNum, ChunkNum, TreeNode, }; +fn read_len(mut from: impl std::io::Read) -> std::io::Result { + let mut buf = [0; 8]; + from.read_exact(&mut buf)?; + let len = ByteNum(u64::from_le_bytes(buf)); + Ok(len) +} + #[cfg(feature = "validate")] use futures_lite::StreamExt; @@ -238,17 +245,24 @@ fn mem_outboard_flip_proptest(#[strategy(tree())] tree: BaoTree) { mem_outboard_flip_impl(tree); } -/// range is a range of chunks. Just using u64 for convenience in tests -fn valid_ranges_sync(outboard: &PostOrderMemOutboard) -> ChunkRanges { - crate::io::sync::valid_outboard_ranges(outboard).unwrap() -} - -/// range is a range of chunks. Just using u64 for convenience in tests #[cfg(feature = "validate")] -fn valid_ranges_fsm(outboard: impl crate::io::fsm::Outboard, data: Bytes) -> ChunkRanges { - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async move { +mod validate { + use super::*; + + /// range is a range of chunks. Just using u64 for convenience in tests + fn valid_outboard_ranges_sync(outboard: impl crate::io::sync::Outboard) -> ChunkRanges { + let ranges = ChunkRanges::all(); + let iter = crate::io::sync::valid_outboard_ranges(outboard, &ranges); + let mut res = ChunkRanges::empty(); + for item in iter { + res |= ChunkRanges::from(item.unwrap()); + } + res + } + + /// range is a range of chunks. Just using u64 for convenience in tests + fn valid_ranges_fsm(outboard: impl crate::io::fsm::Outboard, data: Bytes) -> ChunkRanges { + run_blocking(async move { let ranges = ChunkRanges::all(); let mut stream = crate::io::fsm::valid_ranges(outboard, data, &ranges); let mut res = ChunkRanges::empty(); @@ -259,14 +273,23 @@ fn valid_ranges_fsm(outboard: impl crate::io::fsm::Outboard, data: Bytes) -> Chu std::io::Result::Ok(res) }) .unwrap() -} + } -/// range is a range of chunks. Just using u64 for convenience in tests -#[cfg(feature = "validate")] -fn valid_outboard_ranges_fsm(outboard: &mut PostOrderMemOutboard) -> ChunkRanges { - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async move { + /// range is a range of chunks. Just using u64 for convenience in tests + fn valid_ranges_sync(outboard: impl crate::io::sync::Outboard, data: &[u8]) -> ChunkRanges { + let ranges = ChunkRanges::all(); + let iter = crate::io::sync::valid_ranges(outboard, data, &ranges); + let mut res = ChunkRanges::empty(); + for item in iter { + let item = item.unwrap(); + res |= ChunkRanges::from(item); + } + res + } + + /// range is a range of chunks. Just using u64 for convenience in tests + fn valid_outboard_ranges_fsm(outboard: &mut PostOrderMemOutboard) -> ChunkRanges { + run_blocking(async move { let ranges = ChunkRanges::all(); let mut stream = crate::io::fsm::valid_outboard_ranges(outboard, &ranges); let mut res = ChunkRanges::empty(); @@ -277,227 +300,178 @@ fn valid_outboard_ranges_fsm(outboard: &mut PostOrderMemOutboard) -> ChunkRanges std::io::Result::Ok(res) }) .unwrap() -} - -#[cfg(feature = "validate")] -fn validate_outboard_sync_pos_impl(tree: BaoTree) { - let size = tree.size.to_usize(); - let block_size = tree.block_size; - let data = make_test_data(size); - let outboard = PostOrderMemOutboard::create(data, block_size); - let expected = ChunkRanges::from(..outboard.tree().chunks()); - let actual = valid_ranges_sync(&outboard); - assert_eq!(expected, actual) -} - -#[cfg(feature = "validate")] -async fn valid_file_ranges_test_impl() { - // interesting cases: - // below 16 chunks - // exactly 16 chunks - // 16 chunks + 1 - // 32 chunks - // 32 chunks + 1 < seems to fail! - let data = make_test_data(1024 * 16 * 2 + 1024 * 15); - let outboard = PostOrderMemOutboard::create(&data, BlockSize(4)); - let ranges = ChunkRanges::from(ChunkNum(0)..ChunkNum(120)); - // data[32768] = 0; - let data = Bytes::from(data); - let mut stream = crate::io::fsm::valid_ranges(outboard, data, &ranges); - while let Some(item) = stream.next().await { - let item = item.unwrap(); - println!("{:?}", item); } -} -/// range is a range of chunks. Just using u64 for convenience in tests -#[cfg(feature = "validate")] -#[test] -fn valid_file_ranges_fsm() { - tokio::runtime::Runtime::new() - .unwrap() - .block_on(valid_file_ranges_test_impl()) -} - -#[cfg(feature = "validate")] -#[proptest] -fn validate_outboard_sync_pos_proptest(#[strategy(tree())] tree: BaoTree) { - validate_outboard_sync_pos_impl(tree); -} + fn validate_outboard_pos_impl(tree: BaoTree) { + let size = tree.size.to_usize(); + let block_size = tree.block_size; + let data = make_test_data(size); + let mut outboard = PostOrderMemOutboard::create(data, block_size); + let expected = ChunkRanges::from(..outboard.tree().chunks()); + let actual = valid_outboard_ranges_sync(&mut outboard); + assert_eq!(expected, actual); + let actual = valid_outboard_ranges_fsm(&mut outboard); + assert_eq!(expected, actual) + } -#[cfg(feature = "validate")] -fn validate_outboard_fsm_pos_impl(tree: BaoTree) { - let size = tree.size.to_usize(); - let block_size = tree.block_size; - let data = make_test_data(size); - let mut outboard = PostOrderMemOutboard::create(data, block_size); - let expected = ChunkRanges::from(..outboard.tree().chunks()); - let actual = valid_outboard_ranges_fsm(&mut outboard); - assert_eq!(expected, actual) -} + #[proptest] + fn validate_outboard_pos_proptest(#[strategy(tree())] tree: BaoTree) { + validate_outboard_pos_impl(tree); + } -#[cfg(feature = "validate")] -#[proptest] -fn validate_outboard_fsm_pos_proptest(#[strategy(tree())] tree: BaoTree) { - validate_outboard_fsm_pos_impl(tree); -} + #[test] + fn validate_outboard_pos_cases() { + let cases = [(0x10001, 0)]; + for (size, block_level) in cases { + let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); + validate_outboard_pos_impl(tree); + } + } -#[cfg(feature = "validate")] -#[test] -fn validate_outboard_fsm_pos_cases() { - let cases = [(0x10001, 0)]; - for (size, block_level) in cases { - let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); - validate_outboard_fsm_pos_impl(tree); + fn validate_pos_impl(tree: BaoTree) { + let size = tree.size.to_usize(); + let block_size = tree.block_size; + let data = make_test_data(size); + let mut outboard = PostOrderMemOutboard::create(&data, block_size); + let expected = ChunkRanges::from(..outboard.tree().chunks()); + let actual = valid_ranges_sync(&outboard, &data); + assert_eq!(expected, actual); + let actual = valid_ranges_fsm(&mut outboard, data.into()); + assert_eq!(expected, actual); } -} -#[cfg(feature = "validate")] -fn validate_fsm_pos_impl(tree: BaoTree) { - let size = tree.size.to_usize(); - let block_size = tree.block_size; - let data = make_test_data(size); - let mut outboard = PostOrderMemOutboard::create(&data, block_size); - let expected = ChunkRanges::from(..outboard.tree().chunks()); - let actual = valid_ranges_fsm(&mut outboard, data.into()); - assert_eq!(expected, actual) -} + #[proptest] + fn validate_pos_proptest(#[strategy(tree())] tree: BaoTree) { + validate_pos_impl(tree); + } -#[cfg(feature = "validate")] -#[proptest] -fn validate_fsm_pos_proptest(#[strategy(tree())] tree: BaoTree) { - validate_fsm_pos_impl(tree); -} + #[test] + fn validate_pos_cases() { + let cases = [ + // (0x10001, 0), + (0x401, 0), + ]; + for (size, block_level) in cases { + let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); + validate_pos_impl(tree); + } + } -#[cfg(feature = "validate")] -#[test] -fn validate_fsm_pos_cases() { - let cases = [ - // (0x10001, 0), - (0x401, 0), - ]; - for (size, block_level) in cases { - let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); - validate_fsm_pos_impl(tree); + fn flip_bit(data: &mut [u8], rand: usize) { + // flip a random bit in the outboard + // this is the post order outboard without the length suffix, + // so it's all hashes + let bit = rand % data.len() * 8; + let byte = bit / 8; + let bit = bit % 8; + data[byte] ^= 1 << bit; } -} -fn flip_bit(data: &mut [u8], rand: usize) { - // flip a random bit in the outboard - // this is the post order outboard without the length suffix, - // so it's all hashes - let bit = rand % data.len() * 8; - let byte = bit / 8; - let bit = bit % 8; - data[byte] ^= 1 << bit; -} + /// Check that flipping a random bit in the outboard makes at least one range invalid + fn validate_outboard_sync_neg_impl(tree: BaoTree, rand: u32) { + let rand = rand as usize; + let size = tree.size.to_usize(); + let block_size = tree.block_size; + let data = make_test_data(size); + let mut outboard = PostOrderMemOutboard::create(data, block_size); + let expected = ChunkRanges::from(..outboard.tree().chunks()); + if !outboard.data.is_empty() { + // flip a random bit in the outboard + flip_bit(&mut outboard.data, rand); + // Check that at least one range is invalid + let actual = valid_outboard_ranges_sync(&outboard); + assert_ne!(expected, actual); + } + } -/// Check that flipping a random bit in the outboard makes at least one range invalid -fn validate_outboard_sync_neg_impl(tree: BaoTree, rand: u32) { - let rand = rand as usize; - let size = tree.size.to_usize(); - let block_size = tree.block_size; - let data = make_test_data(size); - let mut outboard = PostOrderMemOutboard::create(data, block_size); - let expected = ChunkRanges::from(..outboard.tree().chunks()); - if !outboard.data.is_empty() { - // flip a random bit in the outboard - flip_bit(&mut outboard.data, rand); - // Check that at least one range is invalid - let actual = valid_ranges_sync(&outboard); - assert_ne!(expected, actual); + #[test] + fn validate_outboard_sync_neg_cases() { + let cases = [((0x6001, 3), 1265277760)]; + for ((size, block_level), rand) in cases { + let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); + validate_outboard_sync_neg_impl(tree, rand); + } } -} -#[test] -fn validate_outboard_sync_neg_cases() { - let cases = [((0x6001, 3), 1265277760)]; - for ((size, block_level), rand) in cases { - let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); + #[proptest] + fn validate_outboard_sync_neg_proptest(#[strategy(tree())] tree: BaoTree, rand: u32) { validate_outboard_sync_neg_impl(tree, rand); } -} -#[proptest] -fn validate_outboard_sync_neg_proptest(#[strategy(tree())] tree: BaoTree, rand: u32) { - validate_outboard_sync_neg_impl(tree, rand); -} - -/// Check that flipping a random bit in the outboard makes at least one range invalid -#[cfg(feature = "validate")] -fn validate_outboard_fsm_neg_impl(tree: BaoTree, rand: u32) { - let rand = rand as usize; - let size = tree.size.to_usize(); - let block_size = tree.block_size; - let data = make_test_data(size); - let mut outboard = PostOrderMemOutboard::create(data, block_size); - let expected = ChunkRanges::from(..outboard.tree().chunks()); - if !outboard.data.is_empty() { - // flip a random bit in the outboard - flip_bit(&mut outboard.data, rand); - // Check that at least one range is invalid - let actual = valid_outboard_ranges_fsm(&mut outboard); - assert_ne!(expected, actual); + /// Check that flipping a random bit in the outboard makes at least one range invalid + fn validate_outboard_neg_impl(tree: BaoTree, rand: u32) { + let rand = rand as usize; + let size = tree.size.to_usize(); + let block_size = tree.block_size; + let data = make_test_data(size); + let mut outboard = PostOrderMemOutboard::create(data, block_size); + let expected = ChunkRanges::from(..outboard.tree().chunks()); + if !outboard.data.is_empty() { + // flip a random bit in the outboard + flip_bit(&mut outboard.data, rand); + // Check that at least one range is invalid + let actual = valid_outboard_ranges_sync(&mut outboard); + assert_ne!(expected, actual); + let actual = valid_outboard_ranges_fsm(&mut outboard); + assert_ne!(expected, actual); + } } -} -#[cfg(feature = "validate")] -#[proptest] -fn validate_outboard_fsm_neg_proptest(#[strategy(tree())] tree: BaoTree, rand: u32) { - validate_outboard_fsm_neg_impl(tree, rand); -} + #[proptest] + fn validate_outboard_neg_proptest(#[strategy(tree())] tree: BaoTree, rand: u32) { + validate_outboard_neg_impl(tree, rand); + } -#[cfg(feature = "validate")] -#[test] -fn validate_outboard_fsm_neg_cases() { - let cases = [((0x2001, 0), 2738363904)]; - for ((size, block_level), rand) in cases { - let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); - validate_outboard_fsm_neg_impl(tree, rand); + #[test] + fn validate_outboard_neg_cases() { + let cases = [((0x2001, 0), 2738363904)]; + for ((size, block_level), rand) in cases { + let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); + validate_outboard_neg_impl(tree, rand); + } } -} -/// Check that flipping a random bit in the outboard makes at least one range invalid -#[cfg(feature = "validate")] -fn validate_fsm_neg_impl(tree: BaoTree, rand: u32) { - let rand = rand as usize; - let size = tree.size.to_usize(); - let block_size = tree.block_size; - let data = make_test_data(size); - let mut outboard = PostOrderMemOutboard::create(&data, block_size); - let expected = ChunkRanges::from(..outboard.tree().chunks()); - if !outboard.data.is_empty() { - // flip a random bit in the outboard - flip_bit(&mut outboard.data, rand); - // Check that at least one range is invalid - let actual = valid_ranges_fsm(&mut outboard, data.into()); - assert_ne!(expected, actual); + /// Check that flipping a random bit in the outboard makes at least one range invalid + fn validate_neg_impl(tree: BaoTree, rand: u32) { + let rand = rand as usize; + let size = tree.size.to_usize(); + let block_size = tree.block_size; + let data = make_test_data(size); + let mut outboard = PostOrderMemOutboard::create(&data, block_size); + let expected = ChunkRanges::from(..outboard.tree().chunks()); + if !outboard.data.is_empty() { + // flip a random bit in the outboard + flip_bit(&mut outboard.data, rand); + // Check that at least one range is invalid + let actual = valid_ranges_sync(&mut outboard, &data); + assert_ne!(expected, actual); + let actual = valid_ranges_fsm(&mut outboard, data.into()); + assert_ne!(expected, actual); + } } -} -#[proptest] -#[cfg(feature = "validate")] -fn validate_fsm_neg_proptest(#[strategy(tree())] tree: BaoTree, rand: u32) { - validate_fsm_neg_impl(tree, rand); -} + #[proptest] + fn validate_neg_proptest(#[strategy(tree())] tree: BaoTree, rand: u32) { + validate_neg_impl(tree, rand); + } -#[test] -#[cfg(feature = "validate")] -fn validate_fsm_neg_cases() { - let cases = [((0x2001, 0), 2738363904)]; - for ((size, block_level), rand) in cases { - let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); - validate_fsm_neg_impl(tree, rand); + #[test] + fn validate_neg_cases() { + let cases = [((0x2001, 0), 2738363904)]; + for ((size, block_level), rand) in cases { + let tree = BaoTree::new(ByteNum(size), BlockSize(block_level)); + validate_neg_impl(tree, rand); + } } -} -#[test] -fn validate_bug() { - let data = Bytes::from(make_test_data(19308432)); - let outboard = PostOrderMemOutboard::create(&data, BlockSize(4)); - let expected = ChunkRanges::from(..ByteNum(data.len() as u64).chunks()); - let actual = valid_ranges_fsm(outboard, data.clone()); - assert_eq!(expected, actual); + #[test] + fn validate_bug() { + let data = Bytes::from(make_test_data(19308432)); + let outboard = PostOrderMemOutboard::create(&data, BlockSize(4)); + let expected = ChunkRanges::from(..ByteNum(data.len() as u64).chunks()); + let actual = valid_ranges_fsm(outboard, data.clone()); + assert_eq!(expected, actual); + } } /// Encode data fully, decode it again, and check that both data and outboard are the same @@ -515,23 +489,15 @@ fn encode_decode_full_sync_impl( crate::io::sync::encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded) .unwrap(); let mut encoded_read = std::io::Cursor::new(encoded); + let size = read_len(&mut encoded_read).unwrap(); + let tree = BaoTree::new(size, outboard.tree().block_size()); let mut decoded = Vec::new(); - let ob_res_opt = crate::io::sync::decode_response_into( + let mut ob_res = PostOrderMemOutboard::new( outboard.root(), - outboard.tree().block_size, - &ranges, - &mut encoded_read, - |tree, root: blake3::Hash| { - let outboard_size = usize::try_from(tree.outboard_hash_pairs() * 64).unwrap(); - let outboard_data = vec![0; outboard_size]; - Ok(PostOrderMemOutboard::new(root, tree, outboard_data).unwrap()) - }, - &mut decoded, - ) - .unwrap(); - let ob_res = ob_res_opt.unwrap_or_else(|| { - PostOrderMemOutboard::new(outboard.root(), outboard.tree(), vec![]).unwrap() - }); + tree, + vec![0; tree.outboard_size().to_usize()], + ); + crate::io::sync::decode_ranges(&ranges, encoded_read, &mut decoded, &mut ob_res).unwrap(); ((decoded, ob_res), (data.to_vec(), outboard)) } @@ -551,31 +517,25 @@ async fn encode_decode_full_fsm_impl( crate::io::fsm::encode_ranges_validated( Bytes::from(data.clone()), &mut outboard, - &ChunkRanges::all(), + &ranges, &mut encoded, ) .await .unwrap(); let mut read_encoded = std::io::Cursor::new(encoded); + let size = read_encoded.read_u64_le().await.unwrap(); + let mut ob_res = { + let tree = BaoTree::new(ByteNum(size), outboard.tree().block_size()); + let root = outboard.root(); + let outboard_size = usize::try_from(tree.outboard_hash_pairs() * 64).unwrap(); + let outboard_data = vec![0u8; outboard_size]; + PostOrderMemOutboard::new(root, tree, outboard_data) + }; let mut decoded = BytesMut::new(); - let ob_res_opt = crate::io::fsm::decode_response_into( - outboard.root(), - outboard.tree().block_size, - ranges, - &mut read_encoded, - |root, tree| async move { - let outboard_size = usize::try_from(tree.outboard_hash_pairs() * 64).unwrap(); - let outboard_data = vec![0u8; outboard_size]; - Ok(PostOrderMemOutboard::new(root, tree, outboard_data).unwrap()) - }, - &mut decoded, - ) - .await - .unwrap(); - let ob_res = ob_res_opt.unwrap_or_else(|| { - PostOrderMemOutboard::new(outboard.root(), outboard.tree(), vec![]).unwrap() - }); + crate::io::fsm::decode_ranges(read_encoded, ranges, &mut decoded, &mut ob_res) + .await + .unwrap(); ((data, outboard), (decoded.to_vec(), ob_res)) } @@ -587,13 +547,10 @@ fn encode_decode_partial_sync_impl( let mut encoded = Vec::new(); crate::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded).unwrap(); let expected_data = data; - let encoded_read = std::io::Cursor::new(encoded); - let iter = crate::io::sync::DecodeResponseIter::new( - outboard.root, - outboard.tree.block_size, - encoded_read, - ranges, - ); + let mut encoded_read = std::io::Cursor::new(encoded); + let size = read_len(&mut encoded_read).unwrap(); + let tree = BaoTree::new(size, outboard.tree.block_size); + let iter = crate::io::sync::DecodeResponseIter::new(outboard.root, tree, encoded_read, ranges); for item in iter { let item = match item { Ok(item) => item, @@ -602,13 +559,7 @@ fn encode_decode_partial_sync_impl( } }; match item { - DecodeResponseItem::Header(Header { size }) => { - // check that the size matches - if size != outboard.tree.size { - return false; - } - } - DecodeResponseItem::Parent(Parent { node, pair }) => { + BaoContentItem::Parent(Parent { node, pair }) => { // check that the hash pair matches if let Some(expected_pair) = outboard.load(node).unwrap() { if pair != expected_pair { @@ -616,7 +567,7 @@ fn encode_decode_partial_sync_impl( } } } - DecodeResponseItem::Leaf(Leaf { offset, data }) => { + BaoContentItem::Leaf(Leaf { offset, data }) => { // check that the data matches if expected_data[offset.to_usize()..offset.to_usize() + data.len()] != data { return false; @@ -1043,3 +994,7 @@ fn canonicalize_ranges_test() { assert_eq!(expected, actual); } } + +fn run_blocking(f: F) -> F::Output { + tokio::runtime::Runtime::new().unwrap().block_on(f) +}