Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tracing = "0.1"
rkyv = { version = "0.8.9", features = ["uuid-1"] }
lockfree = { version = "0.5.1" }
worktable_codegen = { path = "codegen", version = "0.6.0" }
fastrand = "2.3.0"
futures = "0.3.30"
uuid = { version = "1.10.0", features = ["v4", "v7"] }
data_bucket = "0.2.6"
Expand Down
37 changes: 34 additions & 3 deletions codegen/src/persist_index/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,23 @@ impl Generator {
}
} else {
quote! {
let node = UnsizedNode::from_inner(page.inner.get_node().into_iter().map(|p| p.into()).collect(), #const_name);
let inner = page.inner.get_node();
let mut last_key = inner.first().expect("Node should be not empty").key.clone();
let mut discriminator = 0;
let mut inner = inner.into_iter().map(move |p| {
if p.key == last_key {
let multi = p.with_last_discriminator(discriminator) ;
discriminator = multi.discriminator;
multi
} else {
last_key = p.key.clone();
let multi: IndexMultiPair<_, _> = p.into();
discriminator = multi.discriminator;
multi
}
}).collect::<Vec<_>>();
inner.sort();
let node = UnsizedNode::from_inner(inner, #const_name);
#i.attach_multi_node(node);
}
};
Expand All @@ -387,8 +403,23 @@ impl Generator {
}
} else {
quote! {
let node = page.inner.get_node();
#i.attach_multi_node(node.into_iter().map(|p| p.into()).collect());
let inner = page.inner.get_node();
let mut last_key = inner.first().expect("Node should be not empty").key.clone();
let mut discriminator = 0;
let mut inner = inner.into_iter().map(move |p| {
if p.key == last_key {
let multi = p.with_last_discriminator(discriminator) ;
discriminator = multi.discriminator;
multi
} else {
last_key = p.key.clone();
let multi: IndexMultiPair<_, _> = p.into();
discriminator = multi.discriminator;
multi
}
}).collect::<Vec<_>>();
inner.sort();
#i.attach_multi_node(inner);
}
};
quote! {
Expand Down
2 changes: 2 additions & 0 deletions src/index/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod multipair;
mod table_index;
mod table_index_cdc;
mod table_secondary_index;
Expand All @@ -6,6 +7,7 @@ mod unsized_node;

pub use indexset::concurrent::map::BTreeMap as IndexMap;
pub use indexset::concurrent::multimap::BTreeMultiMap as IndexMultiMap;
pub use multipair::MultiPairRecreate;
pub use table_index::TableIndex;
pub use table_index_cdc::TableIndexCdc;
pub use table_secondary_index::{IndexError, TableSecondaryIndex, TableSecondaryIndexCdc};
Expand Down
17 changes: 17 additions & 0 deletions src/index/multipair.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use data_bucket::Link;
use indexset::core::multipair::MultiPair;
use indexset::core::pair::Pair;

/// Rebuilds a [`MultiPair`] from a plain index [`Pair`], used when reloading
/// persisted multi-index nodes so equal keys get distinct discriminators.
pub trait MultiPairRecreate<T> {
    /// Converts `self` into a [`MultiPair`], drawing a random discriminator no
    /// smaller than `discriminator` (the value assigned to the previous pair
    /// with the same key).
    ///
    /// NOTE(review): the range is inclusive of `discriminator`, so the new
    /// discriminator may equal the previous one — confirm whether a strictly
    /// greater value is intended to keep same-key pairs distinct.
    fn with_last_discriminator(self, discriminator: u64) -> MultiPair<T, Link>;
}

impl<T> MultiPairRecreate<T> for Pair<T, Link> {
    /// Recreates a [`MultiPair`] from a plain [`Pair`], assigning a random
    /// discriminator *strictly greater* than `discriminator` (the value used
    /// for the previous pair with the same key).
    ///
    /// The lower bound is exclusive because two pairs with the same key AND
    /// the same discriminator compare equal; after the caller sorts the
    /// rebuilt node, such duplicates could collapse and silently drop links.
    /// The original `fastrand::u64(discriminator..)` allowed that collision.
    fn with_last_discriminator(self, discriminator: u64) -> MultiPair<T, Link> {
        MultiPair {
            key: self.key,
            value: self.value,
            // `saturating_add` keeps the range non-empty even when the
            // previous discriminator is already `u64::MAX`.
            discriminator: fastrand::u64(discriminator.saturating_add(1)..),
        }
    }
}
35 changes: 30 additions & 5 deletions src/index/unsized_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use indexset::core::node::NodeLike;

use std::borrow::Borrow;
use std::collections::Bound;
use std::fmt::Debug;
use std::ops::Deref;
use std::slice::Iter;

pub const UNSIZED_HEADER_LENGTH: u32 = 64;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct UnsizedNode<T>
where
T: SizeMeasurable,
Expand Down Expand Up @@ -49,7 +50,7 @@ where

impl<T> NodeLike<T> for UnsizedNode<T>
where
T: SizeMeasurable + Ord + Default + VariableSizeMeasurable,
T: SizeMeasurable + Ord + Default + Debug + VariableSizeMeasurable,
{
fn with_capacity(capacity: usize) -> Self {
Self {
Expand All @@ -75,7 +76,7 @@ where
let mut middle_idx = 0;
let mut iter = self.inner.iter();
while !ind {
let val = iter.next().expect("we stop before node's end");
let val = iter.next().expect("we should stop before node's end");
current_length += val.aligned_size();
current_length += UnsizedIndexPageUtility::<T>::slots_value_size();
let current_middle_variance =
Expand Down Expand Up @@ -192,9 +193,11 @@ where

#[cfg(test)]
mod test {
use indexset::core::node::NodeLike;

use crate::index::unsized_node::UnsizedNode;
use data_bucket::Link;
use indexset::concurrent::multimap::BTreeMultiMap;
use indexset::core::multipair::MultiPair;
use indexset::core::node::NodeLike;

#[test]
fn test_split_basic() {
Expand Down Expand Up @@ -223,4 +226,26 @@ mod test {
assert_eq!(split.length, 136);
assert_eq!(split.inner.len(), 1);
}

#[test]
fn test_get_works_as_expected_at_big_amounts() {
let maximum_node_size = 1000;
let map = BTreeMultiMap::<String, Link, UnsizedNode<MultiPair<String, Link>>>::with_maximum_node_size(maximum_node_size);

for i in 1..2000 {
map.insert(
format!("ValueNum{}", i % 200),
Link {
page_id: Default::default(),
offset: i,
length: i,
},
);
}

for i in 1..200 {
let range = map.get(&format!("ValueNum{}", i)).collect::<Vec<_>>();
assert_eq!(range.len(), 10)
}
}
}
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ pub mod prelude {
pub use crate::table::system_info::{IndexInfo, IndexKind, SystemInfo};
pub use crate::util::{OrderedF32Def, OrderedF64Def};
pub use crate::{
lock::Lock, Difference, IndexError, IndexMap, IndexMultiMap, TableIndex, TableIndexCdc,
TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, TableSecondaryIndexEventsOps,
UnsizedNode, WorkTable, WorkTableError,
lock::Lock, Difference, IndexError, IndexMap, IndexMultiMap, MultiPairRecreate, TableIndex,
TableIndexCdc, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc,
TableSecondaryIndexEventsOps, UnsizedNode, WorkTable, WorkTableError,
};
pub use data_bucket::{
align, get_index_page_size_from_data_length, map_data_pages_to_general, parse_data_page,
Expand Down
48 changes: 48 additions & 0 deletions tests/persistence/sync/string_secondary_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,51 @@ fn test_space_delete_query_sync() {
}
});
}

/// End-to-end persistence check: every row written before the table is
/// dropped must be readable via both secondary indexes after the table is
/// re-loaded from disk.
#[test]
fn test_space_all_data_is_available() {
    const DIR: &str = "tests/data/unsized_secondary_sync/data_is_available";
    let config = PersistenceConfig::new(DIR, DIR);

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_io()
        .enable_time()
        .build()
        .unwrap();

    runtime.block_on(async {
        remove_dir_if_exists(DIR.to_string()).await;

        // Phase 1: populate the table and wait until all background
        // persistence operations have been flushed.
        {
            let table = TestSyncWorkTable::load_from_file(config.clone())
                .await
                .unwrap();
            for i in 0..2000 {
                table
                    .insert(TestSyncRow {
                        another: format!("ValueNumber{}", i),
                        non_unique: i % 200,
                        field: 0.0,
                        id: table.get_next_pk().0,
                    })
                    .unwrap();
            }
            table.wait_for_ops().await;
        }

        // Phase 2: reload from disk and verify both indexes survived.
        {
            let table = TestSyncWorkTable::load_from_file(config).await.unwrap();
            for i in 0..2000 {
                assert!(table
                    .select_by_another(format!("ValueNumber{}", i))
                    .is_some());
            }
            // 2000 rows over 200 residues -> exactly 10 rows per value.
            for i in 0..200 {
                assert_eq!(table.select_by_non_unique(i).execute().unwrap().len(), 10);
            }
        }
    });
}
Loading