Skip to content

Commit

Permalink
Add MessageMetadata storage (iotaledger#760)
Browse files Browse the repository at this point in the history
* Rocksdb metadata storage

* Sled metadata storage

* Add metadata storage tests

* Format

* Fix typo
  • Loading branch information
thibault-martinez committed Oct 8, 2021
1 parent 93f049c commit 9d73c60
Show file tree
Hide file tree
Showing 23 changed files with 369 additions and 18 deletions.
35 changes: 34 additions & 1 deletion bee-storage/bee-storage-rocksdb/src/access/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{column_families::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_packable::Packable;
use bee_storage::{
access::{Batch, BatchBuilder},
Expand Down Expand Up @@ -66,3 +66,36 @@ impl Batch<MessageId, Message> for Storage {
Ok(())
}
}

impl Batch<MessageId, MessageMetadata> for Storage {
fn batch_insert(
&self,
batch: &mut Self::Batch,
message_id: &MessageId,
message_metadata: &MessageMetadata,
) -> Result<(), <Self as StorageBackend>::Error> {
batch.value_buf.clear();
// Packing to bytes can't fail.
message_metadata.pack(&mut batch.value_buf).unwrap();

batch.inner.put_cf(
self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE_METADATA)?,
message_id,
&batch.value_buf,
);

Ok(())
}

fn batch_delete(
&self,
batch: &mut Self::Batch,
message_id: &MessageId,
) -> Result<(), <Self as StorageBackend>::Error> {
batch
.inner
.delete_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE_METADATA)?, message_id);

Ok(())
}
}
11 changes: 10 additions & 1 deletion bee-storage/bee-storage-rocksdb/src/access/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{column_families::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_storage::{access::Delete, StorageBackend};

impl Delete<MessageId, Message> for Storage {
Expand All @@ -16,3 +16,12 @@ impl Delete<MessageId, Message> for Storage {
Ok(())
}
}

impl Delete<MessageId, MessageMetadata> for Storage {
fn delete(&self, message_id: &MessageId) -> Result<(), <Self as StorageBackend>::Error> {
self.inner
.delete_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE_METADATA)?, message_id)?;

Ok(())
}
}
11 changes: 10 additions & 1 deletion bee-storage/bee-storage-rocksdb/src/access/exist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{column_families::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_storage::{access::Exist, StorageBackend};

impl Exist<MessageId, Message> for Storage {
Expand All @@ -16,3 +16,12 @@ impl Exist<MessageId, Message> for Storage {
.is_some())
}
}

impl Exist<MessageId, MessageMetadata> for Storage {
fn exist(&self, message_id: &MessageId) -> Result<bool, <Self as StorageBackend>::Error> {
Ok(self
.inner
.get_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE_METADATA)?, message_id)?
.is_some())
}
}
12 changes: 11 additions & 1 deletion bee-storage/bee-storage-rocksdb/src/access/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{column_families::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_packable::Packable;
use bee_storage::{access::Fetch, system::System, StorageBackend};

Expand All @@ -28,3 +28,13 @@ impl Fetch<MessageId, Message> for Storage {
.map(|v| Message::unpack(&mut v.as_slice()).unwrap()))
}
}

impl Fetch<MessageId, MessageMetadata> for Storage {
fn fetch(&self, message_id: &MessageId) -> Result<Option<MessageMetadata>, <Self as StorageBackend>::Error> {
Ok(self
.inner
.get_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE_METADATA)?, message_id)?
// Unpacking from storage slice can't fail.
.map(|v| MessageMetadata::unpack(&mut v.as_slice()).unwrap()))
}
}
19 changes: 18 additions & 1 deletion bee-storage/bee-storage-rocksdb/src/access/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{column_families::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_packable::Packable;
use bee_storage::{access::Insert, system::System, StorageBackend};

Expand Down Expand Up @@ -36,3 +36,20 @@ impl Insert<MessageId, Message> for Storage {
Ok(())
}
}

impl Insert<MessageId, MessageMetadata> for Storage {
fn insert(
&self,
message_id: &MessageId,
message_metadata: &MessageMetadata,
) -> Result<(), <Self as StorageBackend>::Error> {
self.inner.put_cf(
self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE_METADATA)?,
message_id,
// Packing to bytes can't fail.
message_metadata.pack_to_vec().unwrap(),
)?;

Ok(())
}
}
14 changes: 13 additions & 1 deletion bee-storage/bee-storage-rocksdb/src/access/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{column_families::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_packable::Packable;
use bee_storage::{access::AsIterator, system::System, StorageBackend};

Expand Down Expand Up @@ -81,5 +81,17 @@ impl<'a> StorageIterator<'a, MessageId, Message> {
}
}

