Skip to content

Commit

Permalink
Move collection to iroh-bytes and add metadata header (#1841)
Browse files Browse the repository at this point in the history
## Description

This changes the collection type a bit to make it more idiomatic rust.
E.g. it now implements traits to use it like a... collection, like
FromIterator. It also moves collection (back) to iroh-bytes so it can be
used from tools like sendme without having to depend on all of iroh.

This collection format is intended to be a minimum viable metadata
format that we can support for a long time - just a sequence of strings
with a string for each blob in the HashSeq. It is useful for tools like
sendme and for gateways, so I think this is valuable enough to have as a
separate format to e.g. document snapshots.

## Notes & open questions

Should this be sequences of blobs? Weirdly, in this case I am OK with a
sequence of strings. If somebody wants names to be arbitrary blobs they
can always define another metadata format.

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
rklaehn committed Dec 18, 2023
1 parent 4002c46 commit f9776bb
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 174 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions iroh-bytes/src/format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! Defines data formats for HashSeq.
//!
//! The exact details how to use a HashSeq for specific purposes is up to the
//! user. However, the following approach is used by iroh formats:
//!
//! The first child blob is a metadata blob. It starts with a header, followed
//! by serialized metadata. We mostly use [postcard] for serialization. The
//! metadata either implicitly or explicitly refers to the other blobs in the
//! HashSeq by index.
//!
//! In a very simple case, the metadata just an array of items, where each item
//! is the metadata for the corresponding blob. The metadata array will have
//! n-1 items, where n is the number of blobs in the HashSeq.
//!
//! [postcard]: https://docs.rs/postcard/latest/postcard/
pub mod collection;
192 changes: 111 additions & 81 deletions iroh/src/collection.rs → iroh-bytes/src/format/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,90 @@ use std::collections::BTreeMap;
use anyhow::Context;
use bao_tree::blake3;
use bytes::Bytes;
use iroh_bytes::get::fsm::EndBlobNext;
use iroh_bytes::get::Stats;
use iroh_bytes::hashseq::HashSeq;
use iroh_bytes::store::MapEntry;
use iroh_bytes::util::TempTag;
use iroh_bytes::{BlobFormat, Hash};
use iroh_io::AsyncSliceReaderExt;
use serde::{Deserialize, Serialize};

use crate::{
get::{fsm, Stats},
hashseq::HashSeq,
store::MapEntry,
util::TempTag,
BlobFormat, Hash,
};

/// A collection of blobs
///
/// Note that the format is subject to change.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, Default)]
pub struct Collection {
/// Links to the blobs in this collection
pub(crate) blobs: Vec<Blob>,
/// The total size of the raw_data referred to by all links
pub(crate) total_blobs_size: u64,
blobs: Vec<(String, Hash)>,
}

impl std::ops::Index<usize> for Collection {
type Output = (String, Hash);

fn index(&self, index: usize) -> &Self::Output {
&self.blobs[index]
}
}

impl<K, V> Extend<(K, V)> for Collection
where
K: Into<String>,
V: Into<Hash>,
{
fn extend<T: IntoIterator<Item = (K, V)>>(&mut self, iter: T) {
self.blobs
.extend(iter.into_iter().map(|(k, v)| (k.into(), v.into())));
}
}

impl<K, V> FromIterator<(K, V)> for Collection
where
K: Into<String>,
V: Into<Hash>,
{
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
let mut res = Self::default();
res.extend(iter);
res
}
}

impl IntoIterator for Collection {
type Item = (String, Hash);
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.blobs.into_iter()
}
}

/// Metadata for a collection
///
/// This is the wire format for the metadata blob.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
struct CollectionMeta {
header: [u8; 13], // Must contain "CollectionV0."
names: Vec<String>,
total_blobs_size: u64,
}

