Skip to content

Commit

Permalink
feat: return an result error during serialization and deserialization
Browse files Browse the repository at this point in the history
Now all queries are returning a `Result` error during serialization and deserialization.

Note: better to use `try_collect` instead of `collect` provided by the `itertools` crate
      queries are returning a iterator of `Result`.
  • Loading branch information
vincent-herlemont committed May 20, 2024
1 parent aa04407 commit 9fae603
Show file tree
Hide file tree
Showing 21 changed files with 158 additions and 91 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ chrono = { version = "0.4", features = ["serde"] }
rand = "0.8"
once_cell = "1.19"
dinghy-test = "0.7.1"
itertools = "0.12"

[features]
default = [ "upgrade_0_5_x" ]
Expand Down
3 changes: 2 additions & 1 deletion benches/overhead_data_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use once_cell::sync::Lazy;
use rand::prelude::SliceRandom;
use redb::{ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use itertools::Itertools;

// 1 byte * 10000, 10 bytes * 10000, 100 bytes * 5000, 1KB * 1000, 1MB * 100, 10MB * 10
const ITERATIONS: &'static [(usize, usize)] = &[
Expand Down Expand Up @@ -133,7 +134,7 @@ fn use_native_db_insert(db: &Database, data: Data) {

fn use_native_db_scan(db: &Database) -> Vec<Data> {
let r = db.r_transaction().unwrap();
let out = r.scan().primary().unwrap().all().collect::<Vec<_>>();
let out= r.scan().primary().unwrap().all().try_collect().unwrap();
out
}

Expand Down
8 changes: 4 additions & 4 deletions native_db_macro/src/native_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ pub fn native_db(args: TokenStream, input: TokenStream) -> TokenStream {
#ast

impl native_db::db_type::Input for #struct_name {
fn native_db_bincode_encode_to_vec(&self) -> Vec<u8> {
native_db::bincode_encode_to_vec(self).expect(format!("Failed to serialize the struct {}", stringify!(#struct_name)).as_str())
fn native_db_bincode_encode_to_vec(&self) -> native_db::db_type::Result<Vec<u8>> {
native_db::bincode_encode_to_vec(self)
}

fn native_db_bincode_decode_from_slice(slice: &[u8]) -> Self {
native_db::bincode_decode_from_slice(slice).expect(format!("Failed to deserialize the struct {}", stringify!(#struct_name)).as_str()).0
fn native_db_bincode_decode_from_slice(slice: &[u8]) -> native_db::db_type::Result<Self> {
Ok(native_db::bincode_decode_from_slice(slice)?.0)
}

#native_db_model
Expand Down
3 changes: 3 additions & 0 deletions src/db_type/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,7 @@ pub enum Error {

#[error("You can not migrate the table {0} because it is a legacy model")]
MigrateLegacyModel(String),

#[error("Model error")]
ModelError(#[from] native_model::Error),
}
12 changes: 6 additions & 6 deletions src/db_type/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ pub trait Input: Sized + native_model::Model {
DatabaseKeyDefinition<DatabaseSecondaryKeyOptions>,
DatabaseKeyValue,
>;
fn native_db_bincode_encode_to_vec(&self) -> Vec<u8>;
fn native_db_bincode_decode_from_slice(slice: &[u8]) -> Self;
fn native_db_bincode_encode_to_vec(&self) -> Result<Vec<u8>>;
fn native_db_bincode_decode_from_slice(slice: &[u8]) -> Result<Self>;

fn to_item(&self) -> DatabaseInput {
DatabaseInput {
fn to_item(&self) -> Result<DatabaseInput> {
Ok(DatabaseInput {
primary_key: self.native_db_primary_key(),
secondary_keys: self.native_db_secondary_keys(),
value: self.native_db_bincode_encode_to_vec(),
}
value: self.native_db_bincode_encode_to_vec()?,
})
}
}
6 changes: 3 additions & 3 deletions src/db_type/output.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::db_type::Input;
use crate::db_type::{Input,Result};

#[derive(Clone, Debug)]
pub struct DatabaseOutputValue(pub(crate) Vec<u8>);
Expand All @@ -10,12 +10,12 @@ impl From<&[u8]> for DatabaseOutputValue {
}

impl DatabaseOutputValue {
pub fn inner<T: Input>(&self) -> T {
pub fn inner<T: Input>(&self) -> Result<T> {
T::native_db_bincode_decode_from_slice(&self.0)
}
}

pub(crate) fn unwrap_item<T: Input>(item: Option<redb::AccessGuard<&'static [u8]>>) -> Option<T> {
pub(crate) fn unwrap_item<T: Input>(item: Option<redb::AccessGuard<&'static [u8]>>) -> Option<Result<T>> {
if let Some(item) = item {
let item = item.value();
let item = T::native_db_bincode_decode_from_slice(item);
Expand Down
10 changes: 5 additions & 5 deletions src/serialization.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
pub fn bincode_encode_to_vec<T>(value: &T) -> Option<Vec<u8>>
pub fn bincode_encode_to_vec<T>(value: &T) -> crate::db_type::Result<Vec<u8>>
where
T: serde::Serialize + native_model::Model,
{
native_model::encode(value).ok()
native_model::encode(value).map_err(|e| e.into())
}

pub fn bincode_decode_from_slice<T>(slice: &[u8]) -> Option<(T, usize)>
pub fn bincode_decode_from_slice<T>(slice: &[u8]) -> crate::db_type::Result<(T, usize)>
where
T: serde::de::DeserializeOwned + native_model::Model,
{
let (data, _) = native_model::decode(slice.to_vec()).ok()?;
Some((data, 0))
let (data, _) = native_model::decode(slice.to_vec())?;
Ok((data, 0))
}
2 changes: 1 addition & 1 deletion src/transaction/internal/rw_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl<'db> InternalRwTransaction<'db> {
// List all data from the old table
for old_data in self.concrete_primary_drain(old_table_definition.model.clone())? {
let (decoded_item, _) = native_model::decode::<T>(old_data.0).unwrap();
let decoded_item = decoded_item.to_item();
let decoded_item = decoded_item.to_item()?;
self.concrete_insert(T::native_db_model(), decoded_item)?;
}

Expand Down
4 changes: 3 additions & 1 deletion src/transaction/query/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ pub struct RwDrain<'db, 'txn> {
}

impl<'db, 'txn> RwDrain<'db, 'txn> {
// TODO: Remove nested Result
pub fn primary<T: Input>(&self) -> Result<Vec<T>> {
let model = T::native_db_model();
let out = self.internal.concrete_primary_drain(model)?;
Ok(out.into_iter().map(|b| b.inner()).collect())
let out = out.into_iter().map(|b| b.inner()).collect::<Result<Vec<T>>>()?;
Ok(out)
}

/// **TODO: needs to be implemented**
Expand Down
24 changes: 20 additions & 4 deletions src/transaction/query/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ impl RGet<'_, '_> {
pub fn primary<T: Input>(&self, key: impl InnerKeyValue) -> Result<Option<T>> {
let model = T::native_db_model();
let result = self.internal.get_by_primary_key(model, key)?;
Ok(result.map(|value| value.inner()))
if let Some(value) = result {
Ok(Some(value.inner()?))
} else {
Ok(None)
}
}

/// Get a value from the database by secondary key.
Expand Down Expand Up @@ -87,7 +91,11 @@ impl RGet<'_, '_> {
) -> Result<Option<T>> {
let model = T::native_db_model();
let result = self.internal.get_by_secondary_key(model, key_def, key)?;
Ok(result.map(|value| value.inner()))
if let Some(value) = result {
Ok(Some(value.inner()?))
} else {
Ok(None)
}
}
}

Expand All @@ -102,7 +110,11 @@ impl RwGet<'_, '_> {
pub fn primary<T: Input>(&self, key: impl InnerKeyValue) -> Result<Option<T>> {
let model = T::native_db_model();
let result = self.internal.get_by_primary_key(model, key)?;
Ok(result.map(|value| value.inner()))
if let Some(value) = result {
Ok(Some(value.inner()?))
} else {
Ok(None)
}
}

/// Get a value from the database by secondary key.
Expand All @@ -115,6 +127,10 @@ impl RwGet<'_, '_> {
) -> Result<Option<T>> {
let model = T::native_db_model();
let result = self.internal.get_by_secondary_key(model, key_def, key)?;
Ok(result.map(|value| value.inner()))
if let Some(value) = result {
Ok(Some(value.inner()?))
} else {
Ok(None)
}
}
}
15 changes: 9 additions & 6 deletions src/transaction/query/scan/primary_scan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::db_type::{unwrap_item, DatabaseInnerKeyValue, DatabaseInnerKeyValueRange, Input};
use crate::db_type::{unwrap_item, DatabaseInnerKeyValue, DatabaseInnerKeyValueRange, Input, Result};
use crate::InnerKeyValue;
use std::marker::PhantomData;
use std::ops::RangeBounds;
Expand Down Expand Up @@ -30,6 +30,7 @@ where
/// use native_db::*;
/// use native_model::{native_model, Model};
/// use serde::{Deserialize, Serialize};
/// use itertools::Itertools;
///
/// #[derive(Serialize, Deserialize)]
/// #[native_model(id=1, version=1)]
Expand All @@ -48,7 +49,7 @@ where
/// let r = db.r_transaction()?;
///
/// // Get all values
/// let _values: Vec<Data> = r.scan().primary()?.all().collect();
/// let _values: Vec<Data> = r.scan().primary()?.all().try_collect()?;
/// Ok(())
/// }
/// ```
Expand All @@ -70,6 +71,7 @@ where
/// use native_db::*;
/// use native_model::{native_model, Model};
/// use serde::{Deserialize, Serialize};
/// use itertools::Itertools;
///
/// #[derive(Serialize, Deserialize)]
/// #[native_model(id=1, version=1)]
Expand All @@ -88,7 +90,7 @@ where
/// let r = db.r_transaction()?;
///
/// // Get the values from 5 to the end
/// let _values: Vec<Data> = r.scan().primary()?.range(5u64..).collect();
/// let _values: Vec<Data> = r.scan().primary()?.range(5u64..).try_collect()?;
/// Ok(())
/// }
/// ```
Expand All @@ -111,6 +113,7 @@ where
/// use native_db::*;
/// use native_model::{native_model, Model};
/// use serde::{Deserialize, Serialize};
/// use itertools::Itertools;
///
/// #[derive(Serialize, Deserialize)]
/// #[native_model(id=1, version=1)]
Expand All @@ -129,7 +132,7 @@ where
/// let r = db.r_transaction()?;
///
/// // Get the values starting with "victor"
/// let _values: Vec<Data> = r.scan().primary()?.start_with("victor").collect();
/// let _values: Vec<Data> = r.scan().primary()?.start_with("victor").try_collect()?;
/// Ok(())
/// }
/// ```
Expand All @@ -156,7 +159,7 @@ pub struct PrimaryScanIterator<'a, T: Input> {
}

impl<'a, T: Input> Iterator for PrimaryScanIterator<'a, T> {
type Item = T;
type Item = Result<T>;

fn next(&mut self) -> Option<Self::Item> {
match self.range.next() {
Expand All @@ -181,7 +184,7 @@ pub struct PrimaryScanIteratorStartWith<'a, T: Input> {
}

impl<'a, T: Input> Iterator for PrimaryScanIteratorStartWith<'a, T> {
type Item = T;
type Item = Result<T>;

fn next(&mut self) -> Option<Self::Item> {
match self.range.next() {
Expand Down
15 changes: 9 additions & 6 deletions src/transaction/query/scan/secondary_scan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::db_type::{unwrap_item, DatabaseInnerKeyValue, DatabaseInnerKeyValueRange, Input};
use crate::db_type::{unwrap_item, DatabaseInnerKeyValue, DatabaseInnerKeyValueRange, Input, Result};
use crate::InnerKeyValue;
use redb;
use std::marker::PhantomData;
Expand Down Expand Up @@ -40,6 +40,7 @@ where
/// use native_db::*;
/// use native_model::{native_model, Model};
/// use serde::{Deserialize, Serialize};
/// use itertools::Itertools;
///
/// #[derive(Serialize, Deserialize)]
/// #[native_model(id=1, version=1)]
Expand All @@ -60,7 +61,7 @@ where
/// let r = db.r_transaction()?;
///
/// // Get only values that have the secondary key set (name is not None)
/// let _values: Vec<Data> = r.scan().secondary(DataKey::name)?.all().collect();
/// let _values: Vec<Data> = r.scan().secondary(DataKey::name)?.all().try_collect()?;
/// Ok(())
/// }
/// ```
Expand All @@ -85,6 +86,7 @@ where
/// use native_db::*;
/// use native_model::{native_model, Model};
/// use serde::{Deserialize, Serialize};
/// use itertools::Itertools;
///
/// #[derive(Serialize, Deserialize)]
/// #[native_model(id=1, version=1)]
Expand All @@ -105,7 +107,7 @@ where
/// let r = db.r_transaction()?;
///
/// // Get only values that have the secondary key name from C to the end
/// let _values: Vec<Data> = r.scan().secondary(DataKey::name)?.range("C"..).collect();
/// let _values: Vec<Data> = r.scan().secondary(DataKey::name)?.range("C"..).try_collect()?;
/// Ok(())
/// }
/// ```
Expand Down Expand Up @@ -134,6 +136,7 @@ where
/// use native_db::*;
/// use native_model::{native_model, Model};
/// use serde::{Deserialize, Serialize};
/// use itertools::Itertools;
///
/// #[derive(Serialize, Deserialize)]
/// #[native_model(id=1, version=1)]
Expand All @@ -154,7 +157,7 @@ where
/// let r = db.r_transaction()?;
///
/// // Get only values that have the secondary key name starting with "hello"
/// let _values: Vec<Data> = r.scan().secondary(DataKey::name)?.start_with("hello").collect();
/// let _values: Vec<Data> = r.scan().secondary(DataKey::name)?.start_with("hello").try_collect()?;
/// Ok(())
/// }
/// ```
Expand Down Expand Up @@ -189,7 +192,7 @@ impl<'a, PrimaryTable, T: Input> Iterator for SecondaryScanIterator<'a, PrimaryT
where
PrimaryTable: redb::ReadableTable<DatabaseInnerKeyValue, &'static [u8]>,
{
type Item = T;
type Item = Result<T>;

fn next(&mut self) -> Option<Self::Item> {
match self.range.next() {
Expand Down Expand Up @@ -233,7 +236,7 @@ where
PrimaryTable: redb::ReadableTable<DatabaseInnerKeyValue, &'static [u8]>,
T: Input,
{
type Item = T;
type Item = Result<T>;

fn next(&mut self) -> Option<Self::Item> {
match self.range.next() {
Expand Down
17 changes: 9 additions & 8 deletions src/transaction/rw_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<'db, 'txn> RwTransaction<'db> {
pub fn insert<T: Input>(&self, item: T) -> Result<()> {
let (watcher_request, binary_value) = self
.internal
.concrete_insert(T::native_db_model(), item.to_item())?;
.concrete_insert(T::native_db_model(), item.to_item()?)?;
let event = Event::new_insert(binary_value);
self.batch.borrow_mut().add(watcher_request, event);
Ok(())
Expand Down Expand Up @@ -168,10 +168,10 @@ impl<'db, 'txn> RwTransaction<'db> {
pub fn remove<T: Input>(&self, item: T) -> Result<T> {
let (watcher_request, binary_value) = self
.internal
.concrete_remove(T::native_db_model(), item.to_item())?;
.concrete_remove(T::native_db_model(), item.to_item()?)?;
let event = Event::new_delete(binary_value.clone());
self.batch.borrow_mut().add(watcher_request, event);
Ok(binary_value.inner())
binary_value.inner()
}

/// Update a value in the database.
Expand Down Expand Up @@ -212,8 +212,8 @@ impl<'db, 'txn> RwTransaction<'db> {
pub fn update<T: Input>(&self, old_item: T, updated_item: T) -> Result<()> {
let (watcher_request, old_binary_value, new_binary_value) = self.internal.concrete_update(
T::native_db_model(),
old_item.to_item(),
updated_item.to_item(),
old_item.to_item()?,
updated_item.to_item()?,
)?;
let event = Event::new_update(old_binary_value, new_binary_value);
self.batch.borrow_mut().add(watcher_request, event);
Expand Down Expand Up @@ -280,13 +280,14 @@ impl<'db, 'txn> RwTransaction<'db> {
OldType: Input + Clone,
NewType: Input + From<OldType>,
{
let find_all_old: Vec<OldType> = self.scan().primary()?.all().collect();
let find_all_old: Result<Vec<OldType>> = self.scan().primary()?.all().collect();
let find_all_old = find_all_old?;
for old in find_all_old {
let new: NewType = old.clone().into();
self.internal
.concrete_insert(NewType::native_db_model(), new.to_item())?;
.concrete_insert(NewType::native_db_model(), new.to_item()?)?;
self.internal
.concrete_remove(OldType::native_db_model(), old.to_item())?;
.concrete_remove(OldType::native_db_model(), old.to_item()?)?;
}
Ok(())
}
Expand Down
Loading

0 comments on commit 9fae603

Please sign in to comment.