impl<'a> StorageIterator<'a, MessageId, MessageMetadata> {
fn unpack_key_value(mut key: &[u8], mut value: &[u8]) -> (MessageId, MessageMetadata) {
(
// Unpacking from storage slice can't fail.
MessageId::unpack(&mut key).unwrap(),
// Unpacking from storage slice can't fail.
MessageMetadata::unpack(&mut value).unwrap(),
)
}
}

impl_stream!(u8, System, CF_SYSTEM);
impl_stream!(MessageId, Message, CF_MESSAGE_ID_TO_MESSAGE);
impl_stream!(MessageId, MessageMetadata, CF_MESSAGE_ID_TO_MESSAGE_METADATA);
3 changes: 2 additions & 1 deletion bee-storage/bee-storage-rocksdb/src/access/multi_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{column_families::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_packable::Packable;
use bee_storage::{access::MultiFetch, system::System, StorageBackend};

Expand Down Expand Up @@ -54,3 +54,4 @@ macro_rules! impl_multi_fetch {

impl_multi_fetch!(u8, System, CF_SYSTEM);
impl_multi_fetch!(MessageId, Message, CF_MESSAGE_ID_TO_MESSAGE);
impl_multi_fetch!(MessageId, MessageMetadata, CF_MESSAGE_ID_TO_MESSAGE_METADATA);
3 changes: 2 additions & 1 deletion bee-storage/bee-storage-rocksdb/src/access/truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{column_families::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_storage::{access::Truncate, StorageBackend};

fn truncate(storage: &Storage, cf_str: &'static str) -> Result<(), <Storage as StorageBackend>::Error> {
Expand Down Expand Up @@ -45,3 +45,4 @@ macro_rules! impl_truncate {
}

impl_truncate!(MessageId, Message, CF_MESSAGE_ID_TO_MESSAGE);
impl_truncate!(MessageId, MessageMetadata, CF_MESSAGE_ID_TO_MESSAGE_METADATA);
2 changes: 2 additions & 0 deletions bee-storage/bee-storage-rocksdb/src/column_families.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@
pub const CF_SYSTEM: &str = "system";
/// Identifier for the `MessageId` to `Message` column family.
pub const CF_MESSAGE_ID_TO_MESSAGE: &str = "message_id_to_message";
/// Identifier for the `MessageId` to `MessageMetadata` column family.
pub const CF_MESSAGE_ID_TO_MESSAGE_METADATA: &str = "message_id_to_message_metadata";
9 changes: 7 additions & 2 deletions bee-storage/bee-storage-rocksdb/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ pub struct Storage {
impl Storage {
fn new(config: RocksDbConfig) -> Result<Self, Error> {
let cf_system = ColumnFamilyDescriptor::new(CF_SYSTEM, Options::default());

let cf_message_id_to_message = ColumnFamilyDescriptor::new(CF_MESSAGE_ID_TO_MESSAGE, Options::default());
let cf_message_id_to_message_metadata =
ColumnFamilyDescriptor::new(CF_MESSAGE_ID_TO_MESSAGE_METADATA, Options::default());

let mut opts = Options::default();
opts.create_if_missing(config.create_if_missing);
Expand Down Expand Up @@ -67,7 +68,11 @@ impl Storage {
env.set_high_priority_background_threads(config.env.set_high_priority_background_threads);
opts.set_env(&env);

let db = DB::open_cf_descriptors(&opts, config.path, vec![cf_system, cf_message_id_to_message])?;
let db = DB::open_cf_descriptors(
&opts,
config.path,
vec![cf_system, cf_message_id_to_message, cf_message_id_to_message_metadata],
)?;

let mut flushopts = FlushOptions::new();
flushopts.set_wait(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#[macro_use]
mod access;

impl_access_test!(
message_id_to_message_metadata_access_rocksdb,
message_id_to_message_metadata_access
);
37 changes: 36 additions & 1 deletion bee-storage/bee-storage-sled/src/access/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{trees::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_packable::packable::Packable;
use bee_storage::{
access::{Batch, BatchBuilder},
Expand Down Expand Up @@ -69,3 +69,38 @@ impl Batch<MessageId, Message> for Storage {
Ok(())
}
}

impl Batch<MessageId, MessageMetadata> for Storage {
fn batch_insert(
&self,
batch: &mut Self::Batch,
message_id: &MessageId,
message_metadata: &MessageMetadata,
) -> Result<(), <Self as StorageBackend>::Error> {
batch.value_buf.clear();
// Packing to bytes can't fail.
message_metadata.pack(&mut batch.value_buf).unwrap();

batch
.inner
.entry(TREE_MESSAGE_ID_TO_MESSAGE_METADATA)
.or_default()
.insert(message_id.as_ref(), batch.value_buf.as_slice());

Ok(())
}

fn batch_delete(
&self,
batch: &mut Self::Batch,
message_id: &MessageId,
) -> Result<(), <Self as StorageBackend>::Error> {
batch
.inner
.entry(TREE_MESSAGE_ID_TO_MESSAGE_METADATA)
.or_default()
.remove(message_id.as_ref());

Ok(())
}
}
12 changes: 11 additions & 1 deletion bee-storage/bee-storage-sled/src/access/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{trees::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_storage::{access::Delete, StorageBackend};

impl Delete<MessageId, Message> for Storage {
Expand All @@ -15,3 +15,13 @@ impl Delete<MessageId, Message> for Storage {
Ok(())
}
}

impl Delete<MessageId, MessageMetadata> for Storage {
fn delete(&self, message_id: &MessageId) -> Result<(), <Self as StorageBackend>::Error> {
self.inner
.open_tree(TREE_MESSAGE_ID_TO_MESSAGE_METADATA)?
.remove(message_id)?;

Ok(())
}
}
11 changes: 10 additions & 1 deletion bee-storage/bee-storage-sled/src/access/exist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{trees::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_storage::{access::Exist, StorageBackend};

impl Exist<MessageId, Message> for Storage {
Expand All @@ -16,3 +16,12 @@ impl Exist<MessageId, Message> for Storage {
.contains_key(message_id)?)
}
}

impl Exist<MessageId, MessageMetadata> for Storage {
fn exist(&self, message_id: &MessageId) -> Result<bool, <Self as StorageBackend>::Error> {
Ok(self
.inner
.open_tree(TREE_MESSAGE_ID_TO_MESSAGE_METADATA)?
.contains_key(message_id)?)
}
}
13 changes: 12 additions & 1 deletion bee-storage/bee-storage-sled/src/access/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{trees::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_packable::Packable;
use bee_storage::{access::Fetch, system::System, StorageBackend};

Expand All @@ -29,3 +29,14 @@ impl Fetch<MessageId, Message> for Storage {
.map(|v| Message::unpack(&mut v.as_ref()).unwrap()))
}
}

impl Fetch<MessageId, MessageMetadata> for Storage {
fn fetch(&self, message_id: &MessageId) -> Result<Option<MessageMetadata>, <Self as StorageBackend>::Error> {
Ok(self
.inner
.open_tree(TREE_MESSAGE_ID_TO_MESSAGE_METADATA)?
.get(message_id)?
// Unpacking from storage slice can't fail.
.map(|v| MessageMetadata::unpack(&mut v.as_ref()).unwrap()))
}
}
17 changes: 16 additions & 1 deletion bee-storage/bee-storage-sled/src/access/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use crate::{trees::*, Storage};

use bee_message::{Message, MessageId};
use bee_message::{Message, MessageId, MessageMetadata};
use bee_packable::packable::Packable;
use bee_storage::{access::Insert, system::System, StorageBackend};

Expand All @@ -32,3 +32,18 @@ impl Insert<MessageId, Message> for Storage {
Ok(())
}
}

impl Insert<MessageId, MessageMetadata> for Storage {
fn insert(
&self,
message_id: &MessageId,
message_metadata: &MessageMetadata,
) -> Result<(), <Self as StorageBackend>::Error> {
self.inner
.open_tree(TREE_MESSAGE_ID_TO_MESSAGE_METADATA)?
// Packing to bytes can't fail.
.insert(message_id, message_metadata.pack_to_vec().unwrap())?;

Ok(())
}
}
Loading

0 comments on commit 9d73c60

Please sign in to comment.