diff --git a/Cargo.toml b/Cargo.toml index b72ea04e..e79cd38d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.5.6" +version = "0.6.1" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -22,12 +22,12 @@ tokio = { version = "1", features = ["full"] } tracing = "0.1" rkyv = { version = "0.8.9", features = ["uuid-1"] } lockfree = { version = "0.5.1" } -worktable_codegen = { path = "codegen", version = "0.5.5" } +worktable_codegen = { path = "codegen", version = "0.6.0" } futures = "0.3.30" -uuid = { version = "1.10.0", features = ["v4"] } -#data_bucket = "0.2.3" -data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" } -# data_bucket = { path = "../DataBucket", version = "0.2.2" } +uuid = { version = "1.10.0", features = ["v4", "v7"] } +data_bucket = "0.2.4" +# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" } +# data_bucket = { path = "../DataBucket", version = "0.2.3" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } indexset = { version = "0.12.2", features = ["concurrent", "cdc", "multimap"] } diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 6249881c..d7c67e52 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.5.5" +version = "0.6.0" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate" diff --git a/codegen/src/persist_index/generator.rs b/codegen/src/persist_index/generator.rs index d4276f4d..b4863d30 100644 --- a/codegen/src/persist_index/generator.rs +++ b/codegen/src/persist_index/generator.rs @@ -98,11 +98,11 @@ impl Generator { if is_unsized(&t.to_string()) { let const_size = name_generator.get_page_inner_size_const_ident(); quote! { - #i: (Vec>>, Vec>>), + #i: (Vec>>, Vec>>), } } else { quote! { - #i: (Vec>>, Vec>>), + #i: (Vec>>, Vec>>), } } }) diff --git a/codegen/src/persist_index/space.rs b/codegen/src/persist_index/space.rs index 8dc171b3..d3043bf8 100644 --- a/codegen/src/persist_index/space.rs +++ b/codegen/src/persist_index/space.rs @@ -9,9 +9,11 @@ impl Generator { let secondary_index = self.gen_space_secondary_index_type(); let secondary_impl = self.gen_space_secondary_index_impl_space_index(); let secondary_index_events = self.gen_space_secondary_index_events_type(); + let secondary_index_events_impl = self.gen_space_secondary_index_events_impl(); quote! { #secondary_index_events + #secondary_index_events_impl #secondary_index #secondary_impl } @@ -34,13 +36,36 @@ impl Generator { .collect(); quote! { - #[derive(Debug, Default)] + #[derive(Clone, Debug, Default)] pub struct #ident { #(#fields)* } } } + fn gen_space_secondary_index_events_impl(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident); + let ident = name_generator.get_space_secondary_index_events_ident(); + + let fields: Vec<_> = self + .field_types + .keys() + .map(|i| { + quote! { + self.#i.extend(another.#i); + } + }) + .collect(); + + quote! 
{ + impl TableSecondaryIndexEventsOps for #ident { + fn extend(&mut self, another: #ident) { + #(#fields)* + } + } + } + } + fn gen_space_secondary_index_type(&self) -> TokenStream { let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident); let ident = name_generator.get_space_secondary_index_ident(); @@ -78,11 +103,14 @@ impl Generator { let from_table_files_path_fn = self.gen_space_secondary_index_from_table_files_path_fn(); let index_process_change_events_fn = self.gen_space_secondary_index_process_change_events_fn(); + let index_process_change_event_batch_fn = + self.gen_space_secondary_index_process_change_event_batch_fn(); quote! { impl SpaceSecondaryIndexOps<#events_ident> for #ident { #from_table_files_path_fn #index_process_change_events_fn + #index_process_change_event_batch_fn } } } @@ -138,4 +166,26 @@ impl Generator { } } } + + fn gen_space_secondary_index_process_change_event_batch_fn(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident); + let events_ident = name_generator.get_space_secondary_index_events_ident(); + + let process: Vec<_> = self + .field_types + .keys() + .map(|i| { + quote! { + self.#i.process_change_event_batch(events.#i).await?; + } + }) + .collect(); + + quote! { + async fn process_change_event_batch(&mut self, events: #events_ident) -> eyre::Result<()> { + #(#process)* + core::result::Result::Ok(()) + } + } + } } diff --git a/codegen/src/persist_table/generator/space.rs b/codegen/src/persist_table/generator/space.rs index 6a0bafd6..e2517602 100644 --- a/codegen/src/persist_table/generator/space.rs +++ b/codegen/src/persist_table/generator/space.rs @@ -25,6 +25,7 @@ impl Generator { let ident = name_generator.get_persistence_engine_ident(); let primary_key_type = name_generator.get_primary_key_type_ident(); let inner_const_name = name_generator.get_page_inner_size_const_ident(); + let const_name = name_generator.get_page_size_const_ident(); let space_secondary_indexes = name_generator.get_space_secondary_index_ident(); let space_secondary_indexes_events = name_generator.get_space_secondary_index_events_ident(); @@ -42,7 +43,8 @@ impl Generator { pub type #ident = PersistenceEngine< SpaceData< <<#primary_key_type as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State, - { #inner_const_name as u32 } + { #inner_const_name}, + { #const_name as u32 } >, #space_index_type #space_secondary_indexes, diff --git a/codegen/src/persist_table/generator/space_file/mod.rs b/codegen/src/persist_table/generator/space_file/mod.rs index 4f979c44..db61cec4 100644 --- a/codegen/src/persist_table/generator/space_file/mod.rs +++ b/codegen/src/persist_table/generator/space_file/mod.rs @@ -32,11 +32,11 @@ impl Generator { let space_file_ident = name_generator.get_space_file_ident(); let primary_index = if self.attributes.pk_unsized { quote! { - pub primary_index: (Vec>>, Vec>>), + pub primary_index: (Vec>>, Vec>>), } } else { quote! 
{ - pub primary_index: (Vec>>, Vec>>), + pub primary_index: (Vec>>, Vec>>), } }; @@ -260,7 +260,7 @@ impl Generator { let file_length = data_file.metadata().await?.len(); let count = file_length / (#inner_const_name as u64 + GENERAL_HEADER_SIZE as u64); for page_id in 1..=count { - let index = parse_data_page::<{ #page_const_name }, { #inner_const_name }>(&mut data_file, page_id as u32).await?; + let index = parse_data_page::<{ #page_const_name as u32}, { #inner_const_name as usize }>(&mut data_file, page_id as u32).await?; data.push(index); } (data, info) diff --git a/codegen/src/persist_table/generator/space_file/worktable_impls.rs b/codegen/src/persist_table/generator/space_file/worktable_impls.rs index caf43aa7..bd5efd80 100644 --- a/codegen/src/persist_table/generator/space_file/worktable_impls.rs +++ b/codegen/src/persist_table/generator/space_file/worktable_impls.rs @@ -103,7 +103,7 @@ impl Generator { let const_name = name_generator.get_page_inner_size_const_ident(); if self.attributes.pk_unsized { quote! { - pub fn get_peristed_primary_key_with_toc(&self) -> (Vec>>, Vec>>) { + pub fn get_peristed_primary_key_with_toc(&self) -> (Vec>>, Vec>>) { let mut pages = vec![]; for node in self.0.pk_map.iter_nodes() { let page = UnsizedIndexPage::from_node(node.lock_arc().as_ref()); @@ -115,7 +115,7 @@ impl Generator { } } else { quote! { - pub fn get_peristed_primary_key_with_toc(&self) -> (Vec>>, Vec>>) { + pub fn get_peristed_primary_key_with_toc(&self) -> (Vec>>, Vec>>) { let size = get_index_page_size_from_data_length::<#pk_type>(#const_name); let mut pages = vec![]; for node in self.0.pk_map.iter_nodes() { diff --git a/codegen/src/worktable/generator/index/cdc.rs b/codegen/src/worktable/generator/index/cdc.rs index e2d3b84b..1a8c8145 100644 --- a/codegen/src/worktable/generator/index/cdc.rs +++ b/codegen/src/worktable/generator/index/cdc.rs @@ -125,7 +125,7 @@ impl Generator { let index_field_name = &idx.name; let diff_key = Literal::string(i.to_string().as_str()); - let match_arm = if let Some(t) = self.columns.columns_map.get(&idx.field) { + if let Some(t) = self.columns.columns_map.get(&idx.field) { let type_str = t.to_string(); let variant_ident = Ident::new(&map_to_uppercase(&type_str), Span::mixed_site()); @@ -156,9 +156,7 @@ impl Generator { } } else { quote! {} - }; - - match_arm + } }); let idents = self .columns diff --git a/codegen/src/worktable/generator/index/usual.rs b/codegen/src/worktable/generator/index/usual.rs index 8b048141..b346f071 100644 --- a/codegen/src/worktable/generator/index/usual.rs +++ b/codegen/src/worktable/generator/index/usual.rs @@ -147,7 +147,7 @@ impl Generator { let index_field_name = &idx.name; let diff_key = Literal::string(i.to_string().as_str()); - let match_arm = if let Some(t) = self.columns.columns_map.get(&idx.field) { + if let Some(t) = self.columns.columns_map.get(&idx.field) { let type_str = t.to_string(); let variant_ident = Ident::new(&map_to_uppercase(&type_str), Span::mixed_site()); @@ -174,9 +174,7 @@ impl Generator { } } else { quote! {} - }; - - match_arm + } }); quote! { diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index a2ae18ed..3d8532d2 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -65,7 +65,7 @@ impl Generator { let delete_logic = self.gen_delete_logic(); quote! 
{ - pub async fn delete_without_lock(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> { + pub fn delete_without_lock(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> { #delete_logic core::result::Result::Ok(()) } @@ -87,9 +87,10 @@ impl Generator { #pk_ident, #secondary_events_ident > = Operation::Delete(DeleteOperation { - id: Default::default(), + id: uuid::Uuid::now_v7().into(), secondary_keys_events, primary_key_events, + link, }); self.2.apply_operation(op); } diff --git a/codegen/src/worktable/generator/queries/type.rs b/codegen/src/worktable/generator/queries/type.rs index a158e387..0e4822cd 100644 --- a/codegen/src/worktable/generator/queries/type.rs +++ b/codegen/src/worktable/generator/queries/type.rs @@ -51,7 +51,7 @@ impl Generator { if !rows.is_empty() { Ok(quote! { - #[derive(Clone, Debug, derive_more::Display, From, PartialEq)] + #[derive(Clone, Debug, From, PartialEq)] #[non_exhaustive] pub enum #avt_type_ident { #(#rows)* diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 8ef341f7..beadf748 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -62,7 +62,7 @@ impl Generator { } else { quote! { if bytes.len() >= link.length as usize { - self.delete_without_lock(pk.clone()).await?; + self.delete_without_lock(pk.clone())?; self.insert(row)?; lock.unlock(); // Releases locks @@ -94,6 +94,7 @@ impl Generator { let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#row_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; + let op_id = OperationId::Single(uuid::Uuid::now_v7()); #diff_process #persist_op @@ -243,7 +244,7 @@ impl Generator { if need_to_reinsert { let mut row_old = self.select(pk.clone()).unwrap(); #(#row_updates)* - self.delete_without_lock(pk.clone()).await?; + self.delete_without_lock(pk.clone())?; self.insert(row_old)?; lock.unlock(); // Releases locks @@ -269,7 +270,7 @@ impl Generator { #primary_key_ident, #secondary_events_ident > = Operation::Update(UpdateOperation { - id: Default::default(), + id: op_id, secondary_keys_events, bytes: updated_bytes, link, @@ -406,6 +407,7 @@ impl Generator { .map(|v| v.get().value) .ok_or(WorkTableError::NotFound)?; + let op_id = OperationId::Single(uuid::Uuid::now_v7()); #size_check #diff_process #persist_op @@ -486,7 +488,7 @@ impl Generator { if need_to_reinsert { let mut row_old = self.select(pk.clone()).unwrap(); #(#row_updates)* - self.delete_without_lock(pk.clone()).await?; + self.delete_without_lock(pk.clone())?; self.insert(row_old)?; let lock = self.0.lock_map.get(&pk).expect("was inserted before and not deleted"); @@ -532,6 +534,7 @@ impl Generator { } let mut links_to_unlock = vec![]; + let op_id = OperationId::Multi(uuid::Uuid::now_v7()); for link in links.into_iter() { let pk = self.0.data.select(link)?.get_primary_key().clone(); let mut bytes = rkyv::to_bytes::(&row) @@ -638,6 +641,7 @@ impl Generator { let lock = std::sync::Arc::new(lock); self.0.lock_map.insert(pk.clone(), lock.clone()); + let op_id = OperationId::Single(uuid::Uuid::now_v7()); #size_check #diff_process #persist_op diff --git a/codegen/src/worktable/generator/table/impls.rs b/codegen/src/worktable/generator/table/impls.rs index 2c3c6dac..32c6003f 100644 --- a/codegen/src/worktable/generator/table/impls.rs +++ b/codegen/src/worktable/generator/table/impls.rs @@ -1,7 +1,7 @@ use proc_macro2::TokenStream; use quote::quote; -use 
crate::name_generator::WorktableNameGenerator; +use crate::name_generator::{is_unsized_vec, WorktableNameGenerator}; use crate::worktable::generator::Generator; use crate::worktable::model::GeneratorType; @@ -47,11 +47,33 @@ impl Generator { let const_name = name_generator.get_page_inner_size_const_ident(); if self.is_persist { + let pk_types = &self + .columns + .primary_keys + .iter() + .map(|i| { + self.columns + .columns_map + .get(i) + .expect("should exist as got from definition") + .to_string() + }) + .collect::>(); + let pk_types_unsized = is_unsized_vec(pk_types); + let index_size = if pk_types_unsized { + quote! { + let size = #const_name; + } + } else { + quote! { + let size = get_index_page_size_from_data_length::<#pk_type>(#const_name); + } + }; quote! { pub async fn new(config: PersistenceConfig) -> eyre::Result { let mut inner = WorkTable::default(); inner.table_name = #table_name; - let size = get_index_page_size_from_data_length::<#pk_type>(#const_name); + #index_size inner.pk_map = IndexMap::with_maximum_node_size(size); let table_files_path = format!("{}/{}", config.tables_path, #dir_name); let engine: #engine = PersistenceEngine::from_table_files_path(table_files_path).await?; @@ -222,9 +244,9 @@ impl Generator { fn gen_table_count_fn(&self) -> TokenStream { quote! { - pub fn count(&self) -> Option { + pub fn count(&self) -> usize { let count = self.0.pk_map.len(); - (count > 0).then_some(count) + count } } } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 982bc695..8c9cef80 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -5,15 +5,15 @@ edition = "2021" [dependencies] -worktable = { path = "..", version = "0.5.3", features = ["perf_measurements"] } -rkyv = { version = "0.8.9", features = ["uuid-1"] } -lockfree = "0.5.1" -derive_more = { version = "1.0.0", features = ["full"] } -eyre = "0.6.12" -futures = "0.3.30" -async-std = "1.10" -either = "1.15.0" -ordered-float = "5.0.0" -indexset = { version = "0.12.0", features = ["concurrent", "cdc", "multimap"] } -tokio = { version = "1", features = ["full"] } +#worktable = { path = "..", version = "0.5.3", features = ["perf_measurements"] } +#rkyv = { version = "0.8.9", features = ["uuid-1"] } +#lockfree = "0.5.1" +#derive_more = { version = "1.0.0", features = ["full"] } +#eyre = "0.6.12" +#futures = "0.3.30" +#async-std = "1.10" +#either = "1.15.0" +#ordered-float = "5.0.0" +#indexset = { version = "0.12.0", features = ["concurrent", "cdc", "multimap"] } +#tokio = { version = "1", features = ["full"] } diff --git a/examples/src/main.rs b/examples/src/main.rs index b5726594..15514a6a 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -1,105 +1,107 @@ -use futures::executor::block_on; -use worktable::prelude::*; -use worktable::worktable; - -#[tokio::main] -async fn main() { - // describe WorkTable - worktable!( - name: My, - persist: true, - columns: { - id: u64 primary_key autoincrement, - val: i64, - test: i32, - attr: String, - attr2: i32, - attr_float: f64, - attr_string: String, - - }, - indexes: { - idx1: attr, - idx2: attr2 unique, - idx3: attr_string, - }, - queries: { - update: { - ValById(val) by id, - AllAttrById(attr, attr2) by id, - UpdateOptionalById(test) by id, - }, - delete: { - ByAttr() by attr, - ById() by id, - } - } - ); - - // Init Worktable - let config = PersistenceConfig::new("data", "data"); - let my_table = MyWorkTable::new(config).await.unwrap(); - - // WT rows (has prefix My because of table name) - let row = MyRow { - val: 777, - attr: 
"Attribute0".to_string(), - attr2: 345, - test: 1, - id: 0, - attr_float: 100.0, - attr_string: "String_attr0".to_string(), - }; - - for i in 2..1000000_i64 { - let row = MyRow { - val: 777, - attr: format!("Attribute{}", i), - attr2: 345 + i as i32, - test: i as i32, - id: i as u64, - attr_float: 100.0 + i as f64, - attr_string: format!("String_attr{}", i), - }; - - my_table.insert(row).unwrap(); - } - - // insert - let pk: MyPrimaryKey = my_table.insert(row).expect("primary key"); - - // Select ALL records from WT - let _select_all = my_table.select_all().execute(); - //println!("Select All {:?}", select_all); - - // Select All records with attribute TEST - let _select_all = my_table.select_all().execute(); - //println!("Select All {:?}", select_all); - - // Select by Idx - //let _select_by_attr = my_table - // .select_by_attr("Attribute1".to_string()) - // .execute() - //r .unwrap(); - - //for row in select_by_attr { - // println!("Select by idx, row {:?}", row); - //} - - // Update Value query - let update = my_table.update_val_by_id(ValByIdQuery { val: 1337 }, pk.clone()); - let _ = block_on(update); - - let _select_all = my_table.select_all().execute(); - //println!("Select after update val {:?}", select_all); - - let delete = my_table.delete(pk); - let _ = block_on(delete); - - let _select_all = my_table.select_all().execute(); - //println!("Select after delete {:?}", select_all); - - let info = my_table.system_info(); - - println!("{info}"); -} +// use futures::executor::block_on; +// use worktable::prelude::*; +// use worktable::worktable; +// +// #[tokio::main] +// async fn main() { +// // describe WorkTable +// worktable!( +// name: My, +// persist: true, +// columns: { +// id: u64 primary_key autoincrement, +// val: i64, +// test: i32, +// attr: String, +// attr2: i32, +// attr_float: f64, +// attr_string: String, +// +// }, +// indexes: { +// idx1: attr, +// idx2: attr2 unique, +// idx3: attr_string, +// }, +// queries: { +// update: { +// ValById(val) by id, +// AllAttrById(attr, attr2) by id, +// UpdateOptionalById(test) by id, +// }, +// delete: { +// ByAttr() by attr, +// ById() by id, +// } +// } +// ); +// +// // Init Worktable +// let config = PersistenceConfig::new("data", "data"); +// let my_table = MyWorkTable::new(config).await.unwrap(); +// +// // WT rows (has prefix My because of table name) +// let row = MyRow { +// val: 777, +// attr: "Attribute0".to_string(), +// attr2: 345, +// test: 1, +// id: 0, +// attr_float: 100.0, +// attr_string: "String_attr0".to_string(), +// }; +// +// for i in 2..1000000_i64 { +// let row = MyRow { +// val: 777, +// attr: format!("Attribute{}", i), +// attr2: 345 + i as i32, +// test: i as i32, +// id: i as u64, +// attr_float: 100.0 + i as f64, +// attr_string: format!("String_attr{}", i), +// }; +// +// my_table.insert(row).unwrap(); +// } +// +// // insert +// let pk: MyPrimaryKey = my_table.insert(row).expect("primary key"); +// +// // Select ALL records from WT +// let _select_all = my_table.select_all().execute(); +// //println!("Select All {:?}", select_all); +// +// // Select All records with attribute TEST +// let _select_all = my_table.select_all().execute(); +// //println!("Select All {:?}", select_all); +// +// // Select by Idx +// //let _select_by_attr = my_table +// // .select_by_attr("Attribute1".to_string()) +// // .execute() +// //r .unwrap(); +// +// //for row in select_by_attr { +// // println!("Select by idx, row {:?}", row); +// //} +// +// // Update Value query +// let update = 
my_table.update_val_by_id(ValByIdQuery { val: 1337 }, pk.clone()); +// let _ = block_on(update); +// +// let _select_all = my_table.select_all().execute(); +// //println!("Select after update val {:?}", select_all); +// +// let delete = my_table.delete(pk); +// let _ = block_on(delete); +// +// let _select_all = my_table.select_all().execute(); +// //println!("Select after delete {:?}", select_all); +// +// let info = my_table.system_info(); +// +// println!("{info}"); +// } + +fn main() {} diff --git a/src/in_memory/data.rs b/src/in_memory/data.rs index d751375c..614f1839 100644 --- a/src/in_memory/data.rs +++ b/src/in_memory/data.rs @@ -344,10 +344,7 @@ mod tests { let link = shared.save_row(&row); links.push(link) } - let other_links = rx.recv().unwrap(); - - print!("{:?}", other_links); - print!("{:?}", links); + let _other_links = rx.recv().unwrap(); } #[test] diff --git a/src/index/mod.rs b/src/index/mod.rs index f740596b..4026b05c 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,6 +1,7 @@ mod table_index; mod table_index_cdc; mod table_secondary_index; +mod table_secondary_index_events; mod unsized_node; pub use indexset::concurrent::map::BTreeMap as IndexMap; @@ -8,6 +9,7 @@ pub use indexset::concurrent::multimap::BTreeMultiMap as IndexMultiMap; pub use table_index::TableIndex; pub use table_index_cdc::TableIndexCdc; pub use table_secondary_index::{IndexError, TableSecondaryIndex, TableSecondaryIndexCdc}; +pub use table_secondary_index_events::TableSecondaryIndexEventsOps; pub use unsized_node::UnsizedNode; #[derive(Debug)] diff --git a/src/index/table_secondary_index_events.rs b/src/index/table_secondary_index_events.rs new file mode 100644 index 00000000..c4c4c72a --- /dev/null +++ b/src/index/table_secondary_index_events.rs @@ -0,0 +1,5 @@ +pub trait TableSecondaryIndexEventsOps { + fn extend(&mut self, another: Self) + where + Self: Sized; +} diff --git a/src/index/unsized_node.rs b/src/index/unsized_node.rs index 1a63133b..0ae00242 100644 --- a/src/index/unsized_node.rs +++ b/src/index/unsized_node.rs @@ -6,7 +6,7 @@ use std::collections::Bound; use std::ops::Deref; use std::slice::Iter; -pub const UNSIZED_HEADER_LENGTH: u32 = 16; +pub const UNSIZED_HEADER_LENGTH: u32 = 64; #[derive(Debug)] pub struct UnsizedNode @@ -198,7 +198,7 @@ mod test { #[test] fn test_split_basic() { - let mut node = UnsizedNode::::with_capacity(184); + let mut node = UnsizedNode::::with_capacity(232); for i in 0..10 { let s = format!("{}_______", i); node.insert(s); @@ -207,20 +207,20 @@ mod test { let split = node.halve(); assert_eq!(node.inner.len(), split.inner.len()); assert_eq!(node.length, split.length); - assert_eq!(node.length, 104) + assert_eq!(node.length, 152) } #[test] fn test_split() { - let mut node = UnsizedNode::::with_capacity(152); + let mut node = UnsizedNode::::with_capacity(200); node.insert(String::from_utf8(vec![b'1'; 16]).unwrap()); node.insert(String::from_utf8(vec![b'2'; 16]).unwrap()); node.insert(String::from_utf8(vec![b'3'; 24]).unwrap()); assert_eq!(node.length, node.length_capacity); let split = node.halve(); - assert_eq!(node.length, 104); + assert_eq!(node.length, 152); assert_eq!(node.inner.len(), 2); - assert_eq!(split.length, 88); + assert_eq!(split.length, 136); assert_eq!(split.inner.len(), 1); } } diff --git a/src/lib.rs b/src/lib.rs index a5259c55..1bf3f2a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,9 +26,10 @@ pub mod prelude { pub use crate::mem_stat::MemStat; pub use crate::persistence::{ map_index_pages_to_toc_and_general, 
map_unsized_index_pages_to_toc_and_general, - DeleteOperation, IndexTableOfContents, InsertOperation, Operation, PersistenceConfig, - PersistenceEngine, PersistenceEngineOps, PersistenceTask, SpaceData, SpaceDataOps, - SpaceIndex, SpaceIndexOps, SpaceIndexUnsized, SpaceSecondaryIndexOps, UpdateOperation, + DeleteOperation, IndexTableOfContents, InsertOperation, Operation, OperationId, + PersistenceConfig, PersistenceEngine, PersistenceEngineOps, PersistenceTask, SpaceData, + SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceIndexUnsized, SpaceSecondaryIndexOps, + UpdateOperation, }; pub use crate::primary_key::{PrimaryKeyGenerator, PrimaryKeyGeneratorState, TablePrimaryKey}; pub use crate::table::select::{Order, QueryParams, SelectQueryBuilder, SelectQueryExecutor}; @@ -36,8 +37,8 @@ pub mod prelude { pub use crate::util::{OrderedF32Def, OrderedF64Def}; pub use crate::{ lock::Lock, Difference, IndexError, IndexMap, IndexMultiMap, TableIndex, TableIndexCdc, - TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, UnsizedNode, WorkTable, - WorkTableError, + TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, TableSecondaryIndexEventsOps, + UnsizedNode, WorkTable, WorkTableError, }; pub use data_bucket::{ align, get_index_page_size_from_data_length, map_data_pages_to_general, parse_data_page, diff --git a/src/mem_stat/mod.rs b/src/mem_stat/mod.rs index 5c322c65..1ea05599 100644 --- a/src/mem_stat/mod.rs +++ b/src/mem_stat/mod.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; +use data_bucket::page::PageId; use data_bucket::Link; use indexset::core::multipair::MultiPair; use indexset::core::node::NodeLike; @@ -11,6 +12,8 @@ use indexset::core::pair::Pair; use ordered_float::OrderedFloat; use uuid::Uuid; +use crate::persistence::OperationType; +use crate::prelude::OperationId; use crate::IndexMultiMap; use crate::{impl_memstat_zero, IndexMap}; @@ -174,4 +177,4 @@ where } } -impl_memstat_zero!(Link, Uuid); +impl_memstat_zero!(Link, PageId, Uuid, OperationId, OperationType); diff --git a/src/persistence/engine.rs b/src/persistence/engine.rs index 780125ba..110fd287 100644 --- a/src/persistence/engine.rs +++ b/src/persistence/engine.rs @@ -1,12 +1,16 @@ -use std::fs; -use std::marker::PhantomData; -use std::path::Path; - -use crate::persistence::operation::Operation; +use crate::persistence::operation::{BatchOperation, Operation}; use crate::persistence::{ PersistenceEngineOps, SpaceDataOps, SpaceIndexOps, SpaceSecondaryIndexOps, }; use crate::prelude::{PrimaryKeyGeneratorState, TablePrimaryKey}; +use crate::TableSecondaryIndexEventsOps; +use futures::future::Either; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use std::fmt::Debug; +use std::fs; +use std::marker::PhantomData; +use std::path::Path; #[derive(Debug)] pub struct PersistenceEngine< @@ -84,13 +88,13 @@ impl< PrimaryKeyGenState, > where - PrimaryKey: Ord + TablePrimaryKey + Send, + PrimaryKey: Clone + Debug + Ord + TablePrimaryKey + Send, ::Generator: PrimaryKeyGeneratorState, SpaceData: SpaceDataOps + Send, SpacePrimaryIndex: SpaceIndexOps + Send, SpaceSecondaryIndexes: SpaceSecondaryIndexOps + Send, - SecondaryIndexEvents: Send, - PrimaryKeyGenState: Send, + SecondaryIndexEvents: Clone + Debug + Default + TableSecondaryIndexEventsOps + Send, + PrimaryKeyGenState: Clone + Debug + Send, { async fn apply_operation( &mut self, @@ -129,4 +133,43 @@ where } } } + + async fn apply_batch_operation( + &mut self, + batch_op: BatchOperation, + ) -> eyre::Result<()> { + let batch_data_op = 
batch_op.get_batch_data_op()?; + + let (pk_evs, secondary_evs) = batch_op.get_indexes_evs()?; + // self.data.save_batch_data(batch_data_op).await?; + // self.primary_index + // .process_change_event_batch(pk_evs) + // .await?; + // self.secondary_indexes + // .process_change_event_batch(secondary_evs) + // .await?; + { + let mut futs = FuturesUnordered::new(); + futs.push(Either::Left(Either::Right( + self.data.save_batch_data(batch_data_op), + ))); + futs.push(Either::Left(Either::Left( + self.primary_index.process_change_event_batch(pk_evs), + ))); + futs.push(Either::Right( + self.secondary_indexes + .process_change_event_batch(secondary_evs), + )); + + while (futs.next().await).is_some() {} + } + + if let Some(pk_gen_state_update) = batch_op.get_pk_gen_state()? { + let info = self.data.get_mut_info(); + info.inner.pk_gen_state = pk_gen_state_update; + self.data.save_info().await?; + } + + Ok(()) + } } diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index b505571a..e3bf7401 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -4,9 +4,12 @@ mod operation; mod space; mod task; +use crate::persistence::operation::BatchOperation; pub use engine::PersistenceEngine; pub use manager::PersistenceConfig; -pub use operation::{DeleteOperation, InsertOperation, Operation, UpdateOperation}; +pub use operation::{ + DeleteOperation, InsertOperation, Operation, OperationId, OperationType, UpdateOperation, +}; pub use space::{ map_index_pages_to_toc_and_general, map_unsized_index_pages_to_toc_and_general, IndexTableOfContents, SpaceData, SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceIndexUnsized, @@ -20,4 +23,9 @@ pub trait PersistenceEngineOps, ) -> impl Future> + Send; + + fn apply_batch_operation( + &mut self, + batch_op: BatchOperation, + ) -> impl Future> + Send; } diff --git a/src/persistence/operation.rs b/src/persistence/operation.rs index 12f968fb..3a9f0731 100644 --- a/src/persistence/operation.rs +++ b/src/persistence/operation.rs @@ -1,10 +1,18 @@ -use data_bucket::Link; +use std::collections::HashMap; +use std::fmt::Debug; + +use crate::persistence::space::{BatchChangeEvent, BatchData}; +use crate::persistence::task::QueueInnerRow; +use crate::prelude::*; +use crate::prelude::{From, Order, SelectQueryExecutor}; +use data_bucket::page::PageId; +use data_bucket::{Link, SizeMeasurable}; use derive_more::Display; use indexset::cdc::change::ChangeEvent; use indexset::core::pair::Pair; use rkyv::{Archive, Deserialize, Serialize}; - -use crate::prelude::From; +use uuid::Uuid; +use worktable_codegen::{worktable, MemStat}; /// Represents page's identifier. 
Is unique within the table bounds #[derive( @@ -13,7 +21,6 @@ use crate::prelude::From; Clone, Deserialize, Debug, - Default, Display, Eq, From, @@ -23,16 +30,249 @@ use crate::prelude::From; PartialOrd, Serialize, )] -pub struct OperationId(u32); +#[rkyv(derive(Debug, PartialOrd, PartialEq, Eq, Ord))] +pub enum OperationId { + #[from] + Single(Uuid), + Multi(Uuid), +} + +impl SizeMeasurable for OperationId { + fn aligned_size(&self) -> usize { + Uuid::default().aligned_size() + } +} + +impl Default for OperationId { + fn default() -> Self { + OperationId::Single(Uuid::now_v7()) + } +} + +#[derive( + Archive, + Clone, + Copy, + Debug, + Default, + Deserialize, + Serialize, + Eq, + Ord, + PartialEq, + PartialOrd, + Hash, +)] +#[rkyv(compare(PartialEq), derive(Debug))] +#[repr(u8)] +pub enum OperationType { + #[default] + Insert, + Update, + Delete, +} + +impl SizeMeasurable for OperationType { + fn aligned_size(&self) -> usize { + u8::default().aligned_size() + } +} + +worktable! ( + name: BatchInner, + columns: { + id: u64 primary_key autoincrement, + operation_id: OperationId, + page_id: PageId, + link: Link, + op_type: OperationType, + pos: usize, + }, + indexes: { + operation_id_idx: operation_id, + page_id_idx: page_id, + link_idx: link, + op_type_idx: op_type, + }, + queries: { + update: { + PosByOpId(pos) by operation_id, + }, + } +); + +impl BatchInnerWorkTable { + pub fn iter_links(&self) -> impl Iterator { + self.0 + .indexes + .link_idx + .iter() + .map(|(l, _)| *l) + .collect::>() + .into_iter() + } +} + +impl From for BatchInnerRow { + fn from(value: QueueInnerRow) -> Self { + BatchInnerRow { + id: value.id, + operation_id: value.operation_id, + page_id: value.page_id, + link: value.link, + op_type: Default::default(), + pos: 0, + } + } +} #[derive(Debug)] +pub struct BatchOperation { + pub ops: Vec>, + pub info_wt: BatchInnerWorkTable, +} + +impl + BatchOperation +where + PrimaryKeyGenState: Debug + Clone, + PrimaryKey: Debug + Clone, + SecondaryKeys: Debug + Default + Clone + TableSecondaryIndexEventsOps, +{ + pub fn get_pk_gen_state(&self) -> eyre::Result> { + let row = self + .info_wt + .select_by_op_type(OperationType::Insert) + .order_on(BatchInnerRowFields::OperationId, Order::Desc) + .limit(1) + .execute()?; + Ok(row.into_iter().next().map(|r| { + let pos = r.pos; + let op = self.ops.get(pos).expect("available as pos in wt"); + op.pk_gen_state().expect("is insert operation").clone() + })) + } + + pub fn get_indexes_evs(&self) -> eyre::Result<(BatchChangeEvent, SecondaryKeys)> { + let mut primary = vec![]; + let mut secondary = SecondaryKeys::default(); + + let mut rows = self.info_wt.select_all().execute()?; + rows.sort_by(|l, r| l.operation_id.cmp(&r.operation_id)); + for row in rows { + let pos = row.pos; + let op = self + .ops + .get(pos) + .expect("pos should be correct as was set while batch build"); + if let Some(evs) = op.primary_key_events() { + primary.extend(evs.iter().cloned()) + } + let secondary_new = op.secondary_key_events(); + secondary.extend(secondary_new.clone()); + } + + Ok((primary, secondary)) + } + + pub fn get_batch_data_op(&self) -> eyre::Result { + let mut data = HashMap::new(); + for link in self.info_wt.iter_links() { + let last_op = self + .info_wt + .select_by_link(link) + .order_on(BatchInnerRowFields::OperationId, Order::Desc) + .limit(1) + .execute()?; + let op_row = last_op + .into_iter() + .next() + .expect("if link is in info_wt at least one row exists"); + let pos = op_row.pos; + let op = self + .ops + .get(pos) + .expect("pos should 
be correct as was set while batch build"); + if let Some(data_bytes) = op.bytes() { + let link = op.link(); + data.entry(link.page_id) + .and_modify(|v: &mut Vec<_>| v.push((link, data_bytes.to_vec()))) + .or_insert(vec![(link, data_bytes.to_vec())]); + } + } + + Ok(data) + } +} + +#[derive(Clone, Debug)] pub enum Operation { Insert(InsertOperation), Update(UpdateOperation), Delete(DeleteOperation), } -#[derive(Debug)] +impl + Operation +{ + pub fn operation_type(&self) -> OperationType { + match &self { + Operation::Insert(_) => OperationType::Insert, + Operation::Update(_) => OperationType::Update, + Operation::Delete(_) => OperationType::Delete, + } + } + + pub fn operation_id(&self) -> OperationId { + match &self { + Operation::Insert(insert) => insert.id, + Operation::Update(update) => update.id, + Operation::Delete(delete) => delete.id, + } + } + + pub fn link(&self) -> Link { + match &self { + Operation::Insert(insert) => insert.link, + Operation::Update(update) => update.link, + Operation::Delete(delete) => delete.link, + } + } + + pub fn bytes(&self) -> Option<&[u8]> { + match &self { + Operation::Insert(insert) => Some(&insert.bytes), + Operation::Update(update) => Some(&update.bytes), + Operation::Delete(_) => None, + } + } + + pub fn primary_key_events(&self) -> Option<&Vec>>> { + match &self { + Operation::Insert(insert) => Some(&insert.primary_key_events), + Operation::Update(_) => None, + Operation::Delete(delete) => Some(&delete.primary_key_events), + } + } + + pub fn secondary_key_events(&self) -> &SecondaryKeys { + match &self { + Operation::Insert(insert) => &insert.secondary_keys_events, + Operation::Update(update) => &update.secondary_keys_events, + Operation::Delete(delete) => &delete.secondary_keys_events, + } + } + + pub fn pk_gen_state(&self) -> Option<&PrimaryKeyGenState> { + match &self { + Operation::Insert(insert) => Some(&insert.pk_gen_state), + Operation::Update(_) => None, + Operation::Delete(_) => None, + } + } +} + +#[derive(Clone, Debug)] pub struct InsertOperation { pub id: OperationId, pub primary_key_events: Vec>>, @@ -42,7 +282,7 @@ pub struct InsertOperation { pub link: Link, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct UpdateOperation { pub id: OperationId, pub secondary_keys_events: SecondaryKeys, @@ -50,9 +290,10 @@ pub struct UpdateOperation { pub link: Link, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct DeleteOperation { pub id: OperationId, pub primary_key_events: Vec>>, pub secondary_keys_events: SecondaryKeys, + pub link: Link, } diff --git a/src/persistence/space/data.rs b/src/persistence/space/data.rs index ba84e713..f74a41f5 100644 --- a/src/persistence/space/data.rs +++ b/src/persistence/space/data.rs @@ -2,13 +2,14 @@ use std::future::Future; use std::io::SeekFrom; use std::path::Path; -use crate::persistence::space::open_or_create_file; +use crate::persistence::space::{open_or_create_file, BatchData}; use crate::persistence::SpaceDataOps; use crate::prelude::WT_DATA_EXTENSION; use convert_case::{Case, Casing}; use data_bucket::{ - parse_general_header_by_index, parse_page, persist_page, update_at, DataPage, GeneralHeader, - GeneralPage, Link, PageType, Persistable, SizeMeasurable, SpaceInfoPage, GENERAL_HEADER_SIZE, + parse_data_pages_batch, parse_general_header_by_index, parse_page, persist_page, + persist_pages_batch, update_at, DataPage, GeneralHeader, GeneralPage, Link, PageType, + Persistable, SizeMeasurable, SpaceInfoPage, }; use rkyv::api::high::HighDeserializer; use rkyv::rancor::Strategy; @@ -21,19 +22,21 
@@ use tokio::fs::File; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; #[derive(Debug)] -pub struct SpaceData { +pub struct SpaceData { pub info: GeneralPage>, pub last_page_id: u32, pub current_data_length: u32, pub data_file: File, } -impl SpaceData { +impl + SpaceData +{ async fn update_data_length(&mut self) -> eyre::Result<()> { let offset = (u32::default().aligned_size() * 6) as u32; self.data_file .seek(SeekFrom::Start( - (self.last_page_id * (DATA_LENGTH + GENERAL_HEADER_SIZE as u32) + offset) as u64, + (self.last_page_id * PAGE_SIZE + offset) as u64, )) .await?; let bytes = rkyv::to_bytes::(&self.current_data_length)?; @@ -42,8 +45,8 @@ impl SpaceData { } } -impl SpaceDataOps - for SpaceData +impl SpaceDataOps + for SpaceData where PkGenState: Default + for<'a> Serialize< @@ -72,9 +75,9 @@ where } else { open_or_create_file(path).await? }; - let info = parse_page::<_, DATA_LENGTH>(&mut data_file, 0).await?; + let info = parse_page::<_, PAGE_SIZE>(&mut data_file, 0).await?; let file_length = data_file.metadata().await?.len(); - let page_id = file_length / (DATA_LENGTH as u64 + GENERAL_HEADER_SIZE as u64); + let page_id = file_length / PAGE_SIZE as u64; let last_page_header = parse_general_header_by_index(&mut data_file, page_id as u32).await?; @@ -107,7 +110,7 @@ where async fn save_data(&mut self, link: Link, bytes: &[u8]) -> eyre::Result<()> { if link.page_id > self.last_page_id.into() { let mut page = GeneralPage { - header: GeneralHeader::new(link.page_id, PageType::SpaceInfo, 0.into()), + header: GeneralHeader::new(link.page_id, PageType::Data, 0.into()), inner: DataPage { length: 0, data: [0; 1], @@ -119,7 +122,57 @@ where } self.current_data_length += link.length; self.update_data_length().await?; - update_at::<{ DATA_LENGTH }>(&mut self.data_file, link, bytes).await + update_at::<{ PAGE_SIZE }>(&mut self.data_file, link, bytes).await + } + + async fn save_batch_data(&mut self, batch_data: BatchData) -> eyre::Result<()> { + let page_ids = batch_data.keys().map(|id| (*id).into()).collect::>(); + let ids_to_create = page_ids + .iter() + .filter(|id| **id > self.last_page_id) + .cloned() + .collect::>(); + let ids_to_parse = page_ids + .iter() + .filter(|id| **id <= self.last_page_id) + .cloned() + .collect::>(); + + if let Some(max) = ids_to_create.last() { + self.last_page_id = *max; + } + let created_pages = ids_to_create + .into_iter() + .map(|id| GeneralPage { + header: GeneralHeader::new(id.into(), PageType::Data, 0.into()), + inner: DataPage { + length: 0, + data: [0; INNER_PAGE_SIZE], + }, + }) + .collect::>(); + let parsed_pages = + parse_data_pages_batch::(&mut self.data_file, ids_to_parse) + .await?; + + let updated_pages = vec![parsed_pages, created_pages] + .into_iter() + .flatten() + .map(|mut page| { + let id = page.header.page_id; + let ops = batch_data + .get(&id) + .expect("should be available as pages parsed from these ids"); + for (link, bytes) in ops { + page.inner.update_at(*link, bytes)?; + } + Ok::<_, eyre::Report>(page) + }) + .collect::, _>>()?; + + persist_pages_batch(updated_pages, &mut self.data_file).await?; + + Ok(()) } fn get_mut_info(&mut self) -> &mut GeneralPage> { diff --git a/src/persistence/space/index/mod.rs b/src/persistence/space/index/mod.rs index 76fd98e4..60bbc2ae 100644 --- a/src/persistence/space/index/mod.rs +++ b/src/persistence/space/index/mod.rs @@ -2,7 +2,9 @@ mod table_of_contents; mod unsized_; mod util; +use std::collections::HashMap; use std::fmt::Debug; +use std::hash::Hash; use std::path::Path; use 
std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -10,9 +12,9 @@ use std::sync::Arc; use convert_case::{Case, Casing}; use data_bucket::page::{IndexValue, PageId}; use data_bucket::{ - get_index_page_size_from_data_length, parse_page, persist_page, GeneralHeader, GeneralPage, - IndexPage, IndexPageUtility, Link, PageType, SizeMeasurable, SpaceId, SpaceInfoPage, - GENERAL_HEADER_SIZE, + get_index_page_size_from_data_length, parse_page, persist_page, persist_pages_batch, + GeneralHeader, GeneralPage, IndexPage, IndexPageUtility, Link, PageType, SizeMeasurable, + SpaceId, SpaceInfoPage, GENERAL_HEADER_SIZE, }; use eyre::eyre; use indexset::cdc::change::ChangeEvent; @@ -27,7 +29,7 @@ use rkyv::util::AlignedVec; use rkyv::{rancor, Archive, Deserialize, Serialize}; use tokio::fs::File; -use crate::persistence::space::open_or_create_file; +use crate::persistence::space::{open_or_create_file, BatchChangeEvent}; use crate::persistence::SpaceIndexOps; use crate::prelude::WT_INDEX_EXTENSION; @@ -36,20 +38,21 @@ pub use unsized_::SpaceIndexUnsized; pub use util::{map_index_pages_to_toc_and_general, map_unsized_index_pages_to_toc_and_general}; #[derive(Debug)] -pub struct SpaceIndex { +pub struct SpaceIndex { space_id: SpaceId, - table_of_contents: IndexTableOfContents, + table_of_contents: IndexTableOfContents<(T, Link), INNER_PAGE_SIZE>, next_page_id: Arc, index_file: File, #[allow(dead_code)] info: GeneralPage>, } -impl SpaceIndex +impl SpaceIndex where T: Archive + Ord + Eq + + Hash + Clone + Default + Debug @@ -58,7 +61,7 @@ where + Send + Sync + 'static, - ::Archived: Deserialize> + Ord + Eq, + ::Archived: Deserialize> + Ord + Eq + Debug, { pub async fn new>(index_file_path: S, space_id: SpaceId) -> eyre::Result { let mut index_file = if !Path::new(index_file_path.as_ref()).exists() { @@ -79,13 +82,13 @@ where } else { open_or_create_file(index_file_path).await? 
}; - let info = parse_page::<_, DATA_LENGTH>(&mut index_file, 0).await?; + let info = parse_page::<_, INNER_PAGE_SIZE>(&mut index_file, 0).await?; let file_length = index_file.metadata().await?.len(); - let page_id = if file_length % (DATA_LENGTH as u64 + GENERAL_HEADER_SIZE as u64) == 0 { - file_length / (DATA_LENGTH as u64 + GENERAL_HEADER_SIZE as u64) + let page_id = if file_length % (INNER_PAGE_SIZE as u64 + GENERAL_HEADER_SIZE as u64) == 0 { + file_length / (INNER_PAGE_SIZE as u64 + GENERAL_HEADER_SIZE as u64) } else { - file_length / (DATA_LENGTH as u64 + GENERAL_HEADER_SIZE as u64) + 1 + file_length / (INNER_PAGE_SIZE as u64 + GENERAL_HEADER_SIZE as u64) + 1 }; let next_page_id = Arc::new(AtomicU32::new(page_id as u32)); let table_of_contents = @@ -105,8 +108,8 @@ where node_id: Pair, page_id: PageId, ) -> eyre::Result<()> { - let size = get_index_page_size_from_data_length::(DATA_LENGTH as usize); - let mut page = IndexPage::new(node_id.key.clone(), size); + let size = get_index_page_size_from_data_length::(INNER_PAGE_SIZE as usize); + let mut page = IndexPage::new(node_id.clone().into(), size); page.current_index = 1; page.current_length = 1; page.slots[0] = 0; @@ -130,13 +133,13 @@ where async fn insert_on_index_page( &mut self, page_id: PageId, - node_id: T, + node_id: Pair, index: usize, value: Pair, - ) -> eyre::Result> { + ) -> eyre::Result>> { let mut new_node_id = None; - let size = get_index_page_size_from_data_length::(DATA_LENGTH as usize); + let size = get_index_page_size_from_data_length::(INNER_PAGE_SIZE as usize); let mut utility = IndexPage::::parse_index_page_utility(&mut self.index_file, page_id).await?; utility.slots.insert(index, utility.current_index); @@ -155,9 +158,9 @@ where ) .await?; - if node_id < value.key { - utility.node_id = value.key.clone(); - new_node_id = Some(value.key); + if node_id.key < value.key { + utility.node_id = value.clone().into(); + new_node_id = Some(value); } IndexPage::::persist_index_page_utility(&mut self.index_file, page_id, utility).await?; @@ -168,26 +171,29 @@ where async fn remove_from_index_page( &mut self, page_id: PageId, - node_id: T, + node_id: Pair, index: usize, value: Pair, - ) -> eyre::Result> { + ) -> eyre::Result>> { let mut new_node_id = None; - let size = get_index_page_size_from_data_length::(DATA_LENGTH as usize); + let size = get_index_page_size_from_data_length::(INNER_PAGE_SIZE as usize); let mut utility = IndexPage::::parse_index_page_utility(&mut self.index_file, page_id).await?; - utility.current_index = *utility + let value_position = *utility .slots .get(index) .expect("Slots should exist for every index within `size`"); + if value_position < utility.current_index { + utility.current_index = value_position; + } utility.slots.remove(index); utility.slots.push(0); utility.current_length -= 1; IndexPage::::remove_value(&mut self.index_file, page_id, size, utility.current_index) .await?; - if node_id == value.key { + if node_id.key == value.key { let index = *utility .slots .get(index - 1) @@ -198,9 +204,8 @@ where size, index as usize, ) - .await? 
- .key; - new_node_id = Some(utility.node_id.clone()) + .await?; + new_node_id = Some(utility.node_id.clone().into()) } IndexPage::::persist_index_page_utility(&mut self.index_file, page_id, utility).await?; @@ -210,19 +215,22 @@ where async fn process_insert_at( &mut self, - node_id: T, + node_id: Pair, value: Pair, index: usize, ) -> eyre::Result<()> { let page_id = self .table_of_contents - .get(&node_id) + .get(&(node_id.key.clone(), node_id.value)) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; if let Some(new_node_id) = self .insert_on_index_page(page_id, node_id.clone(), index, value) .await? { - self.table_of_contents.update_key(&node_id, new_node_id); + self.table_of_contents.update_key( + &(node_id.key, node_id.value), + (new_node_id.key, new_node_id.value), + ); self.table_of_contents.persist(&mut self.index_file).await?; } Ok(()) @@ -230,19 +238,22 @@ where async fn process_remove_at( &mut self, - node_id: T, + node_id: Pair, value: Pair, index: usize, ) -> eyre::Result<()> { let page_id = self .table_of_contents - .get(&node_id) + .get(&(node_id.key.clone(), node_id.value)) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; if let Some(new_node_id) = self .remove_from_index_page(page_id, node_id.clone(), index, value) .await? { - self.table_of_contents.update_key(&node_id, new_node_id); + self.table_of_contents.update_key( + &(node_id.key, node_id.value), + (new_node_id.key, new_node_id.value), + ); self.table_of_contents.persist(&mut self.index_file).await?; } Ok(()) @@ -253,26 +264,32 @@ where } else { self.next_page_id.fetch_add(1, Ordering::Relaxed).into() }; - self.table_of_contents.insert(node_id.key.clone(), page_id); + self.table_of_contents + .insert((node_id.key.clone(), node_id.value), page_id); self.table_of_contents.persist(&mut self.index_file).await?; self.add_new_index_page(node_id, page_id).await?; Ok(()) } - async fn process_remove_node(&mut self, node_id: T) -> eyre::Result<()> { - self.table_of_contents.remove(&node_id); + async fn process_remove_node(&mut self, node_id: Pair) -> eyre::Result<()> { + self.table_of_contents.remove(&(node_id.key, node_id.value)); self.table_of_contents.persist(&mut self.index_file).await?; Ok(()) } - async fn process_split_node(&mut self, node_id: T, split_index: usize) -> eyre::Result<()> { + async fn process_split_node( + &mut self, + node_id: Pair, + split_index: usize, + ) -> eyre::Result<()> { let page_id = self .table_of_contents - .get(&node_id) + .get(&(node_id.key.clone(), node_id.value)) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; let mut page = - parse_page::, DATA_LENGTH>(&mut self.index_file, page_id.into()).await?; + parse_page::, INNER_PAGE_SIZE>(&mut self.index_file, page_id.into()) + .await?; let splitted_page = page.inner.split(split_index); let new_page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() { id @@ -280,10 +297,17 @@ where self.next_page_id.fetch_add(1, Ordering::Relaxed).into() }; - self.table_of_contents - .update_key(&node_id, page.inner.node_id.clone()); - self.table_of_contents - .insert(splitted_page.node_id.clone(), new_page_id); + self.table_of_contents.update_key( + &(node_id.key.clone(), node_id.value), + (page.inner.node_id.key.clone(), page.inner.node_id.link), + ); + self.table_of_contents.insert( + ( + splitted_page.node_id.key.clone(), + splitted_page.node_id.link, + ), + new_page_id, + ); self.table_of_contents.persist(&mut self.index_file).await?; self.add_index_page(splitted_page, new_page_id).await?; @@ -293,12 +317,14 @@ 
where } pub async fn parse_indexset(&mut self) -> eyre::Result> { - let size = get_index_page_size_from_data_length::(DATA_LENGTH as usize); + let size = get_index_page_size_from_data_length::(INNER_PAGE_SIZE as usize); let indexset = BTreeMap::::with_maximum_node_size(size); for (_, page_id) in self.table_of_contents.iter() { - let page = - parse_page::, DATA_LENGTH>(&mut self.index_file, (*page_id).into()) - .await?; + let page = parse_page::, INNER_PAGE_SIZE>( + &mut self.index_file, + (*page_id).into(), + ) + .await?; let node = page.inner.get_node(); indexset.attach_node(node) } @@ -307,11 +333,12 @@ where } } -impl SpaceIndexOps for SpaceIndex +impl SpaceIndexOps for SpaceIndex where T: Archive + Ord + Eq + + Hash + Clone + Default + Debug @@ -320,7 +347,7 @@ where + Send + Sync + 'static, - ::Archived: Deserialize> + Ord + Eq, + ::Archived: Deserialize> + Ord + Eq + Debug, { async fn primary_from_table_files_path + Send>( table_path: S, @@ -372,36 +399,177 @@ where max_value: node_id, value, index, - } => self.process_insert_at(node_id.key, value, index).await, + } => self.process_insert_at(node_id, value, index).await, ChangeEvent::RemoveAt { max_value: node_id, value, index, - } => self.process_remove_at(node_id.key, value, index).await, + } => self.process_remove_at(node_id, value, index).await, ChangeEvent::CreateNode { max_value: node_id } => { self.process_create_node(node_id).await } ChangeEvent::RemoveNode { max_value: node_id } => { - self.process_remove_node(node_id.key).await + self.process_remove_node(node_id).await } ChangeEvent::SplitNode { max_value: node_id, split_index, - } => self.process_split_node(node_id.key, split_index).await, + } => self.process_split_node(node_id, split_index).await, } } + + async fn process_change_event_batch( + &mut self, + events: BatchChangeEvent, + ) -> eyre::Result<()> { + let mut pages: HashMap = HashMap::new(); + for ev in events { + match &ev { + ChangeEvent::InsertAt { max_value, .. } + | ChangeEvent::RemoveAt { max_value, .. 
} => { + let page_id = &(max_value.key.clone(), max_value.value); + // println!("{:?}", page_id); + // println!("{:?}", self.table_of_contents.iter().collect::>()); + let page_index = self + .table_of_contents + .get(page_id) + .expect("page should be available in table of contents"); + let page = pages.get_mut(&page_index); + let page_to_update = if let Some(page) = page { + page + } else { + let page = parse_page::, INNER_PAGE_SIZE>( + &mut self.index_file, + page_index.into(), + ) + .await?; + pages.insert(page_index, page); + pages + .get_mut(&page_index) + .expect("should be available as was just inserted before") + }; + page_to_update.inner.apply_change_event(ev.clone())?; + if &( + page_to_update.inner.node_id.key.clone(), + page_to_update.inner.node_id.link, + ) != page_id + { + self.table_of_contents.update_key( + page_id, + ( + page_to_update.inner.node_id.key.clone(), + page_to_update.inner.node_id.link, + ), + ); + } + } + ChangeEvent::CreateNode { max_value } => { + let page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() { + id + } else { + self.next_page_id.fetch_add(1, Ordering::Relaxed).into() + }; + self.table_of_contents + .insert((max_value.key.clone(), max_value.value), page_id); + + let size = get_index_page_size_from_data_length::(INNER_PAGE_SIZE as usize); + let mut page = IndexPage::new(max_value.clone().into(), size); + let ev = ChangeEvent::InsertAt { + max_value: max_value.clone(), + value: max_value.clone(), + index: 0, + }; + page.apply_change_event(ev)?; + let header = GeneralHeader::new(page_id, PageType::Index, self.space_id); + let general_page = GeneralPage { + inner: page, + header, + }; + pages.insert(page_id, general_page); + self.table_of_contents + .insert((max_value.key.clone(), max_value.value), page_id) + } + ChangeEvent::RemoveNode { max_value } => { + self.table_of_contents + .remove(&(max_value.key.clone(), max_value.value)); + } + ChangeEvent::SplitNode { + max_value, + split_index, + } => { + let page_id = &(max_value.key.clone(), max_value.value); + let page_index = self + .table_of_contents + .get(page_id) + .expect("page should be available in table of contents"); + let page = pages.get_mut(&page_index); + let page_to_update = if let Some(page) = page { + page + } else { + let page = parse_page::, INNER_PAGE_SIZE>( + &mut self.index_file, + page_index.into(), + ) + .await?; + pages.insert(page_index, page); + pages + .get_mut(&page_index) + .expect("should be available as was just inserted before") + }; + // println!("Event: {:?}", &ev); + let splitted_page = page_to_update.inner.split(*split_index); + let new_page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() { + id + } else { + self.next_page_id.fetch_add(1, Ordering::Relaxed).into() + }; + + self.table_of_contents.update_key( + page_id, + ( + page_to_update.inner.node_id.key.clone(), + page_to_update.inner.node_id.link, + ), + ); + self.table_of_contents.insert( + ( + splitted_page.node_id.key.clone(), + splitted_page.node_id.link, + ), + new_page_id, + ); + let header = GeneralHeader::new(new_page_id, PageType::Index, self.space_id); + let general_page = GeneralPage { + inner: splitted_page, + header, + }; + pages.insert(new_page_id, general_page); + } + } + } + + self.table_of_contents.persist(&mut self.index_file).await?; + persist_pages_batch(pages.values().cloned().collect(), &mut self.index_file).await?; + Ok(()) + } } #[cfg(test)] mod test { use data_bucket::{ - get_index_page_size_from_data_length, IndexPage, Persistable, INNER_PAGE_SIZE, + 
get_index_page_size_from_data_length, IndexPage, IndexValue, Persistable, INNER_PAGE_SIZE, }; #[test] fn test_size_measure() { let size = get_index_page_size_from_data_length::(INNER_PAGE_SIZE); - let page = IndexPage::new(0, size); + let page = IndexPage::new( + IndexValue { + key: 0, + link: Default::default(), + }, + size, + ); assert!(page.as_bytes().as_ref().len() <= INNER_PAGE_SIZE) } } diff --git a/src/persistence/space/index/unsized_.rs b/src/persistence/space/index/unsized_.rs index 6ec46f46..4cf8e7d3 100644 --- a/src/persistence/space/index/unsized_.rs +++ b/src/persistence/space/index/unsized_.rs @@ -1,11 +1,14 @@ +use std::collections::HashMap; use std::fmt::Debug; +use std::hash::Hash; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use data_bucket::page::PageId; use data_bucket::{ - parse_page, persist_page, GeneralHeader, GeneralPage, IndexPageUtility, IndexValue, Link, - PageType, SizeMeasurable, SpaceId, SpaceInfoPage, UnsizedIndexPage, VariableSizeMeasurable, + parse_page, persist_page, persist_pages_batch, GeneralHeader, GeneralPage, IndexPageUtility, + IndexValue, Link, PageType, SizeMeasurable, SpaceId, SpaceInfoPage, UnsizedIndexPage, + VariableSizeMeasurable, }; use eyre::eyre; use indexset::cdc::change::ChangeEvent; @@ -20,6 +23,7 @@ use rkyv::util::AlignedVec; use rkyv::{rancor, Archive, Deserialize, Serialize}; use tokio::fs::File; +use crate::persistence::space::BatchChangeEvent; use crate::persistence::{IndexTableOfContents, SpaceIndex, SpaceIndexOps}; use crate::prelude::WT_INDEX_EXTENSION; use crate::UnsizedNode; @@ -27,7 +31,7 @@ use crate::UnsizedNode; #[derive(Debug)] pub struct SpaceIndexUnsized { space_id: SpaceId, - table_of_contents: IndexTableOfContents, + table_of_contents: IndexTableOfContents<(T, Link), DATA_LENGTH>, next_page_id: Arc, index_file: File, #[allow(dead_code)] @@ -39,6 +43,7 @@ where T: Archive + Ord + Eq + + Hash + Clone + Default + Debug @@ -48,7 +53,7 @@ where + Send + Sync + 'static, - ::Archived: Deserialize> + Ord + Eq, + ::Archived: Deserialize> + Ord + Eq + Debug, { pub async fn new>(index_file_path: S, space_id: SpaceId) -> eyre::Result { let space_index = SpaceIndex::::new(index_file_path, space_id).await?; @@ -66,11 +71,7 @@ where node_id: Pair, page_id: PageId, ) -> eyre::Result<()> { - let value = IndexValue { - key: node_id.key.clone(), - link: node_id.value, - }; - let page = UnsizedIndexPage::new(node_id.key.clone(), value)?; + let page = UnsizedIndexPage::new(node_id.clone().into())?; self.add_index_page(page, page_id).await } @@ -94,34 +95,38 @@ where } else { self.next_page_id.fetch_add(1, Ordering::Relaxed).into() }; - self.table_of_contents.insert(node_id.key.clone(), page_id); + self.table_of_contents + .insert((node_id.key.clone(), node_id.value), page_id); self.table_of_contents.persist(&mut self.index_file).await?; self.add_new_index_page(node_id, page_id).await?; Ok(()) } - async fn process_remove_node(&mut self, node_id: T) -> eyre::Result<()> { - self.table_of_contents.remove(&node_id); + async fn process_remove_node(&mut self, node_id: Pair) -> eyre::Result<()> { + self.table_of_contents.remove(&(node_id.key, node_id.value)); self.table_of_contents.persist(&mut self.index_file).await?; Ok(()) } async fn process_insert_at( &mut self, - node_id: T, + node_id: Pair, value: Pair, index: usize, ) -> eyre::Result<()> { let page_id = self .table_of_contents - .get(&node_id) + .get(&(node_id.key.clone(), node_id.value)) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; if let 
Some(new_node_id) = self .insert_on_index_page(page_id, node_id.clone(), index, value) .await? { - self.table_of_contents.update_key(&node_id, new_node_id); + self.table_of_contents.update_key( + &(node_id.key, node_id.value), + (new_node_id.key, new_node_id.value), + ); self.table_of_contents.persist(&mut self.index_file).await?; } Ok(()) @@ -130,10 +135,10 @@ where async fn insert_on_index_page( &mut self, page_id: PageId, - node_id: T, + node_id: Pair, index: usize, value: Pair, - ) -> eyre::Result> { + ) -> eyre::Result>> { let mut new_node_id = None; let mut utility = UnsizedIndexPage::::parse_index_page_utility( @@ -160,9 +165,9 @@ where (value_offset, (value_offset - previous_offset) as u16), ); - if node_id < value.key { - utility.update_node_id(value.key.clone())?; - new_node_id = Some(value.key); + if node_id.key < value.key { + utility.update_node_id(value.clone().into())?; + new_node_id = Some(value); } UnsizedIndexPage::::persist_index_page_utility( @@ -177,19 +182,22 @@ where async fn process_remove_at( &mut self, - node_id: T, + node_id: Pair, value: Pair, index: usize, ) -> eyre::Result<()> { let page_id = self .table_of_contents - .get(&node_id) + .get(&(node_id.key.clone(), node_id.value)) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; if let Some(new_node_id) = self .remove_from_index_page(page_id, node_id.clone(), index, value) .await? { - self.table_of_contents.update_key(&node_id, new_node_id); + self.table_of_contents.update_key( + &(node_id.key, node_id.value), + (new_node_id.key, new_node_id.value), + ); self.table_of_contents.persist(&mut self.index_file).await?; } Ok(()) @@ -198,10 +206,10 @@ where async fn remove_from_index_page( &mut self, page_id: PageId, - node_id: T, + node_id: Pair, index: usize, value: Pair, - ) -> eyre::Result> { + ) -> eyre::Result>> { let mut new_node_id = None; let mut utility = UnsizedIndexPage::::parse_index_page_utility( @@ -212,7 +220,7 @@ where utility.slots.remove(index); utility.slots_size -= 1; - if node_id == value.key { + if node_id.key == value.key { let (offset, len) = *utility .slots .get(index - 1) @@ -223,10 +231,9 @@ where offset, len, ) - .await? 
- .key; + .await?; utility.update_node_id(node_id)?; - new_node_id = Some(utility.node_id.clone()) + new_node_id = Some(utility.node_id.clone().into()) } UnsizedIndexPage::::persist_index_page_utility( @@ -239,10 +246,14 @@ where Ok(new_node_id) } - async fn process_split_node(&mut self, node_id: T, split_index: usize) -> eyre::Result<()> { + async fn process_split_node( + &mut self, + node_id: Pair, + split_index: usize, + ) -> eyre::Result<()> { let page_id = self .table_of_contents - .get(&node_id) + .get(&(node_id.key.clone(), node_id.value)) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; let mut page = parse_page::, DATA_LENGTH>( &mut self.index_file, @@ -256,10 +267,17 @@ where self.next_page_id.fetch_add(1, Ordering::Relaxed).into() }; - self.table_of_contents - .update_key(&node_id, page.inner.node_id.clone()); - self.table_of_contents - .insert(splitted_page.node_id.clone(), new_page_id); + self.table_of_contents.update_key( + &(node_id.key, node_id.value), + (page.inner.node_id.key.clone(), page.inner.node_id.link), + ); + self.table_of_contents.insert( + ( + splitted_page.node_id.key.clone(), + splitted_page.node_id.link, + ), + new_page_id, + ); self.table_of_contents.persist(&mut self.index_file).await?; self.add_index_page(splitted_page, new_page_id).await?; @@ -288,11 +306,12 @@ where } } -impl SpaceIndexOps for SpaceIndexUnsized +impl SpaceIndexOps for SpaceIndexUnsized where T: Archive + Ord + Eq + + Hash + Clone + Default + Debug @@ -302,7 +321,7 @@ where + Send + Sync + 'static, - ::Archived: Deserialize> + Ord + Eq, + ::Archived: Deserialize> + Ord + Eq + Debug, { async fn primary_from_table_files_path + Send>( table_path: S, @@ -328,7 +347,7 @@ where } async fn bootstrap(file: &mut File, table_name: String) -> eyre::Result<()> { - SpaceIndex::::bootstrap(file, table_name).await + SpaceIndex::::bootstrap(file, table_name).await } async fn process_change_event( @@ -340,22 +359,154 @@ where max_value: node_id, value, index, - } => self.process_insert_at(node_id.key, value, index).await, + } => self.process_insert_at(node_id, value, index).await, ChangeEvent::RemoveAt { max_value: node_id, value, index, - } => self.process_remove_at(node_id.key, value, index).await, + } => self.process_remove_at(node_id, value, index).await, ChangeEvent::CreateNode { max_value: node_id } => { self.process_create_node(node_id).await } ChangeEvent::RemoveNode { max_value: node_id } => { - self.process_remove_node(node_id.key).await + self.process_remove_node(node_id).await } ChangeEvent::SplitNode { max_value: node_id, split_index, - } => self.process_split_node(node_id.key, split_index).await, + } => self.process_split_node(node_id, split_index).await, } } + + async fn process_change_event_batch( + &mut self, + events: BatchChangeEvent, + ) -> eyre::Result<()> { + let mut pages: HashMap = HashMap::new(); + for ev in events { + match &ev { + ChangeEvent::InsertAt { max_value, .. } + | ChangeEvent::RemoveAt { max_value, .. 
} => { + let page_id = &(max_value.key.clone(), max_value.value); + let page_index = self + .table_of_contents + .get(page_id) + .expect("page should be available in table of contents"); + let page = pages.get_mut(&page_index); + let page_to_update = if let Some(page) = page { + page + } else { + let page = + parse_page::, INNER_PAGE_SIZE>( + &mut self.index_file, + page_index.into(), + ) + .await?; + pages.insert(page_index, page); + pages + .get_mut(&page_index) + .expect("should be available as it was just inserted before") + }; + page_to_update.inner.apply_change_event(ev.clone())?; + if &( + page_to_update.inner.node_id.key.clone(), + page_to_update.inner.node_id.link, + ) != page_id + { + self.table_of_contents.update_key( + page_id, + ( + page_to_update.inner.node_id.key.clone(), + page_to_update.inner.node_id.link, + ), + ); + } + } + ChangeEvent::CreateNode { max_value } => { + let page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() { + id + } else { + self.next_page_id.fetch_add(1, Ordering::Relaxed).into() + }; + self.table_of_contents + .insert((max_value.key.clone(), max_value.value), page_id); + + let page = + UnsizedIndexPage::::new(max_value.clone().into())?; + let header = GeneralHeader::new(page_id, PageType::IndexUnsized, self.space_id); + let general_page = GeneralPage { + inner: page, + header, + }; + pages.insert(page_id, general_page); + } + ChangeEvent::RemoveNode { max_value } => { + self.table_of_contents + .remove(&(max_value.key.clone(), max_value.value)); + } + ChangeEvent::SplitNode { + max_value, + split_index, + } => { + let page_id = &(max_value.key.clone(), max_value.value); + let page_index = self + .table_of_contents + .get(page_id) + .expect("page should be available in table of contents"); + let page = pages.get_mut(&page_index); + let page_to_update = if let Some(page) = page { + page + } else { + let page = + parse_page::, INNER_PAGE_SIZE>( + &mut self.index_file, + page_index.into(), + ) + .await?; + pages.insert(page_index, page); + pages + .get_mut(&page_index) + .expect("should be available as it was just inserted before") + }; + let splitted_page = page_to_update.inner.split(*split_index); + + let new_page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() { + id + } else { + self.next_page_id.fetch_add(1, Ordering::Relaxed).into() + }; + + self.table_of_contents.update_key( + page_id, + ( + page_to_update.inner.node_id.key.clone(), + page_to_update.inner.node_id.link, + ), + ); + self.table_of_contents.insert( + ( + splitted_page.node_id.key.clone(), + splitted_page.node_id.link, + ), + new_page_id, + ); + let header = GeneralHeader::new(new_page_id, PageType::IndexUnsized, self.space_id); + let general_page = GeneralPage { + inner: splitted_page, + header, + }; + pages.insert(new_page_id, general_page); + } + } + } + + self.table_of_contents.persist(&mut self.index_file).await?; + persist_pages_batch(pages.values().cloned().collect(), &mut self.index_file).await?; + Ok(()) + } } diff --git a/src/persistence/space/index/util.rs b/src/persistence/space/index/util.rs index d26cb6f2..cb4c39ce 100644 --- a/src/persistence/space/index/util.rs +++ b/src/persistence/space/index/util.rs @@ -1,15 +1,16 @@ use crate::prelude::IndexTableOfContents; use 
data_bucket::{ - GeneralHeader, GeneralPage, IndexPage, PageType, SizeMeasurable, UnsizedIndexPage, + GeneralHeader, GeneralPage, IndexPage, Link, PageType, SizeMeasurable, UnsizedIndexPage, VariableSizeMeasurable, }; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +#[allow(clippy::type_complexity)] pub fn map_index_pages_to_toc_and_general( pages: Vec>, ) -> ( - IndexTableOfContents, + IndexTableOfContents<(T, Link), DATA_LENGTH>, Vec>>, ) where @@ -20,7 +21,10 @@ where let mut toc = IndexTableOfContents::new(0.into(), next_page_id.clone()); for page in pages { let page_id = next_page_id.fetch_add(1, Ordering::Relaxed); - toc.insert(page.node_id.clone(), page_id.into()); + toc.insert( + (page.node_id.key.clone(), page.node_id.link), + page_id.into(), + ); let header = GeneralHeader::new(page_id.into(), PageType::Index, 0.into()); let index_page = GeneralPage { inner: page, @@ -32,10 +36,11 @@ where (toc, general_index_pages) } +#[allow(clippy::type_complexity)] pub fn map_unsized_index_pages_to_toc_and_general( pages: Vec>, ) -> ( - IndexTableOfContents, + IndexTableOfContents<(T, Link), DATA_LENGTH>, Vec>>, ) where @@ -46,7 +51,10 @@ where let mut toc = IndexTableOfContents::new(0.into(), next_page_id.clone()); for page in pages { let page_id = next_page_id.fetch_add(1, Ordering::Relaxed); - toc.insert(page.node_id.clone(), page_id.into()); + toc.insert( + (page.node_id.key.clone(), page.node_id.link), + page_id.into(), + ); let header = GeneralHeader::new(page_id.into(), PageType::IndexUnsized, 0.into()); let index_page = GeneralPage { inner: page, diff --git a/src/persistence/space/mod.rs b/src/persistence/space/mod.rs index efbc4082..cc7a3174 100644 --- a/src/persistence/space/mod.rs +++ b/src/persistence/space/mod.rs @@ -1,9 +1,11 @@ mod data; mod index; +use std::collections::HashMap; use std::future::Future; use std::path::Path; +use data_bucket::page::PageId; use data_bucket::{GeneralPage, Link, SpaceInfoPage}; use indexset::cdc::change::ChangeEvent; use indexset::core::pair::Pair; @@ -15,6 +17,10 @@ pub use index::{ IndexTableOfContents, SpaceIndex, SpaceIndexUnsized, }; +pub type BatchData = HashMap)>>; + +pub type BatchChangeEvent = Vec>>; + pub trait SpaceDataOps { fn from_table_files_path + Send>( path: S, @@ -30,6 +36,10 @@ pub trait SpaceDataOps { link: Link, bytes: &[u8], ) -> impl Future> + Send; + fn save_batch_data( + &mut self, + batch_data: BatchData, + ) -> impl Future> + Send; fn get_mut_info(&mut self) -> &mut GeneralPage>; fn save_info(&mut self) -> impl Future> + Send; } @@ -57,6 +67,10 @@ where &mut self, event: ChangeEvent>, ) -> impl Future> + Send; + fn process_change_event_batch( + &mut self, + events: BatchChangeEvent, + ) -> impl Future> + Send; } pub trait SpaceSecondaryIndexOps { @@ -69,6 +83,10 @@ pub trait SpaceSecondaryIndexOps { &mut self, events: SecondaryIndexEvents, ) -> impl Future> + Send; + fn process_change_event_batch( + &mut self, + events: SecondaryIndexEvents, + ) -> impl Future> + Send; } pub async fn open_or_create_file>(path: S) -> eyre::Result { diff --git a/src/persistence/task.rs b/src/persistence/task.rs index 546187e8..b8c7037d 100644 --- a/src/persistence/task.rs +++ b/src/persistence/task.rs @@ -1,17 +1,199 @@ +use crate::persistence::operation::{ + BatchInnerRow, BatchInnerWorkTable, BatchOperation, OperationId, PosByOpIdQuery, +}; +use crate::persistence::PersistenceEngineOps; +use crate::prelude::*; +use crate::util::OptimizedVec; + +use std::collections::HashSet; use std::fmt::Debug; -use 
std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; +use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; +use data_bucket::page::PageId; use tokio::sync::Notify; +use worktable_codegen::worktable; -use crate::persistence::PersistenceEngineOps; -use crate::prelude::Operation; +worktable! ( + name: QueueInner, + columns: { + id: u64 primary_key autoincrement, + operation_id: OperationId, + page_id: PageId, + link: Link, + pos: usize, + }, + indexes: { + operation_id_idx: operation_id, + page_id_idx: page_id, + link_idx: link, + }, +); + +pub struct QueueAnalyzer<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> { + operations: OptimizedVec<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>, + queue_inner_wt: QueueInnerWorkTable, +} + +impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> + QueueAnalyzer<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> +where + PrimaryKeyGenState: Debug, + PrimaryKey: Debug, + SecondaryKeys: Debug, +{ + pub fn new() -> Self { + Self { + operations: OptimizedVec::with_capacity(256), + queue_inner_wt: QueueInnerWorkTable::default(), + } + } + + pub fn push( + &mut self, + value: Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>, + ) -> eyre::Result<()> { + let link = value.link(); + let mut row = QueueInnerRow { + id: self.queue_inner_wt.get_next_pk().into(), + operation_id: value.operation_id(), + page_id: link.page_id, + link, + pos: 0, + }; + let pos = self.operations.push(value); + row.pos = pos; + self.queue_inner_wt.insert(row)?; + Ok(()) + } + + pub fn extend_from_iter( + &mut self, + i: impl Iterator<Item = Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>, + ) -> eyre::Result<()> { + for op in i { + self.push(op)? + } + Ok(()) + } + + pub fn get_first_op_id_available(&self) -> Option<OperationId> { + self.queue_inner_wt + .0 + .indexes + .operation_id_idx + .iter() + .next() + .map(|(id, _)| *id) + } + + pub async fn collect_batch_from_op_id( + &mut self, + op_id: OperationId, + ) -> eyre::Result<BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>> + where + PrimaryKeyGenState: Clone, + PrimaryKey: Clone, + SecondaryKeys: Clone, + { + let ops_rows = self + .queue_inner_wt + .select_by_operation_id(op_id) + .execute()?; + + let mut ops_set = HashSet::new(); + + let used_page_ids = ops_rows.iter().map(|r| r.page_id).collect::>(); + // We collect all ops available for the pages that are used by our current op_id. + for page_id in used_page_ids.iter() { + let page_ops = self.queue_inner_wt.select_by_page_id(*page_id).execute()?; + ops_set.extend(page_ops.into_iter().map(|r| r.operation_id)); + } + // After that we need to find out if the multi ops are using the same pages, and + // if not, we need to find the first multi op that blocks the batch update by + // using another page. + let mut block_op_id = None; + for op_id in ops_set.iter().filter(|op_id| match op_id { + OperationId::Single(_) => false, + OperationId::Multi(_) => true, + }) { + let rows = self + .queue_inner_wt + .select_by_operation_id(*op_id) + .execute()?; + let pages = rows.iter().map(|r| r.page_id).collect::>(); + // If any page used by a multi op is not in the used_page_ids set, it is a blocker op. + for page in pages.iter() { + if !used_page_ids.contains(page) { + if let Some(block_op_id) = block_op_id.as_mut() { + if *block_op_id > *op_id { + *block_op_id = *op_id + } + } else { + block_op_id = Some(*op_id) + } + } + } + } + // And if we found some blocker, we need to remove the blocking op and all ops + // after it, keeping only the ops that precede it. + let ops_set = if let Some(block_op_id) = block_op_id { + ops_set + .into_iter() + .filter(|op_id| *op_id < block_op_id) + .collect() + } else { + ops_set + }; + // After this point, the ops set is ready for batch generation. 
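+ // Illustrative walk-through (hypothetical op ids and page ids, not taken from a + // real trace): with used_page_ids = {p1} and ops_set = {Single(1), Multi(2), Single(3)}, + // where Multi(2) also touches a page p2 that is not in used_page_ids, Multi(2) becomes + // block_op_id and the filter above keeps only the ops that precede it, i.e. {Single(1)}.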
+ let mut ops_pos_set = HashSet::new(); + for op_id in ops_set { + let rows = self + .queue_inner_wt + .select_by_operation_id(op_id) + .execute()?; + ops_pos_set.extend(rows.into_iter().map(|r| (r.pos, r.id))) + } + + let mut ops = Vec::with_capacity(ops_pos_set.len()); + let info_wt = BatchInnerWorkTable::default(); + for (pos, id) in ops_pos_set { + let mut row: BatchInnerRow = self + .queue_inner_wt + .select(id.into()) + .expect("exists as Id exists") + .into(); + let op = self + .operations + .remove(pos) + .expect("should be available as presented in table"); + row.pos = ops.len(); + row.op_type = op.operation_type(); + ops.push(op); + info_wt.insert(row)?; + self.queue_inner_wt.delete_without_lock(id.into())? + } + // Return ops sorted by `OperationId`. + ops.sort_by_key(|k| k.operation_id()); + for (pos, op) in ops.iter().enumerate() { + let op_id = op.operation_id(); + let q = PosByOpIdQuery { pos }; + info_wt.update_pos_by_op_id(q, op_id).await?; + } + + Ok(BatchOperation { ops, info_wt }) + } + + pub fn len(&self) -> usize { + self.queue_inner_wt.count() + } +} #[derive(Debug)] pub struct Queue { queue: lockfree::queue::Queue>, notify: Notify, - len: AtomicU16, + len: Arc<AtomicU16>, } impl @@ -21,7 +203,7 @@ impl Self { queue: lockfree::queue::Queue::new(), notify: Notify::new(), - len: AtomicU16::new(0), + len: Arc::new(AtomicU16::new(0)), } } @@ -55,6 +237,15 @@ impl } } + pub fn pop_iter( + &self, + ) -> impl Iterator> { + let iter_count = self.len.clone(); + self.queue.pop_iter().inspect(move |_| { + iter_count.fetch_sub(1, Ordering::Relaxed); + }) + } + pub fn len(&self) -> usize { self.len.load(Ordering::Relaxed) as usize } @@ -66,8 +257,6 @@ pub struct PersistenceTask { engine_task_handle: tokio::task::AbortHandle, queue: Arc>, progress_notify: Arc, - // True if non-empty, false either. 
- wait_state: Arc, } impl @@ -80,32 +269,57 @@ impl pub fn run_engine(mut engine: E) -> Self where E: PersistenceEngineOps + Send + 'static, - SecondaryKeys: Debug + Send + 'static, - PrimaryKeyGenState: Debug + Send + 'static, - PrimaryKey: Debug + Send + 'static, + SecondaryKeys: Clone + Debug + Send + Sync + 'static, + PrimaryKeyGenState: Clone + Debug + Send + Sync + 'static, + PrimaryKey: Clone + Debug + Send + Sync + 'static, { let queue = Arc::new(Queue::new()); let progress_notify = Arc::new(Notify::new()); - let wait_state = Arc::new(AtomicBool::new(false)); let engine_queue = queue.clone(); let engine_progress_notify = progress_notify.clone(); - let engine_wait_state = wait_state.clone(); let task = async move { + let mut analyzer = QueueAnalyzer::new(); loop { - let next_op = if let Some(next_op) = engine_queue.immediate_pop() { - next_op + let op = if let Some(next_op) = engine_queue.immediate_pop() { + Some(next_op) } else { - engine_wait_state.store(true, Ordering::Relaxed); - engine_progress_notify.notify_waiters(); - let res = engine_queue.pop().await; - engine_wait_state.store(false, Ordering::Relaxed); - res + if analyzer.len() == 0 { + engine_progress_notify.notify_waiters(); + Some(engine_queue.pop().await) + } else { + None + } }; - tracing::debug!("Applying operation {:?}", next_op); - let res = engine.apply_operation(next_op).await; - if let Err(err) = res { - tracing::warn!("{}", err); + if let Some(op) = op { + if let Err(err) = analyzer.push(op) { + tracing::warn!("Error while feeding data to analyzer: {}", err); + } + } + let ops_available_iter = engine_queue.pop_iter(); + if let Err(err) = analyzer.extend_from_iter(ops_available_iter) { + tracing::warn!("Error while feeding data to analyzer: {}", err); + } + if let Some(op_id) = analyzer.get_first_op_id_available() { + match analyzer.collect_batch_from_op_id(op_id).await { + Ok(batch_op) => { + if let Err(e) = engine.apply_batch_operation(batch_op).await { + tracing::warn!( + "Persistence engine failed while applying batch op: {}", + e + ); + } + } + Err(e) => { + tracing::warn!("Error collecting batch operation: {}", e); + } + } + } } }; @@ -114,14 +328,13 @@ impl queue, engine_task_handle, progress_notify, - wait_state, } } pub async fn wait_for_ops(&self) { - if !self.wait_state.load(Ordering::Relaxed) { - let count = self.queue.len(); - println!("Waiting for {} operations", count); + let count = self.queue.len(); + if count != 0 { + tracing::info!("Waiting for {} operations", count); self.progress_notify.notified().await } } diff --git a/src/table/mod.rs b/src/table/mod.rs index 1c6c0a10..3a614d54 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -4,6 +4,14 @@ pub mod system_info; use std::fmt::Debug; use std::marker::PhantomData; +use crate::in_memory::{DataPages, RowWrapper, StorableRow}; +use crate::lock::LockMap; +use crate::persistence::{InsertOperation, Operation}; +use crate::prelude::{OperationId, PrimaryKeyGeneratorState}; +use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey}; +use crate::{ + in_memory, IndexError, IndexMap, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, +}; use data_bucket::{Link, INNER_PAGE_SIZE}; use derive_more::{Display, Error, From}; use indexset::core::node::NodeLike; @@ -17,15 +25,7 @@ use rkyv::ser::sharing::Share; use 
rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; - -use crate::in_memory::{DataPages, RowWrapper, StorableRow}; -use crate::lock::LockMap; -use crate::persistence::{InsertOperation, Operation}; -use crate::prelude::PrimaryKeyGeneratorState; -use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey}; -use crate::{ - in_memory, IndexError, IndexMap, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, -}; +use uuid::Uuid; #[derive(Debug)] pub struct WorkTable< @@ -267,7 +267,7 @@ where } let op = Operation::Insert(InsertOperation { - id: Default::default(), + id: OperationId::Single(Uuid::now_v7()), pk_gen_state: self.pk_gen.get_state(), primary_key_events, secondary_keys_events: indexes_res.expect("was checked before"), diff --git a/src/util/mod.rs b/src/util/mod.rs index a2cbe109..7e68e97a 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,3 +1,5 @@ +mod optimized_vec; mod ordered_float; +pub use optimized_vec::OptimizedVec; pub use ordered_float::{OrderedF32Def, OrderedF64Def}; diff --git a/src/util/optimized_vec.rs b/src/util/optimized_vec.rs new file mode 100644 index 00000000..e5990394 --- /dev/null +++ b/src/util/optimized_vec.rs @@ -0,0 +1,230 @@ +/// Struct for storing data in a vector. +/// It has a vector of data and a vector of empty indexes. +/// If the `empty` vector is empty, then the data vector is extended. +/// If the `empty` vector is not empty, then an index from it is reused to insert the data. +#[derive(Debug)] +pub struct OptimizedVec<T> { + /// Vector of data. + data: Vec<T>, + /// Vector of empty indexes. + empty: Vec<usize>, + /// Per-slot flags that mark whether an item is empty. + emptiness: Vec<bool>, + /// Number of elements in the vector. + length: usize, +} + +impl<T> Default for OptimizedVec<T> { + fn default() -> Self { + OptimizedVec { + data: Vec::new(), + empty: Vec::new(), + emptiness: Vec::new(), + length: 0, + } + } +} + +impl<T> OptimizedVec<T> { + pub fn with_capacity(cap: usize) -> Self { + OptimizedVec { + data: Vec::with_capacity(cap), + empty: Vec::with_capacity(cap), + emptiness: Vec::with_capacity(cap), + length: 0, + } + } + + /// Pushes a value to the vector. + /// # Arguments + /// * `value` - Value to push + /// # Returns + /// * `usize` - Index of the pushed value + pub fn push(&mut self, value: T) -> usize { + let index = if self.empty.is_empty() { + self.data.push(value); + self.emptiness.push(false); + self.length + } else { + let index = self.empty.pop().unwrap(); + self.data[index] = value; + self.emptiness[index] = false; + index + }; + + self.length += 1; + + index + } + + /// Gets a value from the vector. + /// # Arguments + /// * `index` - Index of the value to get + /// # Returns + /// * `Option<&T>` - Value at the index, + /// or `None` if the index is out of bounds or the value is empty. + #[allow(dead_code)] + pub fn get(&self, index: usize) -> Option<&T> { + if index >= self.data.len() { + return None; + } + + if self.emptiness[index] { + return None; + } + + Some(&self.data[index]) + } + + /// Gets a mutable value from the vector. + /// # Arguments + /// * `index` - Index of the value to get + /// # Returns + /// * `Option<&mut T>` - Mutable value at the index, + /// or `None` if the index is out of bounds or the value is empty. + #[allow(dead_code)] + pub fn get_mut(&mut self, index: usize) -> Option<&mut T> { + if index >= self.data.len() { + return None; + } + + if self.emptiness[index] { + return None; + } + + Some(&mut self.data[index]) + } + + /// Removes a value from the vector. 
+ /// # Arguments + /// * `index` - Index of the value to remove. + /// # Returns + /// * `Option<T>` - Value at the index, + /// or `None` if the index is out of bounds or the value is empty. + pub fn remove(&mut self, index: usize) -> Option<T> + where + T: Clone, + { + if index >= self.data.len() { + return None; + } + + if self.emptiness[index] { + return None; + } + + self.emptiness[index] = true; + self.empty.push(index); + self.length -= 1; + + Some(self.data[index].clone()) + } + + /// Gets the data vector. + /// # Returns + /// * `&Vec<T>` - Data vector. + #[allow(dead_code)] + pub fn get_data(&self) -> &Vec<T> { + &self.data + } + + /// Gets the length of the vector. + /// # Returns + /// * `usize` - Length of the vector. + #[allow(dead_code)] + #[must_use] + pub fn len(&self) -> usize { + self.length + } + + /// Returns true if [`OptimizedVec`] is empty. + /// # Returns + /// * `bool` - State of emptiness. + #[allow(dead_code)] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::OptimizedVec; + + #[test] + fn test_optimized_vec_new() { + let vec = OptimizedVec::<i32>::default(); + + assert_eq!(vec.data.len(), 0); + assert_eq!(vec.empty.len(), 0); + assert_eq!(vec.emptiness.len(), 0); + + assert_eq!(vec.length, 0); + } + + #[test] + fn test_optimized_vec_push() { + let mut vec = OptimizedVec::<i32>::default(); + let index = vec.push(1); + + assert_eq!(index, 0); + assert_eq!(vec.data.len(), 1); + assert_eq!(vec.emptiness.len(), 1); + assert_eq!(vec.empty.len(), 0); + assert_eq!(vec.length, 1); + } + + #[test] + fn test_optimized_vec_get() { + let mut vec = OptimizedVec::<i32>::default(); + let index = vec.push(1); + + assert_eq!(vec.get(index), Some(&1)); + assert_eq!(vec.get(index + 1), None); + } + + #[test] + fn test_optimized_vec_get_mut() { + let mut vec = OptimizedVec::<i32>::default(); + let index = vec.push(1); + + assert_eq!(vec.get_mut(index), Some(&mut 1)); + assert_eq!(vec.get_mut(index + 1), None); + } + + #[test] + fn test_optimized_vec_remove() { + let mut vec = OptimizedVec::<i32>::default(); + let index = vec.push(1); + + assert_eq!(vec.remove(index), Some(1)); + assert_eq!(vec.remove(index + 1), None); + assert_eq!(vec.data.len(), 1); + assert_eq!(vec.emptiness.len(), 1); + assert_eq!(vec.empty.len(), 1); + assert_eq!(vec.empty[0], index); + assert_eq!(vec.length, 0); + } + + #[test] + fn test_optimized_vec_push_remove() { + let mut vec = OptimizedVec::<i32>::default(); + let index = vec.push(1); + + assert_eq!(index, 0); + assert_eq!(vec.data.len(), 1); + assert_eq!(vec.emptiness.len(), 1); + assert_eq!(vec.empty.len(), 0); + assert_eq!(vec.length, 1); + + assert_eq!(vec.remove(index), Some(1)); + + let index = vec.push(2); + + assert_eq!(index, 0); + assert_eq!(vec.data.len(), 1); + assert_eq!(vec.emptiness.len(), 1); + assert_eq!(vec.empty.len(), 0); + assert_eq!(vec.length, 1); + } +} diff --git a/tests/data/expected/space_index/indexset/process_create_node.wt.idx b/tests/data/expected/space_index/indexset/process_create_node.wt.idx index bb61d11a..48c95f56 100644 Binary files a/tests/data/expected/space_index/indexset/process_create_node.wt.idx and b/tests/data/expected/space_index/indexset/process_create_node.wt.idx differ diff --git a/tests/data/expected/space_index/indexset/process_insert_at.wt.idx b/tests/data/expected/space_index/indexset/process_insert_at.wt.idx index 6470780a..9704e608 100644 Binary files a/tests/data/expected/space_index/indexset/process_insert_at.wt.idx and 
b/tests/data/expected/space_index/indexset/process_insert_at.wt.idx differ diff --git a/tests/data/expected/space_index/indexset/process_insert_at_big_amount.wt.idx b/tests/data/expected/space_index/indexset/process_insert_at_big_amount.wt.idx index 238b5c12..4883ada0 100644 Binary files a/tests/data/expected/space_index/indexset/process_insert_at_big_amount.wt.idx and b/tests/data/expected/space_index/indexset/process_insert_at_big_amount.wt.idx differ diff --git a/tests/data/expected/space_index/process_create_node.wt.idx b/tests/data/expected/space_index/process_create_node.wt.idx index 41f03789..d0efe214 100644 Binary files a/tests/data/expected/space_index/process_create_node.wt.idx and b/tests/data/expected/space_index/process_create_node.wt.idx differ diff --git a/tests/data/expected/space_index/process_create_node_after_remove.wt.idx b/tests/data/expected/space_index/process_create_node_after_remove.wt.idx index 32c980a2..922afb40 100644 Binary files a/tests/data/expected/space_index/process_create_node_after_remove.wt.idx and b/tests/data/expected/space_index/process_create_node_after_remove.wt.idx differ diff --git a/tests/data/expected/space_index/process_create_second_node.wt.idx b/tests/data/expected/space_index/process_create_second_node.wt.idx index 988ae640..3d82212c 100644 Binary files a/tests/data/expected/space_index/process_create_second_node.wt.idx and b/tests/data/expected/space_index/process_create_second_node.wt.idx differ diff --git a/tests/data/expected/space_index/process_insert_at.wt.idx b/tests/data/expected/space_index/process_insert_at.wt.idx index 6470780a..9704e608 100644 Binary files a/tests/data/expected/space_index/process_insert_at.wt.idx and b/tests/data/expected/space_index/process_insert_at.wt.idx differ diff --git a/tests/data/expected/space_index/process_insert_at_big_amount.wt.idx b/tests/data/expected/space_index/process_insert_at_big_amount.wt.idx index 238b5c12..6809a3d7 100644 Binary files a/tests/data/expected/space_index/process_insert_at_big_amount.wt.idx and b/tests/data/expected/space_index/process_insert_at_big_amount.wt.idx differ diff --git a/tests/data/expected/space_index/process_insert_at_removed_place.wt.idx b/tests/data/expected/space_index/process_insert_at_removed_place.wt.idx index 39697ad7..e3a4dc9a 100644 Binary files a/tests/data/expected/space_index/process_insert_at_removed_place.wt.idx and b/tests/data/expected/space_index/process_insert_at_removed_place.wt.idx differ diff --git a/tests/data/expected/space_index/process_insert_at_with_node_id_update.wt.idx b/tests/data/expected/space_index/process_insert_at_with_node_id_update.wt.idx index 72549a79..3aa6c5a5 100644 Binary files a/tests/data/expected/space_index/process_insert_at_with_node_id_update.wt.idx and b/tests/data/expected/space_index/process_insert_at_with_node_id_update.wt.idx differ diff --git a/tests/data/expected/space_index/process_remove_at_node_id.wt.idx b/tests/data/expected/space_index/process_remove_at_node_id.wt.idx index 7fd9fb79..c26de708 100644 Binary files a/tests/data/expected/space_index/process_remove_at_node_id.wt.idx and b/tests/data/expected/space_index/process_remove_at_node_id.wt.idx differ diff --git a/tests/data/expected/space_index/process_remove_node.wt.idx b/tests/data/expected/space_index/process_remove_node.wt.idx index 40520dc1..d83cb09c 100644 Binary files a/tests/data/expected/space_index/process_remove_node.wt.idx and b/tests/data/expected/space_index/process_remove_node.wt.idx differ diff --git 
a/tests/data/expected/space_index/process_split_node.wt.idx b/tests/data/expected/space_index/process_split_node.wt.idx index d0c200ed..4b110e69 100644 Binary files a/tests/data/expected/space_index/process_split_node.wt.idx and b/tests/data/expected/space_index/process_split_node.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/indexset/process_create_node.wt.idx b/tests/data/expected/space_index_unsized/indexset/process_create_node.wt.idx index 47558ae4..a5884313 100644 Binary files a/tests/data/expected/space_index_unsized/indexset/process_create_node.wt.idx and b/tests/data/expected/space_index_unsized/indexset/process_create_node.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/indexset/process_insert_at.wt.idx b/tests/data/expected/space_index_unsized/indexset/process_insert_at.wt.idx index 1404c58e..294009d9 100644 Binary files a/tests/data/expected/space_index_unsized/indexset/process_insert_at.wt.idx and b/tests/data/expected/space_index_unsized/indexset/process_insert_at.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/indexset/process_insert_at_big_amount.wt.idx b/tests/data/expected/space_index_unsized/indexset/process_insert_at_big_amount.wt.idx index 8c024604..7237eb43 100644 Binary files a/tests/data/expected/space_index_unsized/indexset/process_insert_at_big_amount.wt.idx and b/tests/data/expected/space_index_unsized/indexset/process_insert_at_big_amount.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_create_node.wt.idx b/tests/data/expected/space_index_unsized/process_create_node.wt.idx index cc5691f2..1949c975 100644 Binary files a/tests/data/expected/space_index_unsized/process_create_node.wt.idx and b/tests/data/expected/space_index_unsized/process_create_node.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_create_node_after_remove.wt.idx b/tests/data/expected/space_index_unsized/process_create_node_after_remove.wt.idx index 69522975..68014d28 100644 Binary files a/tests/data/expected/space_index_unsized/process_create_node_after_remove.wt.idx and b/tests/data/expected/space_index_unsized/process_create_node_after_remove.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_create_second_node.wt.idx b/tests/data/expected/space_index_unsized/process_create_second_node.wt.idx index 3217c314..bc6417e4 100644 Binary files a/tests/data/expected/space_index_unsized/process_create_second_node.wt.idx and b/tests/data/expected/space_index_unsized/process_create_second_node.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_insert_at.wt.idx b/tests/data/expected/space_index_unsized/process_insert_at.wt.idx index 36703bdc..4cf8670b 100644 Binary files a/tests/data/expected/space_index_unsized/process_insert_at.wt.idx and b/tests/data/expected/space_index_unsized/process_insert_at.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_insert_at_big_amount.wt.idx b/tests/data/expected/space_index_unsized/process_insert_at_big_amount.wt.idx index 47a3f954..d6777ee4 100644 Binary files a/tests/data/expected/space_index_unsized/process_insert_at_big_amount.wt.idx and b/tests/data/expected/space_index_unsized/process_insert_at_big_amount.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_insert_at_removed_place.wt.idx b/tests/data/expected/space_index_unsized/process_insert_at_removed_place.wt.idx index 741c8f1a..2d4f0d36 100644 Binary files 
a/tests/data/expected/space_index_unsized/process_insert_at_removed_place.wt.idx and b/tests/data/expected/space_index_unsized/process_insert_at_removed_place.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_insert_at_with_node_id_update.wt.idx b/tests/data/expected/space_index_unsized/process_insert_at_with_node_id_update.wt.idx index c6dc1305..1d29a153 100644 Binary files a/tests/data/expected/space_index_unsized/process_insert_at_with_node_id_update.wt.idx and b/tests/data/expected/space_index_unsized/process_insert_at_with_node_id_update.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_remove_at.wt.idx b/tests/data/expected/space_index_unsized/process_remove_at.wt.idx index 1203c1fd..87efb851 100644 Binary files a/tests/data/expected/space_index_unsized/process_remove_at.wt.idx and b/tests/data/expected/space_index_unsized/process_remove_at.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_remove_at_node_id.wt.idx b/tests/data/expected/space_index_unsized/process_remove_at_node_id.wt.idx index f39521db..bb12ef29 100644 Binary files a/tests/data/expected/space_index_unsized/process_remove_at_node_id.wt.idx and b/tests/data/expected/space_index_unsized/process_remove_at_node_id.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_remove_node.wt.idx b/tests/data/expected/space_index_unsized/process_remove_node.wt.idx index 5e5e322a..c05f507f 100644 Binary files a/tests/data/expected/space_index_unsized/process_remove_node.wt.idx and b/tests/data/expected/space_index_unsized/process_remove_node.wt.idx differ diff --git a/tests/data/expected/space_index_unsized/process_split_node.wt.idx b/tests/data/expected/space_index_unsized/process_split_node.wt.idx index 73dfc5c8..d8c0b3c2 100644 Binary files a/tests/data/expected/space_index_unsized/process_split_node.wt.idx and b/tests/data/expected/space_index_unsized/process_split_node.wt.idx differ diff --git a/tests/data/expected/test_persist/another_idx.wt.idx b/tests/data/expected/test_persist/another_idx.wt.idx index 39de0d8d..f311be42 100644 Binary files a/tests/data/expected/test_persist/another_idx.wt.idx and b/tests/data/expected/test_persist/another_idx.wt.idx differ diff --git a/tests/data/expected/test_persist/primary.wt.idx b/tests/data/expected/test_persist/primary.wt.idx index 3e773971..8165c034 100644 Binary files a/tests/data/expected/test_persist/primary.wt.idx and b/tests/data/expected/test_persist/primary.wt.idx differ diff --git a/tests/data/expected/test_without_secondary_indexes/primary.wt.idx b/tests/data/expected/test_without_secondary_indexes/primary.wt.idx index d1ebdfb8..996835bd 100644 Binary files a/tests/data/expected/test_without_secondary_indexes/primary.wt.idx and b/tests/data/expected/test_without_secondary_indexes/primary.wt.idx differ diff --git a/tests/data/space_index/indexset/process_create_node.wt.idx b/tests/data/space_index/indexset/process_create_node.wt.idx index bb61d11a..48c95f56 100644 Binary files a/tests/data/space_index/indexset/process_create_node.wt.idx and b/tests/data/space_index/indexset/process_create_node.wt.idx differ diff --git a/tests/data/space_index/indexset/process_insert_at.wt.idx b/tests/data/space_index/indexset/process_insert_at.wt.idx index 6470780a..9704e608 100644 Binary files a/tests/data/space_index/indexset/process_insert_at.wt.idx and b/tests/data/space_index/indexset/process_insert_at.wt.idx differ diff --git 
a/tests/data/space_index/indexset/process_insert_at_big_amount.wt.idx b/tests/data/space_index/indexset/process_insert_at_big_amount.wt.idx index 238b5c12..4883ada0 100644 Binary files a/tests/data/space_index/indexset/process_insert_at_big_amount.wt.idx and b/tests/data/space_index/indexset/process_insert_at_big_amount.wt.idx differ diff --git a/tests/data/space_index/process_create_node.wt.idx b/tests/data/space_index/process_create_node.wt.idx index 41f03789..d0efe214 100644 Binary files a/tests/data/space_index/process_create_node.wt.idx and b/tests/data/space_index/process_create_node.wt.idx differ diff --git a/tests/data/space_index/process_create_node_after_remove.wt.idx b/tests/data/space_index/process_create_node_after_remove.wt.idx index 32c980a2..922afb40 100644 Binary files a/tests/data/space_index/process_create_node_after_remove.wt.idx and b/tests/data/space_index/process_create_node_after_remove.wt.idx differ diff --git a/tests/data/space_index/process_create_second_node.wt.idx b/tests/data/space_index/process_create_second_node.wt.idx index 988ae640..3d82212c 100644 Binary files a/tests/data/space_index/process_create_second_node.wt.idx and b/tests/data/space_index/process_create_second_node.wt.idx differ diff --git a/tests/data/space_index/process_insert_at.wt.idx b/tests/data/space_index/process_insert_at.wt.idx index 6470780a..9704e608 100644 Binary files a/tests/data/space_index/process_insert_at.wt.idx and b/tests/data/space_index/process_insert_at.wt.idx differ diff --git a/tests/data/space_index/process_insert_at_big_amount.wt.idx b/tests/data/space_index/process_insert_at_big_amount.wt.idx index 238b5c12..6809a3d7 100644 Binary files a/tests/data/space_index/process_insert_at_big_amount.wt.idx and b/tests/data/space_index/process_insert_at_big_amount.wt.idx differ diff --git a/tests/data/space_index/process_insert_at_removed_place.wt.idx b/tests/data/space_index/process_insert_at_removed_place.wt.idx index 39697ad7..e3a4dc9a 100644 Binary files a/tests/data/space_index/process_insert_at_removed_place.wt.idx and b/tests/data/space_index/process_insert_at_removed_place.wt.idx differ diff --git a/tests/data/space_index/process_insert_at_with_node_id_update.wt.idx b/tests/data/space_index/process_insert_at_with_node_id_update.wt.idx index 72549a79..3aa6c5a5 100644 Binary files a/tests/data/space_index/process_insert_at_with_node_id_update.wt.idx and b/tests/data/space_index/process_insert_at_with_node_id_update.wt.idx differ diff --git a/tests/data/space_index/process_remove_at.wt.idx b/tests/data/space_index/process_remove_at.wt.idx index 41f03789..d0efe214 100644 Binary files a/tests/data/space_index/process_remove_at.wt.idx and b/tests/data/space_index/process_remove_at.wt.idx differ diff --git a/tests/data/space_index/process_remove_at_node_id.wt.idx b/tests/data/space_index/process_remove_at_node_id.wt.idx index 7fd9fb79..c26de708 100644 Binary files a/tests/data/space_index/process_remove_at_node_id.wt.idx and b/tests/data/space_index/process_remove_at_node_id.wt.idx differ diff --git a/tests/data/space_index/process_remove_node.wt.idx b/tests/data/space_index/process_remove_node.wt.idx index 40520dc1..d83cb09c 100644 Binary files a/tests/data/space_index/process_remove_node.wt.idx and b/tests/data/space_index/process_remove_node.wt.idx differ diff --git a/tests/data/space_index/process_split_node.wt.idx b/tests/data/space_index/process_split_node.wt.idx index d0c200ed..4b110e69 100644 Binary files a/tests/data/space_index/process_split_node.wt.idx and 
b/tests/data/space_index/process_split_node.wt.idx differ diff --git a/tests/data/space_index_unsized/indexset/process_create_node.wt.idx b/tests/data/space_index_unsized/indexset/process_create_node.wt.idx index 47558ae4..a5884313 100644 Binary files a/tests/data/space_index_unsized/indexset/process_create_node.wt.idx and b/tests/data/space_index_unsized/indexset/process_create_node.wt.idx differ diff --git a/tests/data/space_index_unsized/indexset/process_insert_at.wt.idx b/tests/data/space_index_unsized/indexset/process_insert_at.wt.idx index 1404c58e..294009d9 100644 Binary files a/tests/data/space_index_unsized/indexset/process_insert_at.wt.idx and b/tests/data/space_index_unsized/indexset/process_insert_at.wt.idx differ diff --git a/tests/data/space_index_unsized/indexset/process_insert_at_big_amount.wt.idx b/tests/data/space_index_unsized/indexset/process_insert_at_big_amount.wt.idx index 8c024604..7237eb43 100644 Binary files a/tests/data/space_index_unsized/indexset/process_insert_at_big_amount.wt.idx and b/tests/data/space_index_unsized/indexset/process_insert_at_big_amount.wt.idx differ diff --git a/tests/data/space_index_unsized/process_create_node.wt.idx b/tests/data/space_index_unsized/process_create_node.wt.idx index cc5691f2..1949c975 100644 Binary files a/tests/data/space_index_unsized/process_create_node.wt.idx and b/tests/data/space_index_unsized/process_create_node.wt.idx differ diff --git a/tests/data/space_index_unsized/process_create_node_after_remove.wt.idx b/tests/data/space_index_unsized/process_create_node_after_remove.wt.idx index 69522975..68014d28 100644 Binary files a/tests/data/space_index_unsized/process_create_node_after_remove.wt.idx and b/tests/data/space_index_unsized/process_create_node_after_remove.wt.idx differ diff --git a/tests/data/space_index_unsized/process_create_second_node.wt.idx b/tests/data/space_index_unsized/process_create_second_node.wt.idx index 3217c314..bc6417e4 100644 Binary files a/tests/data/space_index_unsized/process_create_second_node.wt.idx and b/tests/data/space_index_unsized/process_create_second_node.wt.idx differ diff --git a/tests/data/space_index_unsized/process_insert_at.wt.idx b/tests/data/space_index_unsized/process_insert_at.wt.idx index 36703bdc..4cf8670b 100644 Binary files a/tests/data/space_index_unsized/process_insert_at.wt.idx and b/tests/data/space_index_unsized/process_insert_at.wt.idx differ diff --git a/tests/data/space_index_unsized/process_insert_at_big_amount.wt.idx b/tests/data/space_index_unsized/process_insert_at_big_amount.wt.idx index 47a3f954..d6777ee4 100644 Binary files a/tests/data/space_index_unsized/process_insert_at_big_amount.wt.idx and b/tests/data/space_index_unsized/process_insert_at_big_amount.wt.idx differ diff --git a/tests/data/space_index_unsized/process_insert_at_removed_place.wt.idx b/tests/data/space_index_unsized/process_insert_at_removed_place.wt.idx index 741c8f1a..2d4f0d36 100644 Binary files a/tests/data/space_index_unsized/process_insert_at_removed_place.wt.idx and b/tests/data/space_index_unsized/process_insert_at_removed_place.wt.idx differ diff --git a/tests/data/space_index_unsized/process_insert_at_with_node_id_update.wt.idx b/tests/data/space_index_unsized/process_insert_at_with_node_id_update.wt.idx index c6dc1305..1d29a153 100644 Binary files a/tests/data/space_index_unsized/process_insert_at_with_node_id_update.wt.idx and b/tests/data/space_index_unsized/process_insert_at_with_node_id_update.wt.idx differ diff --git 
a/tests/data/space_index_unsized/process_remove_at.wt.idx b/tests/data/space_index_unsized/process_remove_at.wt.idx index 1203c1fd..87efb851 100644 Binary files a/tests/data/space_index_unsized/process_remove_at.wt.idx and b/tests/data/space_index_unsized/process_remove_at.wt.idx differ diff --git a/tests/data/space_index_unsized/process_remove_at_node_id.wt.idx b/tests/data/space_index_unsized/process_remove_at_node_id.wt.idx index f39521db..bb12ef29 100644 Binary files a/tests/data/space_index_unsized/process_remove_at_node_id.wt.idx and b/tests/data/space_index_unsized/process_remove_at_node_id.wt.idx differ diff --git a/tests/data/space_index_unsized/process_remove_node.wt.idx b/tests/data/space_index_unsized/process_remove_node.wt.idx index 5e5e322a..c05f507f 100644 Binary files a/tests/data/space_index_unsized/process_remove_node.wt.idx and b/tests/data/space_index_unsized/process_remove_node.wt.idx differ diff --git a/tests/data/space_index_unsized/process_split_node.wt.idx b/tests/data/space_index_unsized/process_split_node.wt.idx index 73dfc5c8..d8c0b3c2 100644 Binary files a/tests/data/space_index_unsized/process_split_node.wt.idx and b/tests/data/space_index_unsized/process_split_node.wt.idx differ diff --git a/tests/data/test_persist/another_idx.wt.idx b/tests/data/test_persist/another_idx.wt.idx index 39de0d8d..f311be42 100644 Binary files a/tests/data/test_persist/another_idx.wt.idx and b/tests/data/test_persist/another_idx.wt.idx differ diff --git a/tests/data/test_persist/primary.wt.idx b/tests/data/test_persist/primary.wt.idx index 3e773971..8165c034 100644 Binary files a/tests/data/test_persist/primary.wt.idx and b/tests/data/test_persist/primary.wt.idx differ diff --git a/tests/data/test_without_secondary_indexes/primary.wt.idx b/tests/data/test_without_secondary_indexes/primary.wt.idx index d1ebdfb8..996835bd 100644 Binary files a/tests/data/test_without_secondary_indexes/primary.wt.idx and b/tests/data/test_without_secondary_indexes/primary.wt.idx differ diff --git a/tests/persistence/index_page/read.rs b/tests/persistence/index_page/read.rs index 3b39c55e..c1d60ccb 100644 --- a/tests/persistence/index_page/read.rs +++ b/tests/persistence/index_page/read.rs @@ -13,7 +13,7 @@ async fn test_index_page_read_in_space() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 99); + assert_eq!(page.inner.node_id.key, 99); assert_eq!(page.inner.current_index, 99); assert_eq!(page.inner.current_length, 99); } @@ -30,7 +30,7 @@ async fn test_index_page_read_after_create_node_in_space_index() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 5); + assert_eq!(page.inner.node_id.key, 5); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); assert_eq!(page.inner.slots.first().unwrap(), &0); @@ -50,7 +50,7 @@ async fn test_index_page_read_after_insert_at_in_space_index() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 5); + assert_eq!(page.inner.node_id.key, 5); assert_eq!(page.inner.current_index, 2); assert_eq!(page.inner.current_length, 2); assert_eq!(page.inner.slots.first().unwrap(), &1); @@ -74,7 +74,7 @@ async fn test_index_page_read_after_insert_at_with_node_id_update_in_space_index let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 7); + assert_eq!(page.inner.node_id.key, 7); 
assert_eq!(page.inner.current_index, 2); assert_eq!(page.inner.current_length, 2); assert_eq!(page.inner.slots.first().unwrap(), &0); @@ -98,7 +98,7 @@ async fn test_index_page_read_after_remove_at_node_id_in_space_index() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 3); + assert_eq!(page.inner.node_id.key, 3); assert_eq!(page.inner.current_index, 0); assert_eq!(page.inner.current_length, 1); assert_eq!(page.inner.slots.first().unwrap(), &1); @@ -119,7 +119,7 @@ async fn test_index_page_read_after_insert_at_removed_place_in_space_index() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 7); + assert_eq!(page.inner.node_id.key, 7); assert_eq!(page.inner.current_index, 3); assert_eq!(page.inner.current_length, 3); assert_eq!(page.inner.slots.first().unwrap(), &1); @@ -148,7 +148,7 @@ async fn test_index_pages_read_after_creation_of_second_node_in_space_index() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 5); + assert_eq!(page.inner.node_id.key, 5); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); assert_eq!(page.inner.slots.first().unwrap(), &0); @@ -158,7 +158,7 @@ async fn test_index_pages_read_after_creation_of_second_node_in_space_index() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3) .await .unwrap(); - assert_eq!(page.inner.node_id, 15); + assert_eq!(page.inner.node_id.key, 15); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); assert_eq!(page.inner.slots.first().unwrap(), &0); @@ -178,7 +178,7 @@ async fn test_index_pages_read_after_creation_of_node_after_remove_node_in_space let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 10); + assert_eq!(page.inner.node_id.key, 10); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); assert_eq!(page.inner.slots.first().unwrap(), &0); @@ -188,7 +188,7 @@ async fn test_index_pages_read_after_creation_of_node_after_remove_node_in_space let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3) .await .unwrap(); - assert_eq!(page.inner.node_id, 15); + assert_eq!(page.inner.node_id.key, 15); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); assert_eq!(page.inner.slots.first().unwrap(), &0); @@ -208,10 +208,9 @@ async fn test_index_pages_read_full_page() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 1000); - assert_eq!(page.inner.current_index, 907); - assert_eq!(page.inner.current_length, 907); - assert_eq!(page.inner.size, page.inner.current_index); + assert_eq!(page.inner.node_id.key, 1000); + assert_eq!(page.inner.current_index, 905); + assert_eq!(page.inner.current_length, 905); } #[tokio::test] @@ -226,14 +225,14 @@ async fn test_index_pages_read_after_node_split() { let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, 457); - assert_eq!(page.inner.current_index, 1); + assert_eq!(page.inner.node_id.key, 457); + assert_eq!(page.inner.current_index, 0); assert_eq!(page.inner.current_length, 453); let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3) .await .unwrap(); - assert_eq!(page.inner.node_id, 1000); - assert_eq!(page.inner.current_index, 454); - 
assert_eq!(page.inner.current_length, 454); + assert_eq!(page.inner.node_id.key, 1000); + assert_eq!(page.inner.current_index, 453); + assert_eq!(page.inner.current_length, 452); } diff --git a/tests/persistence/index_page/unsized_read.rs b/tests/persistence/index_page/unsized_read.rs index d0ee2263..3ed7c2c0 100644 --- a/tests/persistence/index_page/unsized_read.rs +++ b/tests/persistence/index_page/unsized_read.rs @@ -17,7 +17,7 @@ async fn test_index_page_read_after_create_node_in_space_index() { .await .unwrap(); - assert_eq!(page.inner.node_id, "Something from someone".to_string()); + assert_eq!(page.inner.node_id.key, "Something from someone".to_string()); assert_eq!(page.inner.index_values.len(), 1); let value = page.inner.index_values.first().unwrap(); assert_eq!(value.key, "Something from someone".to_string()); @@ -47,7 +47,7 @@ async fn test_index_pages_read_after_creation_of_second_node_in_space_index() { .await .unwrap(); - assert_eq!(page.inner.node_id, "Something from someone".to_string()); + assert_eq!(page.inner.node_id.key, "Something from someone".to_string()); assert_eq!(page.inner.index_values.len(), 1); let value = page.inner.index_values.first().unwrap(); assert_eq!(value.key, "Something from someone".to_string()); @@ -67,7 +67,7 @@ async fn test_index_pages_read_after_creation_of_second_node_in_space_index() { .await .unwrap(); - assert_eq!(page.inner.node_id, "Someone from somewhere".to_string()); + assert_eq!(page.inner.node_id.key, "Someone from somewhere".to_string()); assert_eq!(page.inner.index_values.len(), 1); let value = page.inner.index_values.first().unwrap(); assert_eq!(value.key, "Someone from somewhere".to_string()); @@ -97,7 +97,7 @@ async fn test_index_pages_read_after_remove_node_in_space_index() { .await .unwrap(); - assert_eq!(page.inner.node_id, "Someone from somewhere".to_string()); + assert_eq!(page.inner.node_id.key, "Someone from somewhere".to_string()); assert_eq!(page.inner.index_values.len(), 1); let value = page.inner.index_values.first().unwrap(); assert_eq!(value.key, "Someone from somewhere".to_string()); @@ -127,7 +127,7 @@ async fn test_index_pages_read_after_insert_at_in_space_index() { .await .unwrap(); - assert_eq!(page.inner.node_id, "Something from someone".to_string()); + assert_eq!(page.inner.node_id.key, "Something from someone".to_string()); assert_eq!(page.inner.index_values.len(), 2); let first_value = &page.inner.index_values[0]; assert_eq!(first_value.key, "Something else".to_string()); @@ -167,7 +167,7 @@ async fn test_index_page_read_after_remove_at_in_space_index() { .await .unwrap(); - assert_eq!(page.inner.node_id, "Something from someone".to_string()); + assert_eq!(page.inner.node_id.key, "Something from someone".to_string()); assert_eq!(page.inner.index_values.len(), 1); let value = page.inner.index_values.first().unwrap(); assert_eq!(value.key, "Something from someone".to_string()); @@ -197,7 +197,7 @@ async fn test_index_page_read_after_remove_at_node_id_in_space_index() { .await .unwrap(); - assert_eq!(page.inner.node_id, "Something else".to_string()); + assert_eq!(page.inner.node_id.key, "Something else".to_string()); assert_eq!(page.inner.index_values.len(), 1); let value = page.inner.index_values.first().unwrap(); assert_eq!(value.key, "Something else".to_string()); @@ -229,7 +229,10 @@ async fn test_index_page_read_after_insert_at_with_node_id_update_in_space_index .await .unwrap(); - assert_eq!(page.inner.node_id, "Something from someone 1".to_string()); + assert_eq!( + page.inner.node_id.key, + "Something 
from someone 1".to_string() + ); assert_eq!(page.inner.index_values.len(), 2); let first_value = &page.inner.index_values[0]; assert_eq!(first_value.key, "Something from someone".to_string()); @@ -269,7 +272,10 @@ async fn test_index_page_read_after_insert_at_removed_place_in_space_index() { .await .unwrap(); - assert_eq!(page.inner.node_id, "Something from someone 1".to_string()); + assert_eq!( + page.inner.node_id.key, + "Something from someone 1".to_string() + ); assert_eq!(page.inner.index_values.len(), 3); let first_value = &page.inner.index_values[0]; assert_eq!(first_value.key, "Something else".to_string()); @@ -318,7 +324,7 @@ async fn test_index_pages_read_after_node_split() { >(&mut file, 2) .await .unwrap(); - assert_eq!(page.inner.node_id, "Something from someone 52"); + assert_eq!(page.inner.node_id.key, "Something from someone 52"); assert_eq!(page.inner.slots_size, 53); let page = parse_page::< @@ -327,6 +333,6 @@ async fn test_index_pages_read_after_node_split() { >(&mut file, 3) .await .unwrap(); - assert_eq!(page.inner.node_id, "Something from someone _100"); + assert_eq!(page.inner.node_id.key, "Something from someone _100"); assert_eq!(page.inner.slots_size, 48); } diff --git a/tests/persistence/read.rs b/tests/persistence/read.rs index 5c8a4944..34799469 100644 --- a/tests/persistence/read.rs +++ b/tests/persistence/read.rs @@ -45,7 +45,7 @@ async fn test_primary_index_parse() { assert_eq!(index.header.previous_id, 0.into()); assert_eq!(index.header.next_id, 0.into()); assert_eq!(index.header.page_type, PageType::Index); - assert_eq!(index.header.data_length, 16334); + assert_eq!(index.header.data_length, 16350); let mut key = 1; let length = 24; @@ -82,7 +82,7 @@ async fn test_another_idx_index_parse() { assert_eq!(index.header.previous_id, 0.into()); assert_eq!(index.header.next_id, 0.into()); assert_eq!(index.header.page_type, PageType::Index); - assert_eq!(index.header.data_length, 16334); + assert_eq!(index.header.data_length, 16350); let mut key = 1; let length = 24; @@ -110,10 +110,11 @@ async fn test_data_parse() { let mut file = File::open("tests/data/expected/test_persist/.wt.data") .await .unwrap(); - let data = - parse_data_page::<{ TEST_PERSIST_PAGE_SIZE }, { TEST_PERSIST_INNER_SIZE }>(&mut file, 1) - .await - .unwrap(); + let data = parse_data_page::<{ TEST_PERSIST_PAGE_SIZE as u32 }, { TEST_PERSIST_INNER_SIZE }>( + &mut file, 1, + ) + .await + .unwrap(); assert_eq!(data.header.space_id, 0.into()); assert_eq!(data.header.page_id, 1.into()); diff --git a/tests/persistence/space_index/unsized_write.rs b/tests/persistence/space_index/unsized_write.rs index 1b82863e..d5270972 100644 --- a/tests/persistence/space_index/unsized_write.rs +++ b/tests/persistence/space_index/unsized_write.rs @@ -544,7 +544,7 @@ async fn test_space_index_process_split_node() { key: "Something from someone _100".to_string(), value: Link { page_id: 0.into(), - offset: 0, + offset: 24, length: 24, }, }, diff --git a/tests/persistence/space_index/write.rs b/tests/persistence/space_index/write.rs index b8f14287..fcc03cd7 100644 --- a/tests/persistence/space_index/write.rs +++ b/tests/persistence/space_index/write.rs @@ -168,7 +168,7 @@ mod run_first { .await .unwrap(); - for i in (6..911).rev() { + for i in (6..909).rev() { space_index .process_change_event(ChangeEvent::InsertAt { max_value: Pair { @@ -529,7 +529,7 @@ async fn test_space_index_process_split_node() { key: 1000, value: Link { page_id: 0.into(), - offset: 0, + offset: 24, length: 24, }, }, diff --git 
index 6b150df6..db473fec 100644
--- a/tests/persistence/sync/string_primary_index.rs
+++ b/tests/persistence/sync/string_primary_index.rs
@@ -88,7 +88,7 @@ fn test_space_insert_many_sync() {
         let table = TestSyncWorkTable::load_from_file(config.clone())
             .await
             .unwrap();
-        for i in 0..20 {
+        for i in 0..1_000 {
            let pk = {
                let row = TestSyncRow {
                    another: i,
@@ -320,9 +320,16 @@ fn test_space_delete_sync() {
            id: "Some string before".to_string(),
        };
        table.insert(row.clone()).unwrap();
-        table.delete(row.id.clone().into()).await.unwrap();
+        let another_row = TestSyncRow {
+            another: 43,
+            non_unique: 0,
+            field: 0.0,
+            id: "Some string".to_string(),
+        };
+        table.insert(another_row.clone()).unwrap();
+        table.delete(another_row.id.clone().into()).await.unwrap();
        table.wait_for_ops().await;
-        row.id
+        another_row.id
    };
    {
        let table = TestSyncWorkTable::load_from_file(config).await.unwrap();
diff --git a/tests/persistence/toc/read.rs b/tests/persistence/toc/read.rs
index c95c321d..4b64fef3 100644
--- a/tests/persistence/toc/read.rs
+++ b/tests/persistence/toc/read.rs
@@ -1,6 +1,7 @@
-use data_bucket::INNER_PAGE_SIZE;
 use std::sync::atomic::AtomicU32;
 use std::sync::Arc;
+
+use data_bucket::{Link, INNER_PAGE_SIZE};
 use tokio::fs::OpenOptions;
 
 use worktable::prelude::IndexTableOfContents;
@@ -33,15 +34,24 @@ async fn test_index_table_of_contents_read_from_space() {
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(1));
-    let toc = IndexTableOfContents::<u64, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(u64, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
     )
     .await
     .unwrap();
-
-    assert_eq!(toc.get(&99), Some(2.into()))
+    assert_eq!(
+        toc.get(&(
+            99,
+            Link {
+                page_id: 1.into(),
+                offset: 2352,
+                length: 24
+            }
+        )),
+        Some(2.into())
+    )
 }
 
 #[tokio::test]
@@ -53,7 +63,7 @@ async fn test_index_table_of_contents_read_from_space_index() {
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<u32, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(u32, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -61,7 +71,17 @@ async fn test_index_table_of_contents_read_from_space_index() {
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&5), Some(2.into()))
+    assert_eq!(
+        toc.get(&(
+            5,
+            Link {
+                page_id: 0.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
+        Some(2.into())
+    )
 }
 
 #[tokio::test]
@@ -73,7 +93,7 @@ async fn test_index_table_of_contents_read_from_space_index_after_insert() {
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<u32, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(u32, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -81,7 +101,17 @@ async fn test_index_table_of_contents_read_from_space_index_after_insert() {
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&5), Some(2.into()))
+    assert_eq!(
+        toc.get(&(
+            5,
+            Link {
+                page_id: 0.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
+        Some(2.into())
+    )
 }
 
 #[tokio::test]
@@ -93,7 +123,7 @@ async fn test_index_table_of_contents_read_from_space_index_with_updated_node_id
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<u32, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(u32, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -101,7 +131,17 @@ async fn test_index_table_of_contents_read_from_space_index_with_updated_node_id
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&7), Some(2.into()))
+    assert_eq!(
+        toc.get(&(
+            7,
+            Link {
+                page_id: 0.into(),
+                offset: 24,
+                length: 48
+            }
+        )),
+        Some(2.into())
+    )
 }
 
 #[tokio::test]
@@ -113,7 +153,7 @@ async fn test_index_table_of_contents_read_from_space_index_with_remove_at_node_
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<u32, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(u32, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -121,7 +161,17 @@ async fn test_index_table_of_contents_read_from_space_index_with_remove_at_node_
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&3), Some(2.into()));
+    assert_eq!(
+        toc.get(&(
+            3,
+            Link {
+                page_id: 0.into(),
+                offset: 24,
+                length: 48
+            }
+        )),
+        Some(2.into())
+    );
 }
 
 #[tokio::test]
@@ -133,7 +183,7 @@ async fn test_index_table_of_contents_read_from_space_index_with_remove_node() {
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<u32, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(u32, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -141,8 +191,28 @@ async fn test_index_table_of_contents_read_from_space_index_with_remove_node() {
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&5), None);
-    assert_eq!(toc.get(&15), Some(3.into()));
+    assert_eq!(
+        toc.get(&(
+            5,
+            Link {
+                page_id: 1.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
+        None
+    );
+    assert_eq!(
+        toc.get(&(
+            15,
+            Link {
+                page_id: 1.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
+        Some(3.into())
+    );
 }
 
 #[tokio::test]
@@ -154,7 +224,7 @@ async fn test_index_table_of_contents_read_from_space_index_with_create_node_aft
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<u32, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(u32, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -162,8 +232,28 @@ async fn test_index_table_of_contents_read_from_space_index_with_create_node_aft
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&10), Some(2.into()));
-    assert_eq!(toc.get(&15), Some(3.into()));
+    assert_eq!(
+        toc.get(&(
+            10,
+            Link {
+                page_id: 0.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
+        Some(2.into())
+    );
+    assert_eq!(
+        toc.get(&(
+            15,
+            Link {
+                page_id: 1.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
+        Some(3.into())
+    );
 }
 
 #[tokio::test]
@@ -175,7 +265,7 @@ async fn test_index_table_of_contents_read_from_space_index_after_split_node() {
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<u32, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(u32, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -183,6 +273,26 @@ async fn test_index_table_of_contents_read_from_space_index_after_split_node() {
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&1000), Some(3.into()));
-    assert_eq!(toc.get(&457), Some(2.into()));
+    assert_eq!(
+        toc.get(&(
+            1000,
+            Link {
+                page_id: 0.into(),
+                offset: 24,
+                length: 24
+            }
+        )),
+        Some(3.into())
+    );
+    assert_eq!(
+        toc.get(&(
+            457,
+            Link {
+                page_id: 0.into(),
+                offset: 10968,
+                length: 24
+            }
+        )),
+        Some(2.into())
+    );
 }
diff --git a/tests/persistence/toc/unsized_read.rs b/tests/persistence/toc/unsized_read.rs
index 0eb84369..ea64c052 100644
--- a/tests/persistence/toc/unsized_read.rs
+++ b/tests/persistence/toc/unsized_read.rs
@@ -1,7 +1,7 @@
 use std::sync::atomic::AtomicU32;
 use std::sync::Arc;
 
-use data_bucket::INNER_PAGE_SIZE;
+use data_bucket::{Link, INNER_PAGE_SIZE};
 use tokio::fs::OpenOptions;
 
 use worktable::prelude::IndexTableOfContents;
@@ -14,7 +14,7 @@ async fn test_index_table_of_contents_read_from_space_index_unsized() {
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<String, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(String, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -23,7 +23,14 @@ async fn test_index_table_of_contents_read_from_space_index_unsized() {
     .unwrap();
 
     assert_eq!(
-        toc.get(&"Something from someone".to_string()),
+        toc.get(&(
+            "Something from someone".to_string(),
+            Link {
+                page_id: 0.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
         Some(2.into())
     )
 }
@@ -37,7 +44,7 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_with_two_nod
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(3));
-    let toc = IndexTableOfContents::<String, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(String, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -46,11 +53,25 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_with_two_nod
     .unwrap();
 
     assert_eq!(
-        toc.get(&"Something from someone".to_string()),
+        toc.get(&(
+            "Something from someone".to_string(),
+            Link {
+                page_id: 0.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
         Some(2.into())
     );
     assert_eq!(
-        toc.get(&"Someone from somewhere".to_string()),
+        toc.get(&(
+            "Someone from somewhere".to_string(),
+            Link {
+                page_id: 1.into(),
+                offset: 24,
+                length: 32
+            }
+        )),
         Some(3.into())
     )
 }
@@ -64,7 +85,7 @@ async fn test_index_table_of_contents_read_from_space_index_with_remove_node() {
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<String, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(String, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -72,9 +93,26 @@ async fn test_index_table_of_contents_read_from_space_index_with_remove_node() {
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&"Something from someone".to_string()), None);
     assert_eq!(
-        toc.get(&"Someone from somewhere".to_string()),
+        toc.get(&(
+            "Someone for someone".to_string(),
+            Link {
+                page_id: 1.into(),
+                offset: 24,
+                length: 32
+            }
+        )),
+        None
+    );
+    assert_eq!(
+        toc.get(&(
+            "Someone from somewhere".to_string(),
+            Link {
+                page_id: 1.into(),
+                offset: 24,
+                length: 32
+            }
+        )),
         Some(3.into())
     );
 }
@@ -88,7 +126,7 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_after_insert
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<String, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(String, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -97,7 +135,14 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_after_insert
     .unwrap();
 
     assert_eq!(
-        toc.get(&"Something from someone".to_string()),
+        toc.get(&(
+            "Something from someone".to_string(),
+            Link {
+                page_id: 0.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
         Some(2.into())
     )
 }
@@ -111,7 +156,7 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_after_remove
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<String, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(String, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -120,7 +165,14 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_after_remove
     .unwrap();
 
     assert_eq!(
-        toc.get(&"Something from someone".to_string()),
+        toc.get(&(
+            "Something from someone".to_string(),
+            Link {
+                page_id: 0.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
         Some(2.into())
     )
 }
@@ -134,7 +186,7 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_after_remove
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<String, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(String, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -142,7 +194,17 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_after_remove
     .await
     .unwrap();
 
-    assert_eq!(toc.get(&"Something else".to_string()), Some(2.into()))
+    assert_eq!(
+        toc.get(&(
+            "Something else".to_string(),
+            Link {
+                page_id: 0.into(),
+                offset: 24,
+                length: 48
+            }
+        )),
+        Some(2.into())
+    )
 }
 
 #[tokio::test]
@@ -155,7 +217,7 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_after_create
         .await
         .unwrap();
     let next_id_gen = Arc::new(AtomicU32::new(2));
-    let toc = IndexTableOfContents::<String, { INNER_PAGE_SIZE }>::parse_from_file(
+    let toc = IndexTableOfContents::<(String, Link), { INNER_PAGE_SIZE as u32 }>::parse_from_file(
         &mut file,
         0.into(),
         next_id_gen,
@@ -164,8 +226,25 @@ async fn test_index_table_of_contents_read_from_space_index_unsized_after_create
     .unwrap();
 
     assert_eq!(
-        toc.get(&"Someone from somewhere".to_string()),
+        toc.get(&(
+            "Someone from somewhere".to_string(),
+            Link {
+                page_id: 1.into(),
+                offset: 24,
+                length: 32
+            }
+        )),
         Some(3.into())
     );
-    assert_eq!(toc.get(&"Something else".to_string()), Some(2.into()));
+    assert_eq!(
+        toc.get(&(
+            "Something else".to_string(),
+            Link {
+                page_id: 0.into(),
+                offset: 0,
+                length: 24
+            }
+        )),
+        Some(2.into())
+    );
 }
diff --git a/tests/worktable/count.rs b/tests/worktable/count.rs
index 41ab453e..4b98542c 100644
--- a/tests/worktable/count.rs
+++ b/tests/worktable/count.rs
@@ -59,7 +59,7 @@ async fn count() {
     };
 
     // Count WT with 0 rows
-    assert_eq!(None, test_table.count());
+    assert_eq!(0, test_table.count());
 
     let _ = test_table.insert(row1);
     let _ = test_table.insert(row2);
@@ -67,5 +67,5 @@ async fn count() {
     let _ = test_table.insert(row4);
 
     // Count by WT
-    assert_eq!(Some(4), test_table.count());
+    assert_eq!(4, test_table.count());
 }