impl Collection {
/// The header for the collection format.
///
/// This is the start of the metadata blob.
pub const HEADER: &'static [u8; 13] = b"CollectionV0.";

/// Convert the collection to an iterator of blobs, with the last being the
/// root blob.
///
/// To persist the collection, write all the blobs to storage, and use the
/// hash of the last blob as the collection hash.
pub fn to_blobs(&self) -> impl Iterator<Item = Bytes> {
let meta = CollectionMeta {
header: *Self::HEADER,
names: self.names(),
total_blobs_size: self.total_blobs_size(),
};
let meta_bytes = postcard::to_stdvec(&meta).unwrap();
let meta_bytes_hash = blake3::hash(&meta_bytes).into();
Expand All @@ -56,15 +103,15 @@ impl Collection {
/// Returns the fsm at the start of the first child blob (if any),
/// the links array, and the collection.
pub async fn read_fsm(
fsm_at_start_root: iroh_bytes::get::fsm::AtStartRoot,
) -> anyhow::Result<(iroh_bytes::get::fsm::EndBlobNext, HashSeq, Collection)> {
fsm_at_start_root: fsm::AtStartRoot,
) -> anyhow::Result<(fsm::EndBlobNext, HashSeq, Collection)> {
let (next, links) = {
let curr = fsm_at_start_root.next();
let (curr, data) = curr.concatenate_into_vec().await?;
let links = HashSeq::new(data.into()).context("links could not be parsed")?;
(curr.next(), links)
};
let EndBlobNext::MoreChildren(at_meta) = next else {
let fsm::EndBlobNext::MoreChildren(at_meta) = next else {
anyhow::bail!("expected meta");
};
let (next, collection) = {
Expand All @@ -73,7 +120,13 @@ impl Collection {
let curr = at_meta.next(meta_link);
let (curr, names) = curr.concatenate_into_vec().await?;
let names = postcard::from_bytes::<CollectionMeta>(&names)?;
let collection = Collection::from_parts(children, names)?;
anyhow::ensure!(
names.header == *Self::HEADER,
"expected header {:?}, got {:?}",
Self::HEADER,
names.header
);
let collection = Collection::from_parts(children, names);
(curr.next(), collection)
};
Ok((next, links, collection))
Expand All @@ -83,14 +136,14 @@ impl Collection {
///
/// Returns the collection, a map from blob offsets to bytes, and the stats.
pub async fn read_fsm_all(
fsm_at_start_root: iroh_bytes::get::fsm::AtStartRoot,
fsm_at_start_root: crate::get::fsm::AtStartRoot,
) -> anyhow::Result<(Collection, BTreeMap<u64, Bytes>, Stats)> {
let (next, links, collection) = Self::read_fsm(fsm_at_start_root).await?;
let mut res = BTreeMap::new();
let mut curr = next;
let end = loop {
match curr {
EndBlobNext::MoreChildren(more) => {
fsm::EndBlobNext::MoreChildren(more) => {
let child_offset = more.child_offset();
let Some(hash) = links.get(usize::try_from(child_offset)?) else {
break more.finish();
Expand All @@ -100,7 +153,7 @@ impl Collection {
res.insert(child_offset - 1, blob.into());
curr = next.next();
}
EndBlobNext::Closing(closing) => break closing,
fsm::EndBlobNext::Closing(closing) => break closing,
}
};
let stats = end.next().await?;
Expand All @@ -113,7 +166,7 @@ impl Collection {
/// It does not require that all child blobs are stored in the store.
pub async fn load<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
where
D: iroh_bytes::store::Map,
D: crate::store::Map,
{
let links_entry = db.get(root).context("links not found")?;
anyhow::ensure!(links_entry.is_complete(), "links not complete");
Expand All @@ -128,14 +181,14 @@ impl Collection {
meta.names.len() == links.len(),
"names and links length mismatch"
);
Self::from_parts(links, meta)
Ok(Self::from_parts(links, meta))
}

/// Store a collection in a store. returns the root hash of the collection
/// as a TempTag.
pub async fn store<D>(self, db: &D) -> anyhow::Result<TempTag>
where
D: iroh_bytes::store::Store,
D: crate::store::Store,
{
let (links, meta) = self.into_parts();
let meta_bytes = postcard::to_stdvec(&meta)?;
Expand All @@ -151,104 +204,81 @@ impl Collection {

/// Split a collection into a sequence of links and metadata
fn into_parts(self) -> (Vec<Hash>, CollectionMeta) {
let mut names = Vec::with_capacity(self.blobs().len());
let mut links = Vec::with_capacity(self.blobs().len());
for blob in self.blobs {
names.push(blob.name);
links.push(blob.hash);
let mut names = Vec::with_capacity(self.blobs.len());
let mut links = Vec::with_capacity(self.blobs.len());
for (name, hash) in self.blobs {
names.push(name);
links.push(hash);
}
let meta = CollectionMeta {
header: *Self::HEADER,
names,
total_blobs_size: self.total_blobs_size,
};
(links, meta)
}

/// Create a new collection from a list of hashes and metadata
fn from_parts(
links: impl IntoIterator<Item = Hash>,
meta: CollectionMeta,
) -> anyhow::Result<Self> {
let blobs = links
.into_iter()
.zip(meta.names)
.map(|(hash, name)| Blob { name, hash })
.collect();
Self::new(blobs, meta.total_blobs_size)
}

/// Create a new collection from a list of blobs and total size of the raw data
pub fn new(blobs: Vec<Blob>, total_blobs_size: u64) -> anyhow::Result<Self> {
let mut blobs = blobs;
let n = blobs.len();
blobs.sort_by(|a, b| a.name.cmp(&b.name));
blobs.dedup_by(|a, b| a.name == b.name);
anyhow::ensure!(n == blobs.len(), "duplicate blob names");
Ok(Self {
blobs,
total_blobs_size,
})
fn from_parts(links: impl IntoIterator<Item = Hash>, meta: CollectionMeta) -> Self {
meta.names.into_iter().zip(links).collect()
}

/// Get the links to the blobs in this collection
fn links(&self) -> impl Iterator<Item = Hash> + '_ {
self.blobs.iter().map(|x| x.hash)
self.blobs.iter().map(|(_name, hash)| *hash)
}

/// Get the names of the blobs in this collection
fn names(&self) -> Vec<String> {
self.blobs.iter().map(|x| x.name.clone()).collect()
self.blobs.iter().map(|(name, _)| name.clone()).collect()
}

/// Blobs in this collection
pub fn blobs(&self) -> &[Blob] {
&self.blobs
/// Iterate over the blobs in this collection
pub fn iter(&self) -> impl Iterator<Item = &(String, Hash)> {
self.blobs.iter()
}

/// Take ownership of the blobs in this collection
pub fn into_inner(self) -> Vec<Blob> {
self.blobs
}

/// Total size of the raw data referred to by all blobs in this collection
pub fn total_blobs_size(&self) -> u64 {
self.total_blobs_size
/// Get the number of blobs in this collection
pub fn len(&self) -> usize {
self.blobs.len()
}

/// The number of blobs in this collection
pub fn total_entries(&self) -> u64 {
self.blobs.len() as u64
/// Check if this collection is empty
pub fn is_empty(&self) -> bool {
self.blobs.is_empty()
}
}

/// A blob entry of a collection
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Blob {
/// The name of this blob of data
pub name: String,
/// The hash of the blob of data
pub hash: Hash,
}

#[cfg(test)]
mod tests {
use super::*;
use bao_tree::blake3;

#[test]
fn roundtrip_blob() {
let b = Blob {
name: "test".to_string(),
hash: blake3::Hash::from_hex(
let b = (
"test".to_string(),
blake3::Hash::from_hex(
"3aa61c409fd7717c9d9c639202af2fae470c0ef669be7ba2caea5779cb534e9d",
)
.unwrap()
.into(),
};
);

let mut buf = bytes::BytesMut::zeroed(1024);
postcard::to_slice(&b, &mut buf).unwrap();
let deserialize_b: Blob = postcard::from_bytes(&buf).unwrap();
let deserialize_b: (String, Hash) = postcard::from_bytes(&buf).unwrap();
assert_eq!(b, deserialize_b);
}

#[test]
fn roundtrip_collection_meta() {
let expected = CollectionMeta {
header: *Collection::HEADER,
names: vec!["test".to_string(), "a".to_string(), "b".to_string()],
};
let mut buf = bytes::BytesMut::zeroed(1024);
postcard::to_slice(&expected, &mut buf).unwrap();
let actual: CollectionMeta = postcard::from_bytes(&buf).unwrap();
assert_eq!(expected, actual);
}
}
1 change: 1 addition & 0 deletions iroh-bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![deny(missing_docs, rustdoc::broken_intra_doc_links)]
#![recursion_limit = "256"]

pub mod format;
pub mod get;
pub mod hashseq;
pub mod protocol;
Expand Down
Loading

0 comments on commit f9776bb

Please sign in to comment.