Skip to content

Commit

Permalink
refactor: remove BlobOrCollection (#1078)
Browse files Browse the repository at this point in the history
The distinction is now just whether something is stored internally or externally.
  • Loading branch information
rklaehn committed Jun 8, 2023
1 parent 36cd904 commit 63a2529
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 70 deletions.
70 changes: 38 additions & 32 deletions src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,16 @@ impl CustomGetHandler for () {
}

/// A [`Database`] entry.
///
/// This is either stored externally in the file system, or internally in the database.
/// Collections are always stored internally for now.
///
/// Internally stored entries are stored in the iroh home directory when the database is
/// persisted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlobOrCollection {
pub enum DbEntry {
/// An entry whose data is stored externally, in a file on the file system.
Blob {
External {
/// The bao outboard data.
outboard: Bytes,
/// Path to the original data, which must not change while in use.
Expand All @@ -134,40 +140,38 @@ pub enum BlobOrCollection {
size: u64,
},
/// An entry whose data is stored internally (inline) in the database.
Collection {
/// The bao outboard data of the serialised [`Collection`].
Internal {
/// The bao outboard data.
outboard: Bytes,
/// The serialised [`Collection`].
/// The inline data.
data: Bytes,
},
}

impl BlobOrCollection {
pub(crate) fn is_blob(&self) -> bool {
matches!(self, BlobOrCollection::Blob { .. })
impl DbEntry {
pub(crate) fn is_external(&self) -> bool {
matches!(self, DbEntry::External { .. })
}

pub(crate) fn blob_path(&self) -> Option<&Path> {
match self {
BlobOrCollection::Blob { path, .. } => Some(path),
BlobOrCollection::Collection { .. } => None,
DbEntry::External { path, .. } => Some(path),
DbEntry::Internal { .. } => None,
}
}

pub(crate) fn outboard(&self) -> &Bytes {
match self {
BlobOrCollection::Blob { outboard, .. } => outboard,
BlobOrCollection::Collection { outboard, .. } => outboard,
DbEntry::External { outboard, .. } => outboard,
DbEntry::Internal { outboard, .. } => outboard,
}
}

/// A reader for the data
async fn data_reader(&self) -> io::Result<Either<Cursor<Bytes>, tokio::fs::File>> {
Ok(match self {
BlobOrCollection::Blob { path, .. } => {
Either::Right(tokio::fs::File::open(path).await?)
}
BlobOrCollection::Collection { data, .. } => Either::Left(Cursor::new(data.clone())),
DbEntry::External { path, .. } => Either::Right(tokio::fs::File::open(path).await?),
DbEntry::Internal { data, .. } => Either::Left(Cursor::new(data.clone())),
})
}

Expand All @@ -177,8 +181,8 @@ impl BlobOrCollection {
/// For blobs it is the blob size.
pub fn size(&self) -> u64 {
match self {
BlobOrCollection::Blob { size, .. } => *size,
BlobOrCollection::Collection { data, .. } => data.len() as u64,
DbEntry::External { size, .. } => *size,
DbEntry::Internal { data, .. } => data.len() as u64,
}
}
}
Expand Down Expand Up @@ -625,7 +629,7 @@ impl RpcHandler {
let items = self
.inner
.db
.blobs()
.external()
.map(|(hash, path, size)| ListBlobsResponse { hash, path, size });
futures::stream::iter(items)
}
Expand All @@ -634,15 +638,17 @@ impl RpcHandler {
self,
_msg: ListCollectionsRequest,
) -> impl Stream<Item = ListCollectionsResponse> + Send + 'static {
let items = self
.inner
.db
.collections()
.map(|(hash, collection)| ListCollectionsResponse {
hash,
total_blobs_count: collection.blobs.len(),
total_blobs_size: collection.total_blobs_size,
});
// collections are always stored internally, so we take everything that is stored internally
// and try to parse it as a collection
let items = self.inner.db.internal().filter_map(|(hash, collection)| {
Collection::from_bytes(&collection)
.ok()
.map(|collection| ListCollectionsResponse {
hash,
total_blobs_count: collection.blobs.len(),
total_blobs_size: collection.total_blobs_size,
})
});
futures::stream::iter(items)
}

Expand Down Expand Up @@ -1117,7 +1123,7 @@ async fn send_blob<W: AsyncWrite + Unpin + Send + 'static>(
writer: &mut W,
) -> Result<(SentStatus, u64)> {
match db.get(&name) {
Some(BlobOrCollection::Blob {
Some(DbEntry::External {
outboard,
path,
size,
Expand Down Expand Up @@ -1274,7 +1280,7 @@ mod tests {
});
map.insert(
hash,
BlobOrCollection::Blob {
DbEntry::External {
outboard,
size,
path,
Expand All @@ -1288,7 +1294,7 @@ mod tests {
let (outboard, hash) = bao_tree::outboard(&data, IROH_BLOCK_SIZE);
let outboard = Bytes::from(outboard);
let hash = Hash::from(hash);
map.insert(hash, BlobOrCollection::Collection { outboard, data });
map.insert(hash, DbEntry::Internal { outboard, data });
}
let db = Database::default();
db.union_with(map);
Expand Down Expand Up @@ -1357,7 +1363,7 @@ mod tests {

let collection = {
let c = db.get(&hash).unwrap();
if let BlobOrCollection::Collection { data, .. } = c {
if let DbEntry::Internal { data, .. } = c {
Collection::from_bytes(&data)?
} else {
panic!("expected hash to correspond with a `Collection`, found `Blob` instead");
Expand Down
8 changes: 4 additions & 4 deletions src/provider/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::rpc_protocol::ProvideProgress;
use crate::util::{Progress, ProgressReader, ProgressReaderUpdate};
use crate::{Hash, IROH_BLOCK_SIZE};

use super::{BlobOrCollection, DataSource};
use super::{DataSource, DbEntry};

/// Creates a collection blob and returns all blobs in a hashmap.
///
Expand All @@ -29,7 +29,7 @@ use super::{BlobOrCollection, DataSource};
pub(super) async fn create_collection(
data_sources: Vec<DataSource>,
progress: Progress<ProvideProgress>,
) -> Result<(HashMap<Hash, BlobOrCollection>, Hash)> {
) -> Result<(HashMap<Hash, DbEntry>, Hash)> {
let mut outboards = compute_all_outboards(data_sources, progress.clone()).await?;

// TODO: Don't sort on async runtime?
Expand All @@ -50,7 +50,7 @@ pub(super) async fn create_collection(
debug_assert!(outboard.len() >= 8, "outboard must at least contain size");
map.insert(
hash,
BlobOrCollection::Blob {
DbEntry::External {
outboard,
path,
size,
Expand All @@ -69,7 +69,7 @@ pub(super) async fn create_collection(
let hash = Hash::from(hash);
map.insert(
hash,
BlobOrCollection::Collection {
DbEntry::Internal {
outboard: Bytes::from(outboard),
data: Bytes::from(data.to_vec()),
},
Expand Down
57 changes: 28 additions & 29 deletions src/provider/database.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::BlobOrCollection;
use super::DbEntry;
use crate::{
blobs::Collection,
rpc_protocol::ValidateProgress,
util::{validate_bao, BaoValidationError},
Hash,
Expand All @@ -21,17 +20,19 @@ use tokio::sync::mpsc;
const FNAME_OUTBOARDS: &str = "outboards";

/// File name of directory inside `IROH_DATA_DIR` where collections are stored.
///
/// This is now used not just for collections but also for internally generated blobs.
const FNAME_COLLECTIONS: &str = "collections";

/// File name inside `IROH_DATA_DIR` where paths to data are stored.
pub const FNAME_PATHS: &str = "paths.bin";

/// Database containing content-addressed data (blobs or collections).
#[derive(Debug, Clone, Default)]
pub struct Database(Arc<RwLock<HashMap<Hash, BlobOrCollection>>>);
pub struct Database(Arc<RwLock<HashMap<Hash, DbEntry>>>);

impl From<HashMap<Hash, BlobOrCollection>> for Database {
fn from(map: HashMap<Hash, BlobOrCollection>) -> Self {
impl From<HashMap<Hash, DbEntry>> for Database {
fn from(map: HashMap<Hash, DbEntry>) -> Self {
Self(Arc::new(RwLock::new(map)))
}
}
Expand Down Expand Up @@ -273,7 +274,7 @@ impl Database {
if let (Some(path), Some(outboard)) = (path, outboards.get(&hash)) {
db.insert(
hash,
BlobOrCollection::Blob {
DbEntry::External {
outboard: outboard.clone(),
path,
size,
Expand All @@ -285,7 +286,7 @@ impl Database {
if let Some(outboard) = outboards.get(&hash) {
db.insert(
hash,
BlobOrCollection::Collection {
DbEntry::Internal {
outboard: outboard.clone(),
data,
},
Expand All @@ -308,7 +309,7 @@ impl Database {
.clone()
.into_iter()
.collect::<Vec<_>>();
data.sort_by_key(|(k, e)| (e.is_blob(), e.blob_path().map(ToOwned::to_owned), *k));
data.sort_by_key(|(k, e)| (e.is_external(), e.blob_path().map(ToOwned::to_owned), *k));
tx.send(ValidateProgress::Starting {
total: data.len() as u64,
})
Expand All @@ -317,7 +318,7 @@ impl Database {
.enumerate()
.map(|(id, (hash, boc))| {
let id = id as u64;
let path = if let BlobOrCollection::Blob { path, .. } = &boc {
let path = if let DbEntry::External { path, .. } = &boc {
Some(path.clone())
} else {
None
Expand All @@ -342,7 +343,7 @@ impl Database {
.ok();
};
let res = match boc {
BlobOrCollection::Blob { outboard, path, .. } => {
DbEntry::External { outboard, path, .. } => {
match std::fs::File::open(&path) {
Ok(data) => {
tracing::info!("validating {}", path.display());
Expand All @@ -353,7 +354,7 @@ impl Database {
Err(cause) => Err(BaoValidationError::from(cause)),
}
}
BlobOrCollection::Collection { outboard, data } => {
DbEntry::Internal { outboard, data } => {
let data = std::io::Cursor::new(data);
validate_bao(hash, data, outboard, progress)
}
Expand Down Expand Up @@ -384,24 +385,24 @@ impl Database {
let outboards = this
.iter()
.map(|(k, v)| match v {
BlobOrCollection::Blob { outboard, .. } => (*k, outboard.clone()),
BlobOrCollection::Collection { outboard, .. } => (*k, outboard.clone()),
DbEntry::External { outboard, .. } => (*k, outboard.clone()),
DbEntry::Internal { outboard, .. } => (*k, outboard.clone()),
})
.collect::<Vec<_>>();

let collections = this
.iter()
.filter_map(|(k, v)| match v {
BlobOrCollection::Blob { .. } => None,
BlobOrCollection::Collection { data, .. } => Some((*k, data.clone())),
DbEntry::External { .. } => None,
DbEntry::Internal { data, .. } => Some((*k, data.clone())),
})
.collect::<Vec<_>>();

let paths = this
.iter()
.map(|(k, v)| match v {
BlobOrCollection::Blob { path, size, .. } => (*k, *size, Some(path.clone())),
BlobOrCollection::Collection { data, .. } => (*k, data.len() as u64, None),
DbEntry::External { path, size, .. } => (*k, *size, Some(path.clone())),
DbEntry::Internal { data, .. } => (*k, data.len() as u64, None),
})
.collect::<Vec<_>>();

Expand All @@ -412,27 +413,27 @@ impl Database {
}
}

pub(crate) fn get(&self, key: &Hash) -> Option<BlobOrCollection> {
pub(crate) fn get(&self, key: &Hash) -> Option<DbEntry> {
self.0.read().unwrap().get(key).cloned()
}

pub(crate) fn union_with(&self, db: HashMap<Hash, BlobOrCollection>) {
pub(crate) fn union_with(&self, db: HashMap<Hash, DbEntry>) {
let mut inner = self.0.write().unwrap();
for (k, v) in db {
inner.entry(k).or_insert(v);
}
}

/// Iterate over all blobs in the database.
pub fn blobs(&self) -> impl Iterator<Item = (Hash, PathBuf, u64)> + 'static {
/// Iterate over all blobs that are stored externally.
pub fn external(&self) -> impl Iterator<Item = (Hash, PathBuf, u64)> + 'static {
let items = self
.0
.read()
.unwrap()
.iter()
.filter_map(|(k, v)| match v {
BlobOrCollection::Blob { path, size, .. } => Some((*k, path.clone(), *size)),
BlobOrCollection::Collection { .. } => None,
DbEntry::External { path, size, .. } => Some((*k, path.clone(), *size)),
DbEntry::Internal { .. } => None,
})
.collect::<Vec<_>>();
// todo: make this a proper lazy iterator at some point
Expand All @@ -441,17 +442,15 @@ impl Database {
}

/// Iterate over all entries that are stored internally, yielding their raw data.
pub fn collections(&self) -> impl Iterator<Item = (Hash, Collection)> + 'static {
pub fn internal(&self) -> impl Iterator<Item = (Hash, Bytes)> + 'static {
let items = self
.0
.read()
.unwrap()
.iter()
.filter_map(|(hash, v)| match v {
BlobOrCollection::Blob { .. } => None,
BlobOrCollection::Collection { data, .. } => {
Collection::from_bytes(&data[..]).ok().map(|c| (*hash, c))
}
DbEntry::External { .. } => None,
DbEntry::Internal { data, .. } => Some((*hash, data.clone())),
})
.collect::<Vec<_>>();
// todo: make this a proper lazy iterator at some point
Expand All @@ -460,7 +459,7 @@ impl Database {
}

/// Unwrap into the inner HashMap
pub fn to_inner(&self) -> HashMap<Hash, BlobOrCollection> {
pub fn to_inner(&self) -> HashMap<Hash, DbEntry> {
self.0.read().unwrap().clone()
}
}
Loading

0 comments on commit 63a2529

Please sign in to comment.