Skip to content

Commit

Permalink
feat: feature native_model
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-herlemont committed Dec 18, 2023
1 parent 296a136 commit 91a3140
Show file tree
Hide file tree
Showing 42 changed files with 1,216 additions and 100 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ bincode = { version = "2.0.0-rc.3", features = ["serde"] }

# Optional tokio support
tokio = { version = "1.29", features = ["sync"], optional = true }
native_model = { path = "../native_model", optional = true }

[dev-dependencies]
assert_fs = "1.0"
Expand All @@ -35,7 +36,7 @@ tokio = { version = "1.35", features = ["test-util","macros"] }

[features]
default = []
async_tokio = ["tokio"]
use_native_model = ["native_model", "struct_db_macro/use_native_model"]

[build-dependencies]
skeptic = "0.13"
skeptic = "0.13"
12 changes: 6 additions & 6 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::Result;
use crate::{watch, Db};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::sync::atomic::AtomicU64;
use crate::{Db, watch};
use super::Result;
use std::sync::{Arc, RwLock};

/// Builder for the [`Db`](super::Db) instance.
pub struct Builder {
Expand All @@ -29,7 +29,7 @@ impl Builder {
fn new_redb(redb_database: redb::Database) -> Db {
Db {
instance: redb_database,
table_definitions: HashMap::new(),
primary_table_definitions: HashMap::new(),
watchers: Arc::new(RwLock::new(watch::Watchers::new())),
watchers_counter_id: AtomicU64::new(0),
}
Expand All @@ -44,7 +44,7 @@ impl Builder {
/// Creates a new `Db` instance using the given path.
///
/// Similar to [redb::Builder.create(...)](https://docs.rs/redb/latest/redb/struct.Builder.html#method.create)
pub fn create(&self,path: impl AsRef<Path>) -> Result<Db> {
pub fn create(&self, path: impl AsRef<Path>) -> Result<Db> {
let db = self.new_rdb_builder().create(path)?;
Ok(Self::new_redb(db))
}
Expand Down Expand Up @@ -72,4 +72,4 @@ impl Builder {
let tmp_dir = tmp_dir.join(path);
self.open(tmp_dir.as_path())
}
}
}
156 changes: 143 additions & 13 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use crate::watch;
use crate::builder::Builder;
use crate::stats::{Stats, StatsTable};
use crate::table_definition::PrimaryTableDefinition;
use crate::watch::MpscReceiver;
use crate::{watch, ReadableTable};
use crate::{Error, KeyDefinition, ReadOnlyTransaction, Result, SDBItem, Transaction};
use redb::TableHandle;
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, RwLock};
use std::u64;
use crate::watch::MpscReceiver;
use crate::builder::Builder;

/// The `Db` struct represents a database instance. It allows add **schema**, create **transactions** and **watcher**.
pub struct Db {
pub(crate) instance: redb::Database,
pub(crate) table_definitions:
HashMap<&'static str, redb::TableDefinition<'static, &'static [u8], &'static [u8]>>,
pub(crate) primary_table_definitions: HashMap<&'static str, PrimaryTableDefinition>,
pub(crate) watchers: Arc<RwLock<watch::Watchers>>,
pub(crate) watchers_counter_id: AtomicU64,
}
Expand Down Expand Up @@ -62,17 +65,144 @@ impl Db {
/// // Initialize the table
/// db.define::<Data>();
/// }
pub fn define<T: SDBItem>(&mut self) {
pub fn define<T: SDBItem>(&mut self) -> Result<()> {
let schema = T::struct_db_schema();
let main_table_name = schema.table_name;
let main_table_definition = redb::TableDefinition::new(main_table_name);
self.table_definitions
.insert(main_table_name, main_table_definition);
let mut primary_table_definition: PrimaryTableDefinition =
(schema.clone(), main_table_definition).into();

#[cfg(feature = "use_native_model")]
{
primary_table_definition.native_model_id = T::native_model_id();
primary_table_definition.native_model_version = T::native_model_version();

// Set native model legacy
for other_primary_table_definition in self.primary_table_definitions.values_mut() {
if other_primary_table_definition.native_model_version
> primary_table_definition.native_model_version
{
other_primary_table_definition.native_model_legacy = false;
primary_table_definition.native_model_legacy = true;
} else {
other_primary_table_definition.native_model_legacy = true;
primary_table_definition.native_model_legacy = false;
}

// Panic if native model version are the same
if other_primary_table_definition.native_model_version
== primary_table_definition.native_model_version
{
panic!(
"The table {} has the same native model version as the table {} and it's not allowed",
other_primary_table_definition.redb.name(),
primary_table_definition.redb.name()
);
}
}
}

for secondary_table_name in schema.secondary_tables_name {
let secondary_table_definition = redb::TableDefinition::new(secondary_table_name);
self.table_definitions
.insert(secondary_table_name, secondary_table_definition);
primary_table_definition.secondary_tables.insert(
secondary_table_name,
redb::TableDefinition::new(secondary_table_name).into(),
);
}
self.primary_table_definitions
.insert(main_table_name, primary_table_definition);

Ok(())
}

#[cfg(feature = "use_native_model")]
pub fn migrate<T: SDBItem + Debug>(&mut self) -> Result<()> {
use redb::ReadableTable;

// Panic if T is legacy
let new_table_definition = self
.primary_table_definitions
.get(T::struct_db_schema().table_name)
.unwrap();
if new_table_definition.native_model_legacy {
// TODO: test
panic!(
"The table {} is legacy, you can't migrate it",
T::struct_db_schema().table_name
);
}

// Check which table are the data
let mut old_table_definition = None;
for other_primary_table_definition in self.primary_table_definitions.values() {
let rx = self.instance.begin_read()?;

// check if table exists, if the table does not exist continue
if rx
.list_tables()?
.find(|table| table.name() == other_primary_table_definition.redb.name())
.is_none()
{
continue;
}

let table = rx.open_table(other_primary_table_definition.redb.clone())?;
let len = table.len()?;
if len > 0 && old_table_definition.is_some() {
panic!(
"Impossible to migrate the table {} because the table {} has data",
T::struct_db_schema().table_name,
other_primary_table_definition.redb.name()
);
} else if table.len()? > 0 {
old_table_definition = Some(other_primary_table_definition);
}
}

// Check there data in the old table
if old_table_definition.is_none() {
// Nothing to migrate
return Ok(());
}

let old_table_definition = old_table_definition.unwrap();

// If the old table is the same as the new table, nothing to migrate
if old_table_definition.redb.name() == T::struct_db_schema().table_name {
// Nothing to migrate
return Ok(());
}

let wx = self.transaction()?;
{
let mut tables = wx.tables();
let old_data =
tables.internal_primary_drain(&wx, old_table_definition.schema.table_name, ..)?;

for old_data in old_data {
let (decoded_item, _) = native_model::decode::<T>(old_data.0).unwrap();
tables.insert(&wx, decoded_item)?;
}
}
wx.commit()?;

Ok(())
}

pub fn redb_stats(&self) -> Result<Stats> {
use redb::{ReadableTable, TableHandle};
let rx = self.instance.begin_read()?;
let mut stats_tables = vec![];
for table in rx.list_tables()? {
let table_definition: redb::TableDefinition<'_, &'static [u8], &'static [u8]> =
redb::TableDefinition::new(&table.name());
let table_open = rx.open_table(table_definition)?;
let num_raw = table_open.len()?;
stats_tables.push(StatsTable {
name: table.name().to_string(),
num_raw: num_raw as usize,
});
}
Ok(Stats { stats_tables })
}
}

Expand Down Expand Up @@ -128,7 +258,7 @@ impl Db {
pub fn transaction(&self) -> Result<Transaction> {
let txn = self.instance.begin_write()?;
let write_txn = Transaction {
table_definitions: &self.table_definitions,
table_definitions: &self.primary_table_definitions,
txn,
watcher: &self.watchers,
batch: RefCell::new(watch::Batch::new()),
Expand Down Expand Up @@ -168,7 +298,7 @@ impl Db {
pub fn read_transaction(&self) -> Result<ReadOnlyTransaction> {
let txn = self.instance.begin_read()?;
let read_txn = ReadOnlyTransaction {
table_definitions: &self.table_definitions,
table_definitions: &self.primary_table_definitions,
txn,
};
Ok(read_txn)
Expand Down
37 changes: 36 additions & 1 deletion src/item.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,55 @@
#[cfg(not(feature = "use_native_model"))]
pub trait SDBItem: Sized {
fn struct_db_schema() -> crate::Schema;
fn struct_db_primary_key(&self) -> Vec<u8>;

// Return map of secondary table name and the value of the key
fn struct_db_keys(&self) -> std::collections::HashMap<&'static str, Vec<u8>>;
fn struct_db_bincode_encode_to_vec(&self) -> Vec<u8>;
fn struct_db_bincode_decode_from_slice(slice: &[u8]) -> Self;

fn to_item(&self) -> Item {
Item {
primary_key: self.struct_db_primary_key(),
secondary_keys: self.struct_db_keys(),
value: self.struct_db_bincode_encode_to_vec(),
}
}
}

#[cfg(feature = "use_native_model")]
pub trait SDBItem: Sized + native_model::Model {
fn struct_db_schema() -> crate::Schema;
fn struct_db_primary_key(&self) -> Vec<u8>;
fn struct_db_keys(&self) -> std::collections::HashMap<&'static str, Vec<u8>>;
fn struct_db_bincode_encode_to_vec(&self) -> Vec<u8>;
fn struct_db_bincode_decode_from_slice(slice: &[u8]) -> Self;

fn to_item(&self) -> Item {
Item {
primary_key: self.struct_db_primary_key(),
secondary_keys: self.struct_db_keys(),
value: self.struct_db_bincode_encode_to_vec(),
}
}
}

pub trait KeyDefinition: Sized {
fn secondary_table_name(&self) -> &'static str;
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) struct BinaryValue(pub(crate) Vec<u8>);

impl BinaryValue {
pub fn inner<T: SDBItem>(&self) -> T {
T::struct_db_bincode_decode_from_slice(&self.0)
}
}

#[derive(Debug)]
pub struct Item {
pub(crate) primary_key: Vec<u8>,
pub(crate) secondary_keys: std::collections::HashMap<&'static str, Vec<u8>>,
pub(crate) value: Vec<u8>,
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,14 @@ mod common;
mod db;
mod item;
mod iterator;
mod operation;
mod readable_table;
mod readonly_tables;
mod readonly_transaction;
mod schema;
mod serialization;
mod stats;
mod table_definition;
mod tables;
mod transaction;
pub mod watch;
Expand Down
1 change: 1 addition & 0 deletions src/operation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod write;
Loading

0 comments on commit 91a3140

Please sign in to comment.