diff --git a/Cargo.toml b/Cargo.toml
index 0afa0753..2360bd3d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur
 [package]
 name = "worktable"
-version = "0.6.8"
+version = "0.6.9"
 edition = "2024"
 authors = ["Handy-caT"]
 license = "MIT"
@@ -26,14 +26,14 @@ worktable_codegen = { path = "codegen", version = "0.6.7" }
 fastrand = "2.3.0"
 futures = "0.3.30"
 uuid = { version = "1.10.0", features = ["v4", "v7"] }
-data_bucket = "0.2.7"
-# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" }
-# data_bucket = { path = "../DataBucket", version = "0.2.6" }
+data_bucket = "0.2.8"
+# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" }
+# data_bucket = { path = "../DataBucket", version = "0.2.7" }
 performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true }
 performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true }
-indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] }
-# indexset = { path = "../indexset", version = "0.12.2", features = ["concurrent", "cdc", "multimap"] }
-# indexset = { git = "https://github.com/Handy-caT/indexset", branch = "multimap-range-fix", version = "0.12.0", features = ["concurrent", "cdc", "multimap"] }
+# indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] }
+# indexset = { path = "../indexset", version = "0.12.3", features = ["concurrent", "cdc", "multimap"] }
+indexset = { package = "wt-indexset", version = "0.12.4", features = ["concurrent", "cdc", "multimap"] }
 convert_case = "0.6.0"
 ordered-float = "5.0.0"
 parking_lot = "0.12.3"
diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml
index b30841e9..6d113605 100644
--- a/codegen/Cargo.toml
+++ b/codegen/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "worktable_codegen"
-version = "0.6.7"
+version = "0.6.9"
 edition = "2024"
 license = "MIT"
 description = "WorkTable codegeneration crate"
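Note on the dependency swap above: Cargo's `package` key renames only the registry package being fetched, so the crate is still referenced as `indexset` in code. That is why no `use` paths change anywhere else in this PR; for example, the re-exports in `src/index/mod.rs` keep compiling as-is:

    // unchanged imports, now resolved against wt-indexset 0.12.4
    pub use indexset::concurrent::map::BTreeMap as IndexMap;
    pub use indexset::concurrent::multimap::BTreeMultiMap as IndexMultiMap;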
diff --git a/codegen/src/persist_index/space/events.rs b/codegen/src/persist_index/space/events.rs
new file mode 100644
index 00000000..b99f68ec
--- /dev/null
+++ b/codegen/src/persist_index/space/events.rs
@@ -0,0 +1,369 @@
+use convert_case::{Case, Casing};
+use proc_macro2::TokenStream;
+use quote::quote;
+
+use crate::name_generator::WorktableNameGenerator;
+use crate::persist_index::generator::Generator;
+
+impl Generator {
+    pub fn gen_space_secondary_index_events_type(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let ident = name_generator.get_space_secondary_index_events_ident();
+
+        let fields: Vec<_> = self
+            .field_types
+            .iter()
+            .map(|(i, t)| {
+                quote! {
+                    #i: Vec<IndexChangeEvent<IndexPair<#t, Link>>>,
+                }
+            })
+            .collect();
+
+        quote! {
+            #[derive(Clone, Debug, Default)]
+            pub struct #ident {
+                #(#fields)*
+            }
+        }
+    }
+
+    pub fn gen_space_secondary_index_events_impl(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let ident = name_generator.get_space_secondary_index_events_ident();
+        let avt_index_ident = name_generator.get_available_indexes_ident();
+
+        let extend_fn = self.gen_space_secondary_index_events_extend_fn();
+        let remove_fn = self.gen_space_secondary_index_events_remove_fn();
+        let last_evs_fn = self.gen_space_secondary_index_events_last_evs_fn();
+        let first_evs = self.gen_space_secondary_index_events_first_evs_fn();
+        let iter_event_ids_fn = self.gen_space_secondary_index_events_iter_event_ids_fn();
+        let contains_event_fn = self.gen_space_secondary_index_events_contains_event_fn();
+        let split_is_first_fn = self.gen_space_secondary_index_events_is_first_split_fn();
+        let sort_fn = self.gen_space_secondary_index_events_sort_fn();
+        let validate_fn = self.gen_space_secondary_index_events_validate_fn();
+        let is_empty_fn = self.gen_space_secondary_index_events_is_empty_fn();
+        let is_unit_fn = self.gen_space_secondary_index_events_is_unit_fn();
+
+        quote! {
+            impl TableSecondaryIndexEventsOps<#avt_index_ident> for #ident {
+                #extend_fn
+                #remove_fn
+                #last_evs_fn
+                #first_evs
+                #iter_event_ids_fn
+                #contains_event_fn
+                #split_is_first_fn
+                #sort_fn
+                #validate_fn
+                #is_empty_fn
+                #is_unit_fn
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_sort_fn(&self) -> TokenStream {
+        let fields_sort: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                quote! {
+                    self.#i.sort_by(|ev1, ev2| ev1.id().cmp(&ev2.id()));
+                }
+            })
+            .collect();
+
+        quote! {
+            fn sort(&mut self) {
+                #(#fields_sort)*
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_first_evs_fn(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let avt_index_ident = name_generator.get_available_indexes_ident();
+
+        let fields_first: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                let camel_case_name = i.to_string().from_case(Case::Snake).to_case(Case::Pascal);
+                let index_variant: TokenStream = camel_case_name.parse().unwrap();
+                quote! {
+                    let first = self.#i.first().map(|ev| ev.id());
+                    map.insert(#avt_index_ident::#index_variant, first);
+                }
+            })
+            .collect();
+
+        quote! {
+            fn first_evs(&self) -> std::collections::HashMap<#avt_index_ident, Option<IndexChangeEventId>> {
+                let mut map = std::collections::HashMap::new();
+                #(#fields_first)*
+                map
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_last_evs_fn(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let avt_index_ident = name_generator.get_available_indexes_ident();
+
+        let fields_last: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                let camel_case_name = i.to_string().from_case(Case::Snake).to_case(Case::Pascal);
+                let index_variant: TokenStream = camel_case_name.parse().unwrap();
+                quote! {
+                    let last = self.#i.last().map(|ev| ev.id());
+                    map.insert(#avt_index_ident::#index_variant, last);
+                }
+            })
+            .collect();
+
+        quote! {
+            fn last_evs(&self) -> std::collections::HashMap<#avt_index_ident, Option<IndexChangeEventId>> {
+                let mut map = std::collections::HashMap::new();
+                #(#fields_last)*
+                map
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_extend_fn(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let ident = name_generator.get_space_secondary_index_events_ident();
+
+        let fields_extend: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                quote! {
+                    self.#i.extend(another.#i);
+                }
+            })
+            .collect();
+
+        quote! {
+            fn extend(&mut self, another: #ident) {
+                #(#fields_extend)*
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_remove_fn(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let ident = name_generator.get_space_secondary_index_events_ident();
+
+        let fields_remove: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                quote! {
+                    for ev in &another.#i {
+                        if let Ok(pos) = self.#i
+                            .binary_search_by(|inner_ev| inner_ev.id().cmp(&ev.id())) {
+                            self.#i.remove(pos);
+                        }
+                    }
+                }
+            })
+            .collect();
+
+        quote! {
+            fn remove(&mut self, another: &#ident) {
+                #(#fields_remove)*
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_iter_event_ids_fn(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let avt_index_ident = name_generator.get_available_indexes_ident();
+
+        let fields_iter: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                let camel_case_name = i.to_string().from_case(Case::Snake).to_case(Case::Pascal);
+                let index_variant: TokenStream = camel_case_name.parse().unwrap();
+                quote! {
+                    self.#i.iter().map(|ev| (#avt_index_ident::#index_variant, ev.id())).collect::<Vec<_>>()
+                }
+            })
+            .collect();
+
+        quote! {
+            fn iter_event_ids(&self) -> impl Iterator<Item = (#avt_index_ident, IndexChangeEventId)> {
+                <std::vec::IntoIter<Vec<(#avt_index_ident, IndexChangeEventId)>> as Iterator>::flatten(
+                    vec![
+                        #(#fields_iter),*
+                    ]
+                    .into_iter()
+                )
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_is_first_split_fn(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let avt_index_ident = name_generator.get_available_indexes_ident();
+
+        let fields_matches: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                let camel_case_name = i.to_string().from_case(Case::Snake).to_case(Case::Pascal);
+                let index_variant: TokenStream = camel_case_name.parse().unwrap();
+                quote! {
+                    #avt_index_ident::#index_variant => {
+                        if let Some(first) = self.#i.first() {
+                            match first {
+                                IndexChangeEvent::SplitNode {..} => {
+                                    true
+                                }
+                                _ => {
+                                    false
+                                }
+                            }
+                        } else {
+                            false
+                        }
+                    }
+                }
+            })
+            .collect();
+
+        if fields_matches.is_empty() {
+            quote! {
+                fn is_first_ev_is_split(&self, _index: #avt_index_ident) -> bool {
+                    false
+                }
+            }
+        } else {
+            quote! {
+                fn is_first_ev_is_split(&self, index: #avt_index_ident) -> bool {
+                    match index {
+                        #(#fields_matches),*
+                    }
+                }
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_contains_event_fn(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let avt_index_ident = name_generator.get_available_indexes_ident();
+
+        let fields_matches: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                let camel_case_name = i.to_string().from_case(Case::Snake).to_case(Case::Pascal);
+                let index_variant: TokenStream = camel_case_name.parse().unwrap();
+                quote! {
+                    #avt_index_ident::#index_variant => {
+                        self.#i.iter().map(|ev| ev.id()).any(|ev_id| ev_id == id)
+                    }
+                }
+            })
+            .collect();
+
+        if fields_matches.is_empty() {
+            quote! {
+                fn contains_event(&self, _index: #avt_index_ident, _id: IndexChangeEventId) -> bool {
+                    false
+                }
+            }
+        } else {
+            quote! {
+                fn contains_event(&self, index: #avt_index_ident, id: IndexChangeEventId) -> bool {
+                    match index {
+                        #(#fields_matches),*
+                    }
+                }
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_validate_fn(&self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
+        let ident = name_generator.get_space_secondary_index_events_ident();
+
+        let fields_validate: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                quote! {
+                    let #i = validate_events(&mut self.#i);
+                }
+            })
+            .collect();
+        let fields_init: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                quote! {
+                    #i,
+                }
+            })
+            .collect();
+
+        quote! {
+            fn validate(&mut self) -> #ident {
+                #(#fields_validate)*
+                Self {
+                    #(#fields_init)*
+                }
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_is_empty_fn(&self) -> TokenStream {
+        let is_empty: Vec<_> = self
+            .field_types
+            .keys()
+            .map(|i| {
+                quote! {
+                    self.#i.is_empty()
+                }
+            })
+            .collect();
+        if is_empty.is_empty() {
+            quote! {
+                fn is_empty(&self) -> bool {
+                    true
+                }
+            }
+        } else {
+            quote! {
+                fn is_empty(&self) -> bool {
+                    #(#is_empty) &&*
+                }
+            }
+        }
+    }
+
+    fn gen_space_secondary_index_events_is_unit_fn(&self) -> TokenStream {
+        let is_unit = if self.field_types.is_empty() {
+            quote! {
+                true
+            }
+        } else {
+            quote! {
+                false
+            }
+        };
+
+        quote! {
+            fn is_unit() -> bool {
+                #is_unit
+            }
+        }
+    }
+}
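For orientation, a rough sketch of what `gen_space_secondary_index_events_type` and `gen_space_secondary_index_events_impl` expand to for a hypothetical table with one secondary index `name_idx` over a `String` column (all identifiers here are illustrative; the real names come from `WorktableNameGenerator` and the real payload type from `field_types`):

    #[derive(Clone, Debug, Default)]
    pub struct MyTableSpaceSecondaryIndexEvents {
        name_idx: Vec<IndexChangeEvent<IndexPair<String, Link>>>,
    }

    impl TableSecondaryIndexEventsOps<MyTableAvailableIndexes> for MyTableSpaceSecondaryIndexEvents {
        fn sort(&mut self) {
            self.name_idx.sort_by(|ev1, ev2| ev1.id().cmp(&ev2.id()));
        }
        fn is_empty(&self) -> bool {
            self.name_idx.is_empty()
        }
        // extend, remove, first_evs/last_evs, iter_event_ids, contains_event,
        // validate and is_unit follow the same one-arm-per-index pattern.
    }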
diff --git a/codegen/src/persist_index/space.rs b/codegen/src/persist_index/space/index.rs
similarity index 68%
rename from codegen/src/persist_index/space.rs
rename to codegen/src/persist_index/space/index.rs
index d3043bf8..4c3559ed 100644
--- a/codegen/src/persist_index/space.rs
+++ b/codegen/src/persist_index/space/index.rs
@@ -5,68 +5,7 @@ use crate::name_generator::{is_unsized, WorktableNameGenerator};
 use crate::persist_index::generator::Generator;
 
 impl Generator {
-    pub fn gen_space_index(&self) -> TokenStream {
-        let secondary_index = self.gen_space_secondary_index_type();
-        let secondary_impl = self.gen_space_secondary_index_impl_space_index();
-        let secondary_index_events = self.gen_space_secondary_index_events_type();
-        let secondary_index_events_impl = self.gen_space_secondary_index_events_impl();
-
-        quote! {
-            #secondary_index_events
-            #secondary_index_events_impl
-            #secondary_index
-            #secondary_impl
-        }
-    }
-
-    fn gen_space_secondary_index_events_type(&self) -> TokenStream {
-        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
-        let ident = name_generator.get_space_secondary_index_events_ident();
-
-        let fields: Vec<_> = self
-            .field_types
-            .iter()
-            .map(|(i, t)| {
-                quote! {
-                    #i: Vec<IndexChangeEvent<IndexPair<#t, Link>>>,
-                }
-            })
-            .collect();
-
-        quote! {
-            #[derive(Clone, Debug, Default)]
-            pub struct #ident {
-                #(#fields)*
-            }
-        }
-    }
-
-    fn gen_space_secondary_index_events_impl(&self) -> TokenStream {
-        let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
-        let ident = name_generator.get_space_secondary_index_events_ident();
-
-        let fields: Vec<_> = self
-            .field_types
-            .keys()
-            .map(|i| {
-                quote! {
-                    self.#i.extend(another.#i);
-                }
-            })
-            .collect();
-
-        quote! {
-            impl TableSecondaryIndexEventsOps for #ident {
-                fn extend(&mut self, another: #ident) {
-                    #(#fields)*
-                }
-            }
-        }
-    }
-
-    fn gen_space_secondary_index_type(&self) -> TokenStream {
+    pub fn gen_space_secondary_index_type(&self) -> TokenStream {
         let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
         let ident = name_generator.get_space_secondary_index_ident();
         let inner_const_name = name_generator.get_page_inner_size_const_ident();
@@ -95,7 +34,7 @@ impl Generator {
         }
     }
 
-    fn gen_space_secondary_index_impl_space_index(&self) -> TokenStream {
+    pub fn gen_space_secondary_index_impl_space_index(&self) -> TokenStream {
         let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
         let events_ident = name_generator.get_space_secondary_index_events_ident();
         let ident = name_generator.get_space_secondary_index_ident();
diff --git a/codegen/src/persist_index/space/mod.rs b/codegen/src/persist_index/space/mod.rs
new file mode 100644
index 00000000..240fa794
--- /dev/null
+++ b/codegen/src/persist_index/space/mod.rs
@@ -0,0 +1,23 @@
+mod events;
+mod index;
+
+use proc_macro2::TokenStream;
+use quote::quote;
+
+use crate::persist_index::generator::Generator;
+
+impl Generator {
+    pub fn gen_space_index(&self) -> TokenStream {
+        let secondary_index = self.gen_space_secondary_index_type();
+        let secondary_impl = self.gen_space_secondary_index_impl_space_index();
+        let secondary_index_events = self.gen_space_secondary_index_events_type();
+        let secondary_index_events_impl = self.gen_space_secondary_index_events_impl();
+
+        quote! {
+            #secondary_index_events
+            #secondary_index_events_impl
+            #secondary_index
+            #secondary_impl
+        }
+    }
+}
diff --git a/codegen/src/persist_table/generator/space.rs b/codegen/src/persist_table/generator/space.rs
index e2517602..da76a093 100644
--- a/codegen/src/persist_table/generator/space.rs
+++ b/codegen/src/persist_table/generator/space.rs
@@ -10,12 +10,14 @@ impl Generator {
         let primary_key_type = name_generator.get_primary_key_type_ident();
         let space_secondary_indexes_events =
             name_generator.get_space_secondary_index_events_ident();
+        let avt_index_ident = name_generator.get_available_indexes_ident();
 
         quote! {
             pub type #ident = PersistenceTask<
                 <<#primary_key_type as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State,
                 #primary_key_type,
                 #space_secondary_indexes_events,
+                #avt_index_ident,
             >;
         }
     }
@@ -29,6 +31,7 @@ impl Generator {
         let space_secondary_indexes = name_generator.get_space_secondary_index_ident();
         let space_secondary_indexes_events =
             name_generator.get_space_secondary_index_events_ident();
+        let avt_index_ident = name_generator.get_available_indexes_ident();
         let space_index_type = if self.attributes.pk_unsized {
             quote! {
                 SpaceIndexUnsized<#primary_key_type, { #inner_const_name as u32 }>,
@@ -50,6 +53,7 @@ impl Generator {
                 #space_secondary_indexes,
                 #primary_key_type,
                 #space_secondary_indexes_events,
+                #avt_index_ident,
             >;
         }
     }
diff --git a/codegen/src/worktable/generator/index/info.rs b/codegen/src/worktable/generator/index/info.rs
new file mode 100644
index 00000000..10da64cd
--- /dev/null
+++ b/codegen/src/worktable/generator/index/info.rs
@@ -0,0 +1,91 @@
+use proc_macro2::TokenStream;
+use quote::quote;
+
+use crate::name_generator::WorktableNameGenerator;
+use crate::worktable::generator::Generator;
+
+impl Generator {
+    pub fn gen_secondary_index_info_impl_def(&mut self) -> TokenStream {
+        let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
+        let index_type_ident = name_generator.get_index_type_ident();
+
+        let info_fn = self.gen_index_info_fn();
+        let is_empty_fn = self.gen_index_is_empty_fn();
+
+        quote! {
+            impl TableSecondaryIndexInfo for #index_type_ident {
+                #info_fn
+                #is_empty_fn
+            }
+        }
+    }
+
+    fn gen_index_info_fn(&self) -> TokenStream {
+        let rows = self.columns.indexes.values().map(|idx| {
+            let index_field_name = &idx.name;
+            let index_name_str = index_field_name.to_string();
+
+            if idx.is_unique {
+                quote! {
+                    info.push(IndexInfo {
+                        name: #index_name_str.to_string(),
+                        index_type: IndexKind::Unique,
+                        key_count: self.#index_field_name.len(),
+                        capacity: self.#index_field_name.capacity(),
+                        heap_size: self.#index_field_name.heap_size(),
+                        used_size: self.#index_field_name.used_size(),
+                        node_count: self.#index_field_name.node_count(),
+                    });
+                }
+            } else {
+                quote! {
+                    info.push(IndexInfo {
+                        name: #index_name_str.to_string(),
+                        index_type: IndexKind::NonUnique,
+                        key_count: self.#index_field_name.len(),
+                        capacity: self.#index_field_name.capacity(),
+                        heap_size: self.#index_field_name.heap_size(),
+                        used_size: self.#index_field_name.used_size(),
+                        node_count: self.#index_field_name.node_count(),
+                    });
+                }
+            }
+        });
+
+        quote! {
+            fn index_info(&self) -> Vec<IndexInfo> {
+                let mut info = Vec::new();
+                #(#rows)*
+                info
+            }
+        }
+    }
+
+    fn gen_index_is_empty_fn(&self) -> TokenStream {
+        let is_empty = self
+            .columns
+            .indexes
+            .values()
+            .map(|idx| {
+                let index_field_name = &idx.name;
+                quote! {
+                    self.#index_field_name.len() == 0
+                }
+            })
+            .collect::<Vec<_>>();
+
+        if is_empty.is_empty() {
+            quote! {
+                fn is_empty(&self) -> bool {
+                    true
+                }
+            }
+        } else {
+            quote! {
+                fn is_empty(&self) -> bool {
+                    #(#is_empty) &&*
+                }
+            }
+        }
+    }
+}
diff --git a/codegen/src/worktable/generator/index/mod.rs b/codegen/src/worktable/generator/index/mod.rs
index 9280c3c4..199b073a 100644
--- a/codegen/src/worktable/generator/index/mod.rs
+++ b/codegen/src/worktable/generator/index/mod.rs
@@ -1,4 +1,5 @@
 mod cdc;
+mod info;
 mod usual;
 
 use crate::name_generator::{is_float, is_unsized, WorktableNameGenerator};
@@ -12,6 +13,7 @@ impl Generator {
     pub fn gen_index_def(&mut self) -> TokenStream {
         let type_def = self.gen_type_def();
         let impl_def = self.gen_secondary_index_impl_def();
+        let info_def = self.gen_secondary_index_info_impl_def();
         let cdc_impl_def = if self.is_persist {
             self.gen_secondary_index_cdc_impl_def()
         } else {
@@ -23,6 +25,7 @@ impl Generator {
         quote! {
             #type_def
             #impl_def
+            #info_def
             #cdc_impl_def
             #default_impl
             #available_indexes
@@ -155,7 +158,7 @@ impl Generator {
         }
     } else {
         quote! {
-            #[derive(Debug, Clone)]
+            #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Hash, Eq)]
             pub enum #avt_type_ident {
                 #(#indexes)*
             }
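The widened derive list is what lets the generated available-indexes enum be passed around by value and used as a map key: `TableSecondaryIndexEventsOps` keys its `first_evs`/`last_evs` maps by it, and `LastEventIds` (later in this diff) stores a `HashMap<AvailableIndexes, IndexChangeEventId>`, which needs `Eq + Hash`; `Copy` and `Ord` keep the batch-validation bookkeeping clone-free and deterministically sortable. A sketch of the expansion for a hypothetical two-index table (variant names are Pascal-cased index field names; the type name is illustrative):

    #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Hash, Eq)]
    pub enum MyTableAvailableIndexes {
        NameIdx,
        AgeIdx,
    }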
diff --git a/codegen/src/worktable/generator/index/usual.rs b/codegen/src/worktable/generator/index/usual.rs
index a9cfbc0b..9e2e2e2e 100644
--- a/codegen/src/worktable/generator/index/usual.rs
+++ b/codegen/src/worktable/generator/index/usual.rs
@@ -17,7 +17,6 @@ impl Generator {
         let save_row_fn = self.gen_save_row_index_fn();
         let delete_row_fn = self.gen_delete_row_index_fn();
         let process_difference_fn = self.gen_process_difference_index_fn();
-        let info_fn = self.gen_index_info_fn();
         let delete_from_indexes = self.gen_index_delete_from_indexes_fn();
 
         quote! {
@@ -26,7 +25,6 @@ impl Generator {
             #delete_row_fn
             #process_difference_fn
             #delete_from_indexes
-            #info_fn
         }
     }
 }
@@ -265,45 +263,4 @@ impl Generator {
             }
         }
     }
-
-    fn gen_index_info_fn(&self) -> TokenStream {
-        let rows = self.columns.indexes.values().map(|idx| {
-            let index_field_name = &idx.name;
-            let index_name_str = index_field_name.to_string();
-
-            if idx.is_unique {
-                quote! {
-                    info.push(IndexInfo {
-                        name: #index_name_str.to_string(),
-                        index_type: IndexKind::Unique,
-                        key_count: self.#index_field_name.len(),
-                        capacity: self.#index_field_name.capacity(),
-                        heap_size: self.#index_field_name.heap_size(),
-                        used_size: self.#index_field_name.used_size(),
-                        node_count: self.#index_field_name.node_count(),
-                    });
-                }
-            } else {
-                quote! {
-                    info.push(IndexInfo {
-                        name: #index_name_str.to_string(),
-                        index_type: IndexKind::NonUnique,
-                        key_count: self.#index_field_name.len(),
-                        capacity: self.#index_field_name.capacity(),
-                        heap_size: self.#index_field_name.heap_size(),
-                        used_size: self.#index_field_name.used_size(),
-                        node_count: self.#index_field_name.node_count(),
-                    });
-                }
-            }
-        });
-
-        quote! {
-            fn index_info(&self) -> Vec<IndexInfo> {
-                let mut info = Vec::new();
-                #(#rows)*
-                info
-            }
-        }
-    }
 }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 8c9cef80..fa09421c 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -4,9 +4,10 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-
+eyre = "0.6.12"
+worktable = { path = "..", version = "0.6.8" }
 #worktable = { path = "..", version = "0.5.3", features = ["perf_measurements"] }
-#rkyv = { version = "0.8.9", features = ["uuid-1"] }
+rkyv = { version = "0.8.9", features = ["uuid-1"] }
 #lockfree = "0.5.1"
 #derive_more = { version = "1.0.0", features = ["full"] }
 #eyre = "0.6.12"
@@ -16,4 +17,14 @@ edition = "2021"
 #ordered-float = "5.0.0"
 #indexset = { version = "0.12.0", features = ["concurrent", "cdc", "multimap"] }
 #tokio = { version = "1", features = ["full"] }
-
+tokio = { version = "1", features = ["full"] }
+rand = "0.8"
+chrono = { version = "0.4", features = ["std"] }
+futures = "0.3.30"
+uuid = { version = "1.10.0", features = ["v4", "v7"] }
+derive_more = { version = "1.0.0", features = [
+    "from",
+    "error",
+    "display",
+    "into",
+] }
diff --git a/src/index/mod.rs b/src/index/mod.rs
index 17dc202c..d6acdd04 100644
--- a/src/index/mod.rs
+++ b/src/index/mod.rs
@@ -1,17 +1,16 @@
 mod multipair;
 mod table_index;
-mod table_index_cdc;
 mod table_secondary_index;
-mod table_secondary_index_events;
 mod unsized_node;
 
 pub use indexset::concurrent::map::BTreeMap as IndexMap;
 pub use indexset::concurrent::multimap::BTreeMultiMap as IndexMultiMap;
 pub use multipair::MultiPairRecreate;
-pub use table_index::TableIndex;
-pub use table_index_cdc::TableIndexCdc;
-pub use table_secondary_index::{IndexError, TableSecondaryIndex, TableSecondaryIndexCdc};
-pub use table_secondary_index_events::TableSecondaryIndexEventsOps;
+pub use table_index::{TableIndex, TableIndexCdc};
+pub use table_secondary_index::{
+    IndexError, TableSecondaryIndex, TableSecondaryIndexCdc, TableSecondaryIndexEventsOps,
+    TableSecondaryIndexInfo,
+};
 pub use unsized_node::UnsizedNode;
 
 #[derive(Debug)]
diff --git a/src/index/table_index_cdc.rs b/src/index/table_index/cdc.rs
similarity index 93%
rename from src/index/table_index_cdc.rs
rename to src/index/table_index/cdc.rs
index af1f6191..550ad0d4 100644
--- a/src/index/table_index_cdc.rs
+++ b/src/index/table_index/cdc.rs
@@ -1,12 +1,14 @@
+use std::fmt::Debug;
 use std::hash::Hash;
 
-use crate::{IndexMap, IndexMultiMap};
 use data_bucket::Link;
 use indexset::cdc::change::ChangeEvent;
 use indexset::core::multipair::MultiPair;
 use indexset::core::node::NodeLike;
 use indexset::core::pair::Pair;
 
+use crate::{IndexMap, IndexMultiMap};
+
 pub trait TableIndexCdc<T> {
     fn insert_cdc(&self, value: T, link: Link) -> (Option<Link>, Vec<ChangeEvent<Pair<T, Link>>>);
     #[allow(clippy::type_complexity)]
@@ -19,7 +21,7 @@ pub trait TableIndexCdc<T> {
 
 impl<T, Node> TableIndexCdc<T> for IndexMultiMap<T, Link, Node>
 where
-    T: Eq + Hash + Clone + Send + Ord,
+    T: Debug + Eq + Hash + Clone + Send + Ord,
     Node: NodeLike<MultiPair<T, Link>> + Send + 'static,
 {
     fn insert_cdc(&self, value: T, link: Link) -> (Option<Link>, Vec<ChangeEvent<Pair<T, Link>>>) {
@@ -39,7 +41,7 @@ where
 
 impl<T, Node> TableIndexCdc<T> for IndexMap<T, Link, Node>
 where
-    T: Eq + Hash + Clone + Send + Ord,
+    T: Debug + Eq + Hash + Clone + Send + Ord,
     Node: NodeLike<Pair<T, Link>> + Send + 'static,
 {
     fn insert_cdc(&self, value: T, link: Link) -> (Option<Link>, Vec<ChangeEvent<Pair<T, Link>>>) {
diff --git a/src/index/table_index.rs b/src/index/table_index/mod.rs
similarity index 86%
rename from src/index/table_index.rs
rename to src/index/table_index/mod.rs
index b2d1121e..b3b4e9fe 100644
--- a/src/index/table_index.rs
+++ b/src/index/table_index/mod.rs
@@ -1,11 +1,17 @@
+use std::fmt::Debug;
 use std::hash::Hash;
 
-use crate::{IndexMap, IndexMultiMap};
 use data_bucket::Link;
 use indexset::core::multipair::MultiPair;
 use indexset::core::node::NodeLike;
 use indexset::core::pair::Pair;
 
+use crate::{IndexMap, IndexMultiMap};
+
+mod cdc;
+
+pub use cdc::TableIndexCdc;
+
 pub trait TableIndex<T> {
     fn insert(&self, value: T, link: Link) -> Option<Link>;
     fn remove(&self, value: T, link: Link) -> Option<(T, Link)>;
@@ -13,7 +19,7 @@ pub trait TableIndex<T> {
 
 impl<T, Node> TableIndex<T> for IndexMultiMap<T, Link, Node>
 where
-    T: Eq + Hash + Clone + Send + Ord,
+    T: Debug + Eq + Hash + Clone + Send + Ord,
     Node: NodeLike<MultiPair<T, Link>> + Send + 'static,
 {
     fn insert(&self, value: T, link: Link) -> Option<Link> {
@@ -27,7 +33,7 @@ where
 
 impl<T, Node> TableIndex<T> for IndexMap<T, Link, Node>
 where
-    T: Eq + Hash + Clone + Send + Ord,
+    T: Debug + Eq + Hash + Clone + Send + Ord,
     Node: NodeLike<Pair<T, Link>> + Send + 'static,
 {
     fn insert(&self, value: T, link: Link) -> Option<Link> {
diff --git a/src/index/table_secondary_index/cdc.rs b/src/index/table_secondary_index/cdc.rs
new file mode 100644
index 00000000..5ba57714
--- /dev/null
+++ b/src/index/table_secondary_index/cdc.rs
@@ -0,0 +1,23 @@
+use std::collections::HashMap;
+
+use data_bucket::Link;
+
+use crate::{Difference, IndexError, WorkTableError};
+
+pub trait TableSecondaryIndexCdc<Row, AvailableTypes, SecondaryEvents> {
+    fn save_row_cdc(
+        &self,
+        row: Row,
+        link: Link,
+    ) -> Result<SecondaryEvents, IndexError<AvailableTypes>>;
+    fn delete_row_cdc(
+        &self,
+        row: Row,
+        link: Link,
+    ) -> Result<SecondaryEvents, IndexError<AvailableTypes>>;
+    fn process_difference_cdc(
+        &self,
+        link: Link,
+        differences: HashMap<&str, Difference>,
+    ) -> Result<SecondaryEvents, WorkTableError>;
+}
diff --git a/src/index/table_secondary_index/index_events.rs b/src/index/table_secondary_index/index_events.rs
new file mode 100644
index 00000000..9183da9b
--- /dev/null
+++ b/src/index/table_secondary_index/index_events.rs
@@ -0,0 +1,28 @@
+use crate::prelude::IndexChangeEventId;
+use indexset::cdc::change;
+use std::collections::HashMap;
+
+pub trait TableSecondaryIndexEventsOps<AvailableIndexes> {
+    fn extend(&mut self, another: Self)
+    where
+        Self: Sized;
+    fn remove(&mut self, another: &Self)
+    where
+        Self: Sized;
+    fn last_evs(&self) -> HashMap<AvailableIndexes, Option<change::Id>>;
+    fn first_evs(&self) -> HashMap<AvailableIndexes, Option<change::Id>>;
+    // TODO: Remove this when indexset is fully fixed.
+    fn is_first_ev_is_split(&self, _index: AvailableIndexes) -> bool {
+        false
+    }
+    fn iter_event_ids(&self) -> impl Iterator<Item = (AvailableIndexes, IndexChangeEventId)>;
+    fn contains_event(&self, _index: AvailableIndexes, _id: change::Id) -> bool {
+        false
+    }
+    fn sort(&mut self);
+    fn validate(&mut self) -> Self
+    where
+        Self: Sized;
+    fn is_empty(&self) -> bool;
+    fn is_unit() -> bool;
+}
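Since `is_first_ev_is_split` and `contains_event` ship default bodies returning `false`, an implementor only has to override them when it actually tracks events; the generated per-table types earlier in this diff do, while no-op implementations can lean on the defaults. A minimal consumer-side sketch (the helper function is hypothetical, not part of this PR) of how the trait is meant to be driven:

    // Sort per-index event vectors so ids grow monotonically, then flatten
    // them into (index, event id) tuples for inspection.
    fn sorted_event_ids<I, E>(events: &mut E) -> Vec<(I, IndexChangeEventId)>
    where
        E: TableSecondaryIndexEventsOps<I>,
    {
        events.sort();
        events.iter_event_ids().collect()
    }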
diff --git a/src/index/table_secondary_index/info.rs b/src/index/table_secondary_index/info.rs
new file mode 100644
index 00000000..569a4953
--- /dev/null
+++ b/src/index/table_secondary_index/info.rs
@@ -0,0 +1,23 @@
+use crate::prelude::IndexInfo;
+
+pub trait TableSecondaryIndexInfo {
+    fn index_info(&self) -> Vec<IndexInfo>;
+    fn is_empty(&self) -> bool;
+    fn is_unit() -> bool {
+        false
+    }
+}
+
+impl TableSecondaryIndexInfo for () {
+    fn index_info(&self) -> Vec<IndexInfo> {
+        vec![]
+    }
+
+    fn is_empty(&self) -> bool {
+        true
+    }
+
+    fn is_unit() -> bool {
+        true
+    }
+}
diff --git a/src/index/table_secondary_index.rs b/src/index/table_secondary_index/mod.rs
similarity index 73%
rename from src/index/table_secondary_index.rs
rename to src/index/table_secondary_index/mod.rs
index 0599080d..3b221a0f 100644
--- a/src/index/table_secondary_index.rs
+++ b/src/index/table_secondary_index/mod.rs
@@ -1,11 +1,18 @@
+mod cdc;
+mod index_events;
+mod info;
+
 use std::collections::HashMap;
 
 use data_bucket::Link;
 
-use crate::system_info::IndexInfo;
 use crate::Difference;
 use crate::WorkTableError;
 
+pub use cdc::TableSecondaryIndexCdc;
+pub use index_events::TableSecondaryIndexEventsOps;
+pub use info::TableSecondaryIndexInfo;
+
 pub trait TableSecondaryIndex<Row, AvailableTypes> {
     fn save_row(&self, row: Row, link: Link) -> Result<(), IndexError<AvailableTypes>>;
@@ -23,26 +30,6 @@ pub trait TableSecondaryIndex<Row, AvailableTypes> {
         link: Link,
         differences: HashMap<&str, Difference>,
     ) -> Result<(), WorkTableError>;
-
-    fn index_info(&self) -> Vec<IndexInfo>;
-}
-
-pub trait TableSecondaryIndexCdc<Row, AvailableTypes, SecondaryEvents> {
-    fn save_row_cdc(
-        &self,
-        row: Row,
-        link: Link,
-    ) -> Result<SecondaryEvents, IndexError<AvailableTypes>>;
-    fn delete_row_cdc(
-        &self,
-        row: Row,
-        link: Link,
-    ) -> Result<SecondaryEvents, IndexError<AvailableTypes>>;
-    fn process_difference_cdc(
-        &self,
-        link: Link,
-        differences: HashMap<&str, Difference>,
-    ) -> Result<SecondaryEvents, WorkTableError>;
-}
 
 impl<Row, AvailableTypes>
@@ -75,10 +62,6 @@ where
     ) -> Result<(), WorkTableError> {
         Ok(())
     }
-
-    fn index_info(&self) -> Vec<IndexInfo> {
-        vec![]
-    }
 }
 
 #[derive(Debug)]
diff --git a/src/index/table_secondary_index_events.rs b/src/index/table_secondary_index_events.rs
deleted file mode 100644
index c4c4c72a..00000000
--- a/src/index/table_secondary_index_events.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-pub trait TableSecondaryIndexEventsOps {
-    fn extend(&mut self, another: Self)
-    where
-        Self: Sized;
-}
diff --git a/src/lib.rs b/src/lib.rs
index a44c5165..2d8fdfca 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,10 +26,10 @@ pub mod prelude {
     pub use crate::mem_stat::MemStat;
     pub use crate::persistence::{
         map_index_pages_to_toc_and_general, map_unsized_index_pages_to_toc_and_general,
-        DeleteOperation, IndexTableOfContents, InsertOperation, Operation, OperationId,
-        PersistenceConfig, PersistenceEngine, PersistenceEngineOps, PersistenceTask, SpaceData,
-        SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceIndexUnsized, SpaceSecondaryIndexOps,
-        UpdateOperation,
+        validate_events, DeleteOperation, IndexTableOfContents, InsertOperation, Operation,
+        OperationId, PersistenceConfig, PersistenceEngine, PersistenceEngineOps, PersistenceTask,
+        SpaceData, SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceIndexUnsized,
+        SpaceSecondaryIndexOps, UpdateOperation,
     };
     pub use crate::primary_key::{PrimaryKeyGenerator, PrimaryKeyGeneratorState, TablePrimaryKey};
     pub use crate::table::select::{Order, QueryParams, SelectQueryBuilder, SelectQueryExecutor};
@@ -38,7 +38,8 @@ pub mod prelude {
     pub use crate::{
         lock::Lock, Difference, IndexError, IndexMap, IndexMultiMap, MultiPairRecreate,
         TableIndex, TableIndexCdc, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc,
-        TableSecondaryIndexEventsOps, UnsizedNode, WorkTable, WorkTableError,
+        TableSecondaryIndexEventsOps, TableSecondaryIndexInfo, UnsizedNode, WorkTable,
+        WorkTableError,
     };
     pub use data_bucket::{
         align, get_index_page_size_from_data_length, map_data_pages_to_general, parse_data_page,
@@ -50,7 +51,7 @@ pub mod prelude {
     };
     pub use derive_more::{From, Into};
     pub use indexset::{
-        cdc::change::ChangeEvent as IndexChangeEvent,
+        cdc::change::{ChangeEvent as IndexChangeEvent, Id as IndexChangeEventId},
        core::{multipair::MultiPair as IndexMultiPair, pair::Pair as IndexPair},
     };
     pub use ordered_float::OrderedFloat;
diff --git a/src/mem_stat/mod.rs b/src/mem_stat/mod.rs
index 1ea05599..20a21329 100644
--- a/src/mem_stat/mod.rs
+++ b/src/mem_stat/mod.rs
@@ -1,6 +1,7 @@
 mod primitives;
 
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::rc::Rc;
 use std::sync::Arc;
 
@@ -52,8 +53,8 @@ impl MemStat for String {
 
 impl<K, V, Node> MemStat for IndexMap<K, V, Node>
 where
-    K: Ord + Clone + 'static + MemStat + Send,
-    V: Clone + 'static + MemStat + Send,
+    K: Debug + Ord + Clone + 'static + MemStat + Send,
+    V: Debug + Clone + 'static + MemStat + Send,
     Node: NodeLike<Pair<K, V>> + Send + 'static,
 {
     fn heap_size(&self) -> usize {
@@ -83,8 +84,8 @@ where
 
 impl<K, V, Node> MemStat for IndexMultiMap<K, V, Node>
 where
-    K: Ord + Clone + 'static + MemStat + Send,
-    V: Ord + Clone + 'static + MemStat + Send,
+    K: Debug + Ord + Clone + 'static + MemStat + Send,
+    V: Debug + Ord + Clone + 'static + MemStat + Send,
     Node: NodeLike<MultiPair<K, V>> + Send + 'static,
 {
     fn heap_size(&self) -> usize {
diff --git a/src/persistence/engine.rs b/src/persistence/engine.rs
index 1ad7e6a4..7fb230a8 100644
--- a/src/persistence/engine.rs
+++ b/src/persistence/engine.rs
@@ -9,6 +9,7 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use std::fmt::Debug;
 use std::fs;
+use std::hash::Hash;
 use std::marker::PhantomData;
 use std::path::Path;
 
@@ -19,6 +20,7 @@ pub struct PersistenceEngine<
     SpaceSecondaryIndexes,
     PrimaryKey,
     SecondaryIndexEvents,
+    AvailableIndexes,
     PrimaryKeyGenState = <<PrimaryKey as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State,
 > where
@@ -28,7 +30,7 @@ where
     pub data: SpaceData,
     pub primary_index: SpacePrimaryIndex,
     pub secondary_indexes: SpaceSecondaryIndexes,
-    phantom_data: PhantomData<(PrimaryKey, SecondaryIndexEvents, PrimaryKeyGenState)>,
+    phantom_data: PhantomData<(PrimaryKey, SecondaryIndexEvents, PrimaryKeyGenState, AvailableIndexes)>,
 }
 
 impl<
@@ -37,6 +39,7 @@ impl<
     SpaceSecondaryIndexes,
     PrimaryKey,
     SecondaryIndexEvents,
+    AvailableIndexes,
     PrimaryKeyGenState,
 > PersistenceEngine<
@@ -45,6 +48,7 @@ impl<
     SpaceSecondaryIndexes,
     PrimaryKey,
     SecondaryIndexEvents,
+    AvailableIndexes,
     PrimaryKeyGenState,
 > where
@@ -77,14 +81,16 @@ impl<
         SpaceSecondaryIndexes,
         PrimaryKey,
         SecondaryIndexEvents,
+        AvailableIndexes,
         PrimaryKeyGenState,
-    > PersistenceEngineOps<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents>
+    > PersistenceEngineOps<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents, AvailableIndexes>
     for PersistenceEngine<
         SpaceData,
         SpacePrimaryIndex,
         SpaceSecondaryIndexes,
        PrimaryKey,
         SecondaryIndexEvents,
+        AvailableIndexes,
         PrimaryKeyGenState,
     >
 where
@@ -93,8 +99,10 @@ where
     SpaceData: SpaceDataOps<PrimaryKeyGenState> + Send,
     SpacePrimaryIndex: SpaceIndexOps<PrimaryKey> + Send,
     SpaceSecondaryIndexes: SpaceSecondaryIndexOps<SecondaryIndexEvents> + Send,
-    SecondaryIndexEvents: Clone + Debug + Default +
-        TableSecondaryIndexEventsOps + Send,
+    SecondaryIndexEvents:
+        Clone + Debug + Default + TableSecondaryIndexEventsOps<AvailableIndexes> + Send,
     PrimaryKeyGenState: Clone + Debug + Send,
+    AvailableIndexes: Clone + Copy + Debug + Eq + Hash + Send,
 {
     async fn apply_operation(
         &mut self,
@@ -136,7 +144,12 @@ where
 
     async fn apply_batch_operation(
         &mut self,
-        batch_op: BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents>,
+        batch_op: BatchOperation<
+            PrimaryKeyGenState,
+            PrimaryKey,
+            SecondaryIndexEvents,
+            AvailableIndexes,
+        >,
     ) -> eyre::Result<()> {
         let batch_data_op = batch_op.get_batch_data_op()?;
 
diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs
index e3bf7401..38eafe5d 100644
--- a/src/persistence/mod.rs
+++ b/src/persistence/mod.rs
@@ -1,6 +1,6 @@
 mod engine;
 mod manager;
-mod operation;
+pub mod operation;
 mod space;
 mod task;
 
@@ -8,7 +8,8 @@ use crate::persistence::operation::BatchOperation;
 pub use engine::PersistenceEngine;
 pub use manager::PersistenceConfig;
 pub use operation::{
-    DeleteOperation, InsertOperation, Operation, OperationId, OperationType, UpdateOperation,
+    validate_events, DeleteOperation, InsertOperation, Operation, OperationId, OperationType,
+    UpdateOperation,
 };
 pub use space::{
     map_index_pages_to_toc_and_general, map_unsized_index_pages_to_toc_and_general,
@@ -18,7 +19,13 @@ pub use space::{
 use std::future::Future;
 pub use task::PersistenceTask;
 
-pub trait PersistenceEngineOps<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents> {
+pub trait PersistenceEngineOps<
+    PrimaryKeyGenState,
+    PrimaryKey,
+    SecondaryIndexEvents,
+    AvailableIndexes,
+>
+{
     fn apply_operation(
         &mut self,
         op: Operation<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents>,
@@ -26,6 +33,11 @@ pub trait PersistenceEngineOps<
     ) -> impl Future<Output = eyre::Result<()>> + Send;
     fn apply_batch_operation(
         &mut self,
-        batch_op: BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents>,
+        batch_op: BatchOperation<
+            PrimaryKeyGenState,
+            PrimaryKey,
+            SecondaryIndexEvents,
+            AvailableIndexes,
+        >,
     ) -> impl Future<Output = eyre::Result<()>> + Send;
 }
diff --git a/src/persistence/operation.rs b/src/persistence/operation.rs
deleted file mode 100644
index cc359cd6..00000000
--- a/src/persistence/operation.rs
+++ /dev/null
@@ -1,323 +0,0 @@
-use std::cmp::Ordering;
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::hash::{Hash, Hasher};
-
-use data_bucket::page::PageId;
-use data_bucket::{Link, SizeMeasurable};
-use derive_more::Display;
-use indexset::cdc::change::ChangeEvent;
-use indexset::core::pair::Pair;
-use rkyv::{Archive, Deserialize, Serialize};
-use uuid::Uuid;
-use worktable_codegen::{worktable, MemStat};
-
-use crate::persistence::space::{BatchChangeEvent, BatchData};
-use crate::persistence::task::QueueInnerRow;
-use crate::prelude::*;
-use crate::prelude::{From, Order, SelectQueryExecutor};
-
-/// Represents page's identifier. Is unique within the table bounds
-#[derive(Archive, Copy, Clone, Deserialize, Debug, Display, From, Serialize)]
-#[rkyv(derive(Debug, PartialOrd, PartialEq, Eq, Ord))]
-pub enum OperationId {
-    #[from]
-    Single(Uuid),
-    Multi(Uuid),
-}
-
-impl OperationId {
-    fn get_id(&self) -> Uuid {
-        match self {
-            OperationId::Single(id) => *id,
-            OperationId::Multi(id) => *id,
-        }
-    }
-}
-
-impl Hash for OperationId {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        Hash::hash(&self.get_id(), state)
-    }
-}
-
-impl PartialEq for OperationId {
-    fn eq(&self, other: &Self) -> bool {
-        self.get_id().eq(&other.get_id())
-    }
-}
-
-impl Eq for OperationId {}
-
-impl PartialOrd for OperationId {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for OperationId {
-    fn cmp(&self, other: &Self) -> Ordering {
-        self.get_id().cmp(&other.get_id())
-    }
-}
-
-impl SizeMeasurable for OperationId {
-    fn aligned_size(&self) -> usize {
-        Uuid::default().aligned_size()
-    }
-}
-
-impl Default for OperationId {
-    fn default() -> Self {
-        OperationId::Single(Uuid::now_v7())
-    }
-}
-
-#[derive(
-    Archive,
-    Clone,
-    Copy,
-    Debug,
-    Default,
-    Deserialize,
-    Serialize,
-    Eq,
-    Ord,
-    PartialEq,
-    PartialOrd,
-    Hash,
-)]
-#[rkyv(compare(PartialEq), derive(Debug))]
-#[repr(u8)]
-pub enum OperationType {
-    #[default]
-    Insert,
-    Update,
-    Delete,
-}
-
-impl SizeMeasurable for OperationType {
-    fn aligned_size(&self) -> usize {
-        u8::default().aligned_size()
-    }
-}
-
-worktable! (
-    name: BatchInner,
-    columns: {
-        id: u64 primary_key autoincrement,
-        operation_id: OperationId,
-        page_id: PageId,
-        link: Link,
-        op_type: OperationType,
-        pos: usize,
-    },
-    indexes: {
-        operation_id_idx: operation_id,
-        page_id_idx: page_id,
-        link_idx: link,
-        op_type_idx: op_type,
-    },
-    queries: {
-        update: {
-            PosByOpId(pos) by operation_id,
-        },
-    }
-);
-
-impl BatchInnerWorkTable {
-    pub fn iter_links(&self) -> impl Iterator<Item = Link> {
-        self.0
-            .indexes
-            .link_idx
-            .iter()
-            .map(|(l, _)| *l)
-            .collect::<Vec<_>>()
-            .into_iter()
-    }
-}
-
-impl From<QueueInnerRow> for BatchInnerRow {
-    fn from(value: QueueInnerRow) -> Self {
-        BatchInnerRow {
-            id: value.id,
-            operation_id: value.operation_id,
-            page_id: value.page_id,
-            link: value.link,
-            op_type: Default::default(),
-            pos: 0,
-        }
-    }
-}
-
-#[derive(Debug)]
-pub struct BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
-    pub ops: Vec<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>,
-    pub info_wt: BatchInnerWorkTable,
-}
-
-impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
-    BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
-where
-    PrimaryKeyGenState: Debug + Clone,
-    PrimaryKey: Debug + Clone,
-    SecondaryKeys: Debug + Default + Clone + TableSecondaryIndexEventsOps,
-{
-    pub fn get_pk_gen_state(&self) -> eyre::Result<Option<PrimaryKeyGenState>> {
-        let row = self
-            .info_wt
-            .select_by_op_type(OperationType::Insert)
-            .order_on(BatchInnerRowFields::OperationId, Order::Desc)
-            .limit(1)
-            .execute()?;
-        Ok(row.into_iter().next().map(|r| {
-            let pos = r.pos;
-            let op = self.ops.get(pos).expect("available as pos in wt");
-            op.pk_gen_state().expect("is insert operation").clone()
-        }))
-    }
-
-    pub fn get_indexes_evs(&self) -> eyre::Result<(BatchChangeEvent<PrimaryKey>, SecondaryKeys)> {
-        let mut primary = vec![];
-        let mut secondary = SecondaryKeys::default();
-
-        let mut rows = self.info_wt.select_all().execute()?;
-        rows.sort_by(|l, r| l.operation_id.cmp(&r.operation_id));
-        for row in rows {
-            let pos = row.pos;
-            let op = self
-                .ops
-                .get(pos)
-                .expect("pos should be correct as was set while batch build");
-            if let Some(evs) = op.primary_key_events() {
-                primary.extend(evs.iter().cloned())
-            }
-            let secondary_new = op.secondary_key_events();
-            secondary.extend(secondary_new.clone());
-        }
-
-        Ok((primary, secondary))
-    }
-
-    pub fn get_batch_data_op(&self) -> eyre::Result<BatchData> {
-        let mut data = HashMap::new();
-        for link in self.info_wt.iter_links() {
-            let last_op = self
-                .info_wt
-                .select_by_link(link)
-                .order_on(BatchInnerRowFields::OperationId, Order::Desc)
-                .limit(1)
-                .execute()?;
-            let op_row = last_op
-                .into_iter()
-                .next()
-                .expect("if link is in info_wt at least one row exists");
-            let pos = op_row.pos;
-            let op = self
-                .ops
-                .get(pos)
-                .expect("pos should be correct as was set while batch build");
-            if let Some(data_bytes) = op.bytes() {
-                let link = op.link();
-                data.entry(link.page_id)
-                    .and_modify(|v: &mut Vec<_>| v.push((link, data_bytes.to_vec())))
-                    .or_insert(vec![(link, data_bytes.to_vec())]);
-            }
-        }
-
-        Ok(data)
-    }
-}
-
-#[derive(Clone, Debug)]
-pub enum Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
-    Insert(InsertOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>),
-    Update(UpdateOperation<SecondaryKeys>),
-    Delete(DeleteOperation<PrimaryKey, SecondaryKeys>),
-}
-
-impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
-    Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
-{
-    pub fn operation_type(&self) -> OperationType {
-        match &self {
-            Operation::Insert(_) => OperationType::Insert,
-            Operation::Update(_) => OperationType::Update,
-            Operation::Delete(_) => OperationType::Delete,
-        }
-    }
-
-    pub fn operation_id(&self) -> OperationId {
-        match &self {
-            Operation::Insert(insert) => insert.id,
-            Operation::Update(update) => update.id,
-            Operation::Delete(delete) => delete.id,
-        }
-    }
-
-    pub fn link(&self) -> Link {
-        match &self {
-            Operation::Insert(insert) => insert.link,
-            Operation::Update(update) => update.link,
-            Operation::Delete(delete) => delete.link,
-        }
-    }
-
-    pub fn bytes(&self) -> Option<&[u8]> {
-        match &self {
-            Operation::Insert(insert) => Some(&insert.bytes),
-            Operation::Update(update) => Some(&update.bytes),
-            Operation::Delete(_) => None,
-        }
-    }
-
-    pub fn primary_key_events(&self) -> Option<&Vec<ChangeEvent<Pair<PrimaryKey, Link>>>> {
-        match &self {
-            Operation::Insert(insert) => Some(&insert.primary_key_events),
-            Operation::Update(_) => None,
-            Operation::Delete(delete) => Some(&delete.primary_key_events),
-        }
-    }
-
-    pub fn secondary_key_events(&self) -> &SecondaryKeys {
-        match &self {
-            Operation::Insert(insert) => &insert.secondary_keys_events,
-            Operation::Update(update) => &update.secondary_keys_events,
-            Operation::Delete(delete) => &delete.secondary_keys_events,
-        }
-    }
-
-    pub fn pk_gen_state(&self) -> Option<&PrimaryKeyGenState> {
-        match &self {
-            Operation::Insert(insert) => Some(&insert.pk_gen_state),
-            Operation::Update(_) => None,
-            Operation::Delete(_) => None,
-        }
-    }
-}
-
-#[derive(Clone, Debug)]
-pub struct InsertOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
-    pub id: OperationId,
-    pub primary_key_events: Vec<ChangeEvent<Pair<PrimaryKey, Link>>>,
-    pub secondary_keys_events: SecondaryKeys,
-    pub pk_gen_state: PrimaryKeyGenState,
-    pub bytes: Vec<u8>,
-    pub link: Link,
-}
-
-#[derive(Clone, Debug)]
-pub struct UpdateOperation<SecondaryKeys> {
-    pub id: OperationId,
-    pub secondary_keys_events: SecondaryKeys,
-    pub bytes: Vec<u8>,
-    pub link: Link,
-}
-
-#[derive(Clone, Debug)]
-pub struct DeleteOperation<PrimaryKey, SecondaryKeys> {
-    pub id: OperationId,
-    pub primary_key_events: Vec<ChangeEvent<Pair<PrimaryKey, Link>>>,
-    pub secondary_keys_events: SecondaryKeys,
-    pub link: Link,
-}
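The monolithic module deleted above is split into `operation/{batch,operation,util}.rs` below. The behavioral change hiding in the move: `BatchOperation`'s fields become private, construction goes through `new`, and a batch must pass `validate` (which checks event-id continuity against `LastEventIds`) before its events are handed out. A sketch of the new call sequence under the bounds declared in `batch.rs` (the wrapper function is illustrative, not part of this PR):

    async fn collect_and_check<PkGs, Pk, Se, Ai>(
        ops: Vec<Operation<PkGs, Pk, Se>>,
        info_wt: BatchInnerWorkTable,
        last_ids: &LastEventIds<Ai>,
    ) -> eyre::Result<bool>
    where
        PkGs: std::fmt::Debug + Clone,
        Pk: std::fmt::Debug + Clone,
        Se: std::fmt::Debug + Default + Clone + TableSecondaryIndexEventsOps<Ai>,
        Ai: std::fmt::Debug + Clone + Copy + std::hash::Hash + Eq,
    {
        let mut batch = BatchOperation::new(ops, info_wt);
        // Some(evicted_ops): a consistent batch remains and can be applied;
        // None: no applicable batch can be formed yet, retry later.
        Ok(batch.validate(last_ids, 0).await?.is_some())
    }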
diff --git a/src/persistence/operation/batch.rs b/src/persistence/operation/batch.rs
new file mode 100644
index 00000000..cedf0584
--- /dev/null
+++ b/src/persistence/operation/batch.rs
@@ -0,0 +1,384 @@
+use std::collections::{HashMap, HashSet};
+use std::fmt::Debug;
+use std::hash::Hash;
+use std::marker::PhantomData;
+
+use data_bucket::page::PageId;
+use data_bucket::{Link, SizeMeasurable};
+use indexset::cdc::change::ChangeEvent;
+use indexset::core::pair::Pair;
+use worktable_codegen::{worktable, MemStat};
+
+use crate::persistence::space::{BatchChangeEvent, BatchData};
+use crate::persistence::task::{LastEventIds, QueueInnerRow};
+use crate::persistence::OperationType;
+use crate::prelude::*;
+use crate::prelude::{From, Order, SelectQueryExecutor};
+
+worktable! (
+    name: BatchInner,
+    columns: {
+        id: u64 primary_key autoincrement,
+        operation_id: OperationId,
+        page_id: PageId,
+        link: Link,
+        op_type: OperationType,
+        pos: usize,
+    },
+    indexes: {
+        operation_id_idx: operation_id unique,
+        page_id_idx: page_id,
+        link_idx: link,
+        op_type_idx: op_type,
+    },
+    queries: {
+        update: {
+            PosByOpId(pos) by operation_id,
+        },
+        delete: {
+            ByOpId() by operation_id,
+        }
+    }
+);
+
+impl BatchInnerWorkTable {
+    pub fn iter_links(&self) -> impl Iterator<Item = Link> {
+        self.0
+            .indexes
+            .link_idx
+            .iter()
+            .map(|(l, _)| *l)
+            .collect::<Vec<_>>()
+            .into_iter()
+    }
+}
+
+impl From<QueueInnerRow> for BatchInnerRow {
+    fn from(value: QueueInnerRow) -> Self {
+        BatchInnerRow {
+            id: value.id,
+            operation_id: value.operation_id,
+            page_id: value.page_id,
+            link: value.link,
+            op_type: Default::default(),
+            pos: 0,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryEvents, AvailableIndexes> {
+    ops: Vec<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryEvents>>,
+    info_wt: BatchInnerWorkTable,
+    prepared_index_evs: Option<PreparedIndexEvents<PrimaryKey, SecondaryEvents>>,
+    phantom_data: PhantomData<AvailableIndexes>,
+}
+
+#[derive(Debug)]
+pub struct PreparedIndexEvents<PrimaryKey, SecondaryEvents> {
+    primary_evs: Vec<ChangeEvent<Pair<PrimaryKey, Link>>>,
+    secondary_evs: SecondaryEvents,
+}
+
+impl<PrimaryKeyGenState, PrimaryKey, SecondaryEvents, AvailableIndexes>
+    BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryEvents, AvailableIndexes>
+where
+    PrimaryKeyGenState: Debug + Clone,
+    PrimaryKey: Debug + Clone,
+    SecondaryEvents: Debug,
+{
+    pub fn new(
+        ops: Vec<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryEvents>>,
+        info_wt: BatchInnerWorkTable,
+    ) -> Self {
+        Self {
+            ops,
+            info_wt,
+            prepared_index_evs: None,
+            phantom_data: PhantomData,
+        }
+    }
+}
+
+impl<PrimaryKeyGenState, PrimaryKey, SecondaryEvents, AvailableIndexes>
+    BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryEvents, AvailableIndexes>
+where
+    PrimaryKeyGenState: Debug + Clone,
+    PrimaryKey: Debug + Clone,
+    SecondaryEvents: Debug + Default + Clone + TableSecondaryIndexEventsOps<AvailableIndexes>,
+    AvailableIndexes: Debug + Clone + Copy + Hash + Eq,
+{
+    pub fn ops(self) -> Vec<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryEvents>> {
+        self.ops
+    }
+
+    fn remove_operations_from_events(
+        &mut self,
+        invalid_events: PreparedIndexEvents<PrimaryKey, SecondaryEvents>,
+    ) -> HashSet<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryEvents>> {
+        let mut removed_ops = HashSet::new();
+
+        for ev in &invalid_events.primary_evs {
+            if let Some(operation_pos_rev) = self.ops.iter().rev().position(|op| {
+                if let Some(evs) = op.primary_key_events() {
+                    for inner_ev in evs {
+                        if inner_ev.id() == ev.id() {
+                            return true;
+                        }
+                    }
+                    false
+                } else {
+                    false
+                }
+            }) {
+                let op = self.ops.remove(self.ops.len() - (operation_pos_rev + 1));
+                removed_ops.insert(op);
+            }
+        }
+        for (index, id) in invalid_events.secondary_evs.iter_event_ids() {
+            if let Some(operation_pos_rev) = self.ops.iter().rev().position(|op| {
+                let evs = op.secondary_key_events();
+                evs.contains_event(index, id)
+            }) {
+                let op = self.ops.remove(self.ops.len() - (operation_pos_rev + 1));
+                removed_ops.insert(op);
+            };
+            // else it was already removed with primary
+        }
+        for op in &removed_ops {
+            let pk = self
+                .info_wt
+                .select_by_operation_id(op.operation_id())
+                .expect("exists as all should be inserted on prepare step")
+                .id;
+            self.info_wt.delete_without_lock(pk.into()).unwrap();
+            let prepared_evs = self
+                .prepared_index_evs
+                .as_mut()
+                .expect("should be set before 0 iteration");
+            if let Some(primary_evs) = op.primary_key_events() {
+                for ev in primary_evs {
+                    if let Ok(pos) = prepared_evs
+                        .primary_evs
+                        .binary_search_by(|inner_ev| inner_ev.id().cmp(&ev.id()))
+                    {
+                        prepared_evs.primary_evs.remove(pos);
+                    }
+                }
+            }
+            let op_secondary = op.secondary_key_events();
+            prepared_evs.secondary_evs.remove(op_secondary);
+        }
+
+        removed_ops
+    }
+
+    pub fn get_last_event_ids(&self) -> LastEventIds<AvailableIndexes> {
+        let prepared_evs = self
+            .prepared_index_evs
+            .as_ref()
+            .expect("should be set before 0 iteration");
+
+        let primary_id = prepared_evs
+            .primary_evs
+            .last()
+            .map(|ev| ev.id())
+            .unwrap_or_default();
+        let secondary_ids = prepared_evs.secondary_evs.last_evs();
+        let secondary_ids = secondary_ids
+            .into_iter()
+            .map(|(i, v)| (i, v.unwrap_or_default()))
+            .collect();
+        LastEventIds {
+            primary_id,
+            secondary_ids,
+        }
+    }
+
+    pub async fn validate(
+        &mut self,
+        last_ids: &LastEventIds<AvailableIndexes>,
+        attempts: usize,
+    ) -> eyre::Result<Option<Vec<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryEvents>>>> {
+        let mut valid = false;
+
+        self.prepared_index_evs = Some(self.prepare_indexes_evs()?);
+        let mut ops_to_remove = vec![];
+
+        while !valid {
+            let prepared_evs = self
+                .prepared_index_evs
+                .as_mut()
+                .expect("should be set before 0 iteration");
+            let primary_invalid_events = validate_events(&mut prepared_evs.primary_evs);
+            let secondary_invalid_events = prepared_evs.secondary_evs.validate();
+
+            valid = if SecondaryEvents::is_unit() {
+                primary_invalid_events.is_empty()
+            } else {
+                primary_invalid_events.is_empty() && secondary_invalid_events.is_empty()
+            };
+
+            if valid {
+                break;
+            }
+
+            let events_to_remove = PreparedIndexEvents {
+                primary_evs: primary_invalid_events,
+                secondary_evs: secondary_invalid_events,
+            };
+            let ops = self.remove_operations_from_events(events_to_remove);
+            ops_to_remove.extend(ops);
+        }
+
+        {
+            let prepared_evs = self
+                .prepared_index_evs
+                .as_ref()
+                .expect("should be set before 0 iteration");
+            if let Some(id) = prepared_evs.primary_evs.first().map(|ev| ev.id()) {
+                if !id.is_next_for(last_ids.primary_id)
+                    && last_ids.primary_id != IndexChangeEventId::default()
+                {
+                    let mut possibly_valid = false;
+                    if id.inner().overflowing_sub(last_ids.primary_id.inner()).0 == 2 {
+                        // TODO: for split sometimes this happens
+                        let ev = prepared_evs.primary_evs.first().unwrap();
+                        if let ChangeEvent::SplitNode { .. } = ev {
+                            possibly_valid = true
+                        }
+                        if attempts > 8 {
+                            possibly_valid = true
+                        }
+                    }
+
+                    if !possibly_valid {
+                        self.ops.extend(ops_to_remove);
+                        return Ok(None);
+                    }
+                }
+            }
+            let secondary_first = prepared_evs.secondary_evs.first_evs();
+            for (index, id) in secondary_first {
+                let Some(last) = last_ids.secondary_ids.get(&index) else {
+                    continue;
+                };
+                if let Some(id) = id {
+                    if !id.is_next_for(*last) && *last != IndexChangeEventId::default() {
+                        let mut possibly_valid = false;
+                        if id.inner().overflowing_sub(last.inner()).0 == 2 {
+                            // TODO: for split sometimes this happens
+                            possibly_valid = prepared_evs.secondary_evs.is_first_ev_is_split(index);
+                            if attempts > 8 {
+                                possibly_valid = true
+                            }
+                        }
+
+                        if !possibly_valid {
+                            self.ops.extend(ops_to_remove);
+                            return Ok(None);
+                        }
+                    }
+                }
+            }
+        }
+
+        {
+            let prepared_evs = self
+                .prepared_index_evs
+                .as_ref()
+                .expect("should be set before 0 iteration");
+            if prepared_evs.primary_evs.is_empty() && prepared_evs.secondary_evs.is_empty() {
+                self.ops = ops_to_remove;
+                return Ok(None);
+            }
+        }
+
+        for (pos, op) in self.ops.iter().enumerate() {
+            let op_id = op.operation_id();
+            let q = PosByOpIdQuery { pos };
+            self.info_wt.update_pos_by_op_id(q, op_id).await?;
+        }
+
+        Ok(Some(ops_to_remove))
+    }
+
+    fn prepare_indexes_evs(
+        &self,
+    ) -> eyre::Result<PreparedIndexEvents<PrimaryKey, SecondaryEvents>> {
+        let mut primary_evs = vec![];
+        let mut secondary_evs = SecondaryEvents::default();
+
+        for op in &self.ops {
+            if let Some(evs) = op.primary_key_events() {
+                primary_evs.extend(evs.iter().cloned())
+            }
+            let secondary_new = op.secondary_key_events();
+            secondary_evs.extend(secondary_new.clone());
+        }
+
+        // used to make all event ids grow monotonically
+        primary_evs.sort_by_key(|ev1| ev1.id());
+        secondary_evs.sort();
+
+        Ok(PreparedIndexEvents {
+            primary_evs,
+            secondary_evs,
+        })
+    }
+
+    pub fn get_pk_gen_state(&self) -> eyre::Result<Option<PrimaryKeyGenState>> {
+        let row = self
+            .info_wt
+            .select_by_op_type(OperationType::Insert)
+            .order_on(BatchInnerRowFields::OperationId, Order::Desc)
+            .limit(1)
+            .execute()?;
+        Ok(row.into_iter().next().map(|r| {
+            let pos = r.pos;
+            let op = self.ops.get(pos).expect("available as pos in wt");
+            op.pk_gen_state().expect("is insert operation").clone()
+        }))
+    }
+
+    pub fn get_indexes_evs(&self) -> eyre::Result<(BatchChangeEvent<PrimaryKey>, SecondaryEvents)> {
+        if let Some(evs) = &self.prepared_index_evs {
+            Ok((evs.primary_evs.clone(), evs.secondary_evs.clone()))
+        } else {
+            tracing::warn!(
+                "Index events are not validated; this can cause errors while applying the batch"
+            );
+            let evs = self.prepare_indexes_evs()?;
+            Ok((evs.primary_evs.clone(), evs.secondary_evs.clone()))
+        }
+    }
+
+    pub fn get_batch_data_op(&self) -> eyre::Result<BatchData> {
+        let mut data = HashMap::new();
+        for link in self.info_wt.iter_links() {
+            let last_op = self
+                .info_wt
+                .select_by_link(link)
+                .order_on(BatchInnerRowFields::OperationId, Order::Desc)
+                .limit(1)
+                .execute()?;
+            let op_row = last_op
+                .into_iter()
+                .next()
+                .expect("if link is in info_wt at least one row exists");
+            let pos = op_row.pos;
+            let op = self
+                .ops
+                .get(pos)
+                .expect("pos should be correct as was set while batch build");
+            if let Some(data_bytes) = op.bytes() {
+                let link = op.link();
+                data.entry(link.page_id)
+                    .and_modify(|v: &mut Vec<_>| v.push((link, data_bytes.to_vec())))
+                    .or_insert(vec![(link, data_bytes.to_vec())]);
+            }
+        }
+
+        Ok(data)
+    }
+}
diff --git a/src/persistence/operation/mod.rs b/src/persistence/operation/mod.rs
new file mode 100644
index 00000000..cd4e5499
--- /dev/null
+++ b/src/persistence/operation/mod.rs
@@ -0,0 +1,104 @@
+mod batch;
+#[allow(clippy::module_inception)]
+mod operation;
+mod util;
+
+use std::cmp::Ordering;
+use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
+
+use data_bucket::SizeMeasurable;
+use derive_more::Display;
+use rkyv::{Archive, Deserialize, Serialize};
+use uuid::Uuid;
+
+use crate::prelude::From;
+
+pub use batch::{BatchInnerRow, BatchInnerWorkTable, BatchOperation, PosByOpIdQuery};
+pub use operation::{DeleteOperation, InsertOperation, Operation, UpdateOperation};
+pub use util::validate_events;
+
+/// Represents operation's identifier. Is unique within the table bounds.
+#[derive(Archive, Copy, Clone, Deserialize, Debug, Display, From, Serialize)]
+#[rkyv(derive(Debug, PartialOrd, PartialEq, Eq, Ord))]
+pub enum OperationId {
+    #[from]
+    Single(Uuid),
+    Multi(Uuid),
+}
+
+impl OperationId {
+    fn get_id(&self) -> Uuid {
+        match self {
+            OperationId::Single(id) => *id,
+            OperationId::Multi(id) => *id,
+        }
+    }
+}
+
+impl Hash for OperationId {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        Hash::hash(&self.get_id(), state)
+    }
+}
+
+impl PartialEq for OperationId {
+    fn eq(&self, other: &Self) -> bool {
+        self.get_id().eq(&other.get_id())
+    }
+}
+
+impl Eq for OperationId {}
+
+impl PartialOrd for OperationId {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for OperationId {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.get_id().cmp(&other.get_id())
+    }
+}
+
+impl SizeMeasurable for OperationId {
+    fn aligned_size(&self) -> usize {
+        Uuid::default().aligned_size()
+    }
+}
+
+impl Default for OperationId {
+    fn default() -> Self {
+        OperationId::Single(Uuid::now_v7())
+    }
+}
+
+#[derive(
+    Archive,
+    Clone,
+    Copy,
+    Debug,
+    Default,
+    Deserialize,
+    Serialize,
+    Eq,
+    Ord,
+    PartialEq,
+    PartialOrd,
+    Hash,
+)]
+#[rkyv(compare(PartialEq), derive(Debug))]
+#[repr(u8)]
+pub enum OperationType {
+    #[default]
+    Insert,
+    Update,
+    Delete,
+}
+
+impl SizeMeasurable for OperationType {
+    fn aligned_size(&self) -> usize {
+        u8::default().aligned_size()
+    }
+}
diff --git a/src/persistence/operation/operation.rs b/src/persistence/operation/operation.rs
new file mode 100644
index 00000000..dd6b92a4
--- /dev/null
+++ b/src/persistence/operation/operation.rs
@@ -0,0 +1,122 @@
+use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
+
+use data_bucket::Link;
+use indexset::cdc::change::ChangeEvent;
+use indexset::core::pair::Pair;
+
+use crate::persistence::{OperationId, OperationType};
+
+#[derive(Clone, Debug)]
+pub enum Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
+    Insert(InsertOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>),
+    Update(UpdateOperation<SecondaryKeys>),
+    Delete(DeleteOperation<PrimaryKey, SecondaryKeys>),
+}
+
+impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> Hash
+    for Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
+{
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        Hash::hash(&self.operation_id(), state)
+    }
+}
+
+impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> PartialEq
+    for Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.operation_id().eq(&other.operation_id())
+    }
+}
+
+impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> Eq
+    for Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
+{
+}
+
+impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
+    Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
+{
+    pub fn operation_type(&self) -> OperationType {
+        match &self {
+            Operation::Insert(_) => OperationType::Insert,
+            Operation::Update(_) => OperationType::Update,
+            Operation::Delete(_) => OperationType::Delete,
+        }
+    }
+
+    pub fn operation_id(&self) -> OperationId {
+        match &self {
+            Operation::Insert(insert) => insert.id,
+            Operation::Update(update) => update.id,
+            Operation::Delete(delete) => delete.id,
+        }
+    }
+
+    pub fn link(&self) -> Link {
+        match &self {
+            Operation::Insert(insert) => insert.link,
+            Operation::Update(update) => update.link,
+            Operation::Delete(delete) => delete.link,
+        }
+    }
+
+    pub fn bytes(&self) -> Option<&[u8]> {
+        match &self {
+            Operation::Insert(insert) => Some(&insert.bytes),
+            Operation::Update(update) => Some(&update.bytes),
+            Operation::Delete(_) => None,
+        }
+    }
+
+    pub fn primary_key_events(&self) -> Option<&Vec<ChangeEvent<Pair<PrimaryKey, Link>>>> {
+        match &self {
+            Operation::Insert(insert) => Some(&insert.primary_key_events),
+            Operation::Update(_) => None,
+            Operation::Delete(delete) => Some(&delete.primary_key_events),
+        }
+    }
+
+    pub fn secondary_key_events(&self) -> &SecondaryKeys {
+        match &self {
+            Operation::Insert(insert) => &insert.secondary_keys_events,
+            Operation::Update(update) => &update.secondary_keys_events,
+            Operation::Delete(delete) => &delete.secondary_keys_events,
+        }
+    }
+
+    pub fn pk_gen_state(&self) -> Option<&PrimaryKeyGenState> {
+        match &self {
+            Operation::Insert(insert) => Some(&insert.pk_gen_state),
+            Operation::Update(_) => None,
+            Operation::Delete(_) => None,
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct InsertOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
+    pub id: OperationId,
+    pub primary_key_events: Vec<ChangeEvent<Pair<PrimaryKey, Link>>>,
+    pub secondary_keys_events: SecondaryKeys,
+    pub pk_gen_state: PrimaryKeyGenState,
+    pub bytes: Vec<u8>,
+    pub link: Link,
+}
+
+#[derive(Clone, Debug)]
+pub struct UpdateOperation<SecondaryKeys> {
+    pub id: OperationId,
+    pub secondary_keys_events: SecondaryKeys,
+    pub bytes: Vec<u8>,
+    pub link: Link,
+}
+
+#[derive(Clone, Debug)]
+pub struct DeleteOperation<PrimaryKey, SecondaryKeys> {
+    pub id: OperationId,
+    pub primary_key_events: Vec<ChangeEvent<Pair<PrimaryKey, Link>>>,
+    pub secondary_keys_events: SecondaryKeys,
+    pub link: Link,
+}
diff --git a/src/persistence/operation/util.rs b/src/persistence/operation/util.rs
new file mode 100644
index 00000000..46356327
--- /dev/null
+++ b/src/persistence/operation/util.rs
@@ -0,0 +1,60 @@
+use data_bucket::Link;
+use indexset::cdc::change::{self, ChangeEvent};
+use indexset::core::pair::Pair;
+use std::fmt::Debug;
+
+pub const MAX_CHECK_DEPTH: usize = 30;
+
+pub fn validate_events<T>(
+    evs: &mut Vec<ChangeEvent<Pair<T, Link>>>,
+) -> Vec<ChangeEvent<Pair<T, Link>>>
+where
+    T: Debug,
+{
+    let mut removed_events = vec![];
+    let mut finish_condition = false;
+
+    while !finish_condition {
+        let (iteration_events, error_pos) = validate_events_iteration(evs);
+        if iteration_events.is_empty() {
+            finish_condition = true;
+        } else {
+            let drain_pos = evs.len() - error_pos;
+            removed_events.extend(evs.drain(drain_pos..));
+        }
+    }
+
+    removed_events.sort_by_key(|ev2| std::cmp::Reverse(ev2.id()));
+
+    removed_events
+}
+
+fn validate_events_iteration<T>(
+    evs: &[ChangeEvent<Pair<T, Link>>],
+) -> (Vec<change::Id>, usize) {
+    let Some(mut last_ev_id) = evs.last().map(|ev| ev.id()) else {
+        return (vec![], 0);
+    };
+    let mut evs_before_error = vec![last_ev_id];
+    let mut rev_evs_iter = evs.iter().rev().skip(1);
+    let mut error_flag = false;
+    let mut check_depth = 1;
+
+    while !error_flag && check_depth < MAX_CHECK_DEPTH {
+        if let Some(next_ev) = rev_evs_iter.next().map(|ev| ev.id()) {
+            if last_ev_id.is_next_for(next_ev) || last_ev_id == next_ev {
+                check_depth += 1;
+                last_ev_id = next_ev;
+                evs_before_error.push(last_ev_id);
+            } else {
+                error_flag = true
+            }
+        } else {
+            break;
+        }
+    }
+
+    if error_flag {
+        (evs_before_error, check_depth)
+    } else {
+        (vec![], 0)
+    }
+}
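A worked example of the backward scan in `validate_events`, which is also what the generated per-index `validate` methods call via the prelude export: suppose the tail of `evs` carries event ids [1, 2, 3, 7, 8].

    // Pass 1: 8 follows 7 (is_next_for), but 7 does not follow 3, so the scan
    //         stops with check_depth == 2 and the trailing run [7, 8] is
    //         drained into removed_events.
    // Pass 2: the remaining tail [1, 2, 3] walks back without a gap (the walk
    //         is also capped at MAX_CHECK_DEPTH == 30), so the loop terminates.
    // Result: validate_events returns the disconnected tail sorted by
    //         descending id ([8, 7]) and keeps the contiguous prefix in place
    //         for this batch.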
                split_index,
            } => self.process_split_node(node_id, split_index).await,
@@ -423,17 +428,14 @@ where
        events: BatchChangeEvent<T>,
    ) -> eyre::Result<()> {
        let mut pages: HashMap<PageId, IndexPage<T>> = HashMap::new();
-        for ev in events {
+        for ev in &events {
            match &ev {
                ChangeEvent::InsertAt { max_value, .. }
                | ChangeEvent::RemoveAt { max_value, .. } => {
                    let page_id = &(max_value.key.clone(), max_value.value);
-                    // println!("{:?}", page_id);
-                    // println!("{:?}", self.table_of_contents.iter().collect::<Vec<_>>());
-                    let page_index = self
-                        .table_of_contents
-                        .get(page_id)
-                        .expect("page should be available in table of contents");
+                    let Some(page_index) = self.table_of_contents.get(page_id) else {
+                        panic!("page should be available in table of contents")
+                    };
                    let page = pages.get_mut(&page_index);
                    let page_to_update = if let Some(page) = page {
                        page
@@ -464,7 +466,10 @@ where
                        );
                    }
                }
-                ChangeEvent::CreateNode { max_value } => {
+                ChangeEvent::CreateNode {
+                    event_id: _,
+                    max_value,
+                } => {
                    let page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() {
                        id
                    } else {
@@ -476,6 +481,7 @@ where
                    let size = get_index_page_size_from_data_length::<T>(INNER_PAGE_SIZE as usize);
                    let mut page = IndexPage::new(max_value.clone().into(), size);
                    let ev = ChangeEvent::InsertAt {
+                        event_id: 0.into(),
                        max_value: max_value.clone(),
                        value: max_value.clone(),
                        index: 0,
@@ -490,19 +496,22 @@ where
                    self.table_of_contents
                        .insert((max_value.key.clone(), max_value.value), page_id)
                }
-                ChangeEvent::RemoveNode { max_value } => {
+                ChangeEvent::RemoveNode {
+                    event_id: _,
+                    max_value,
+                } => {
                    self.table_of_contents
                        .remove(&(max_value.key.clone(), max_value.value));
                }
                ChangeEvent::SplitNode {
+                    event_id: _,
                    max_value,
                    split_index,
                } => {
                    let page_id = &(max_value.key.clone(), max_value.value);
-                    let page_index = self
-                        .table_of_contents
-                        .get(page_id)
-                        .expect("page should be available in table of contents");
+                    let Some(page_index) = self.table_of_contents.get(page_id) else {
+                        panic!("page should be available in table of contents")
+                    };
                    let page = pages.get_mut(&page_index);
                    let page_to_update = if let Some(page) = page {
                        page
diff --git a/src/persistence/space/index/unsized_.rs b/src/persistence/space/index/unsized_.rs
index 4cf8e7d3..2f7d5513 100644
--- a/src/persistence/space/index/unsized_.rs
+++ b/src/persistence/space/index/unsized_.rs
@@ -356,22 +356,27 @@ where
    ) -> eyre::Result<()> {
        match event {
            ChangeEvent::InsertAt {
+                event_id: _,
                max_value: node_id,
                value,
                index,
            } => self.process_insert_at(node_id, value, index).await,
            ChangeEvent::RemoveAt {
+                event_id: _,
                max_value: node_id,
                value,
                index,
            } => self.process_remove_at(node_id, value, index).await,
-            ChangeEvent::CreateNode { max_value: node_id } => {
-                self.process_create_node(node_id).await
-            }
-            ChangeEvent::RemoveNode { max_value: node_id } => {
-                self.process_remove_node(node_id).await
-            }
+            ChangeEvent::CreateNode {
+                event_id: _,
+                max_value: node_id,
+            } => self.process_create_node(node_id).await,
+            ChangeEvent::RemoveNode {
+                event_id: _,
+                max_value: node_id,
+            } => self.process_remove_node(node_id).await,
            ChangeEvent::SplitNode {
+                event_id: _,
                max_value: node_id,
                split_index,
            } => self.process_split_node(node_id, split_index).await,
@@ -424,7 +429,10 @@ where
                        );
                    }
                }
-                ChangeEvent::CreateNode { max_value } => {
+                ChangeEvent::CreateNode {
+                    event_id: _,
+                    max_value,
+                } => {
                    let page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() {
                        id
                    } else {
@@ -444,11 +452,15 @@ where
                    self.table_of_contents
                        .insert((max_value.key.clone(), max_value.value), page_id)
                }
-                ChangeEvent::RemoveNode { max_value } => {
+                ChangeEvent::RemoveNode {
+                    event_id: _,
+                    max_value,
+                } => {
                    self.table_of_contents
                        .remove(&(max_value.key.clone(), max_value.value));
                }
                ChangeEvent::SplitNode {
+                    event_id: _,
                    max_value,
                    split_index,
                } => {
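Both batch paths above share one pattern: a page touched by several events is resolved through the table of contents, loaded once into a local `HashMap` keyed by its page index, mutated in memory, and flushed a single time afterwards. A rough sketch of that buffering idea with stand-in types (`Page`, `apply_batch`, and the tuple-shaped events are illustrative, not the crate's `IndexPage` API):

```rust
use std::collections::HashMap;

type PageId = u32;

#[derive(Clone, Debug)]
struct Page {
    entries: Vec<i64>,
}

fn apply_batch(
    toc: &HashMap<i64, PageId>,      // max_value -> owning page, like the table of contents
    storage: &HashMap<PageId, Page>, // persisted pages
    events: &[(i64, i64)],           // (node max_value, inserted value)
) -> HashMap<PageId, Page> {
    let mut pages: HashMap<PageId, Page> = HashMap::new();
    for (max_value, value) in events {
        // Resolve the owning page; a missing entry is a hard error in the
        // batch path above as well.
        let Some(page_id) = toc.get(max_value) else {
            panic!("page should be available in table of contents")
        };
        // First event for a page clones it from storage; later events reuse
        // the same in-memory copy instead of re-reading the page.
        let page = pages
            .entry(*page_id)
            .or_insert_with(|| storage[page_id].clone());
        page.entries.push(*value);
    }
    pages // caller flushes each touched page exactly once
}

fn main() {
    let toc = HashMap::from([(10_i64, 1_u32)]);
    let storage = HashMap::from([(1_u32, Page { entries: vec![3] })]);
    let flushed = apply_batch(&toc, &storage, &[(10, 7), (10, 9)]);
    assert_eq!(flushed[&1].entries, vec![3, 7, 9]);
}
```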
diff --git a/src/persistence/task.rs b/src/persistence/task.rs
index 46304f95..d7bead44 100644
--- a/src/persistence/task.rs
+++ b/src/persistence/task.rs
@@ -1,9 +1,12 @@
-use data_bucket::page::PageId;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
+use std::hash::Hash;
+use std::marker::PhantomData;
 use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
+
+use data_bucket::page::PageId;
 use tokio::sync::Notify;
 use worktable_codegen::worktable;
@@ -32,22 +35,65 @@ worktable! (
 
 const MAX_PAGE_AMOUNT: usize = 16;
 
-pub struct QueueAnalyzer<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
+pub struct QueueAnalyzer<PrimaryKeyGenState, PrimaryKey, SecondaryKeys, AvailableIndexes> {
     operations: OptimizedVec<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>,
     queue_inner_wt: Arc<QueueInnerWorkTable>,
+    last_events_ids: LastEventIds<AvailableIndexes>,
+    last_invalid_batch_size: usize,
+    page_limit: usize,
+    attempts: usize,
 }
 
-impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
-    QueueAnalyzer<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
+#[derive(Debug)]
+pub struct LastEventIds<AvailableIndexes> {
+    pub primary_id: IndexChangeEventId,
+    pub secondary_ids: HashMap<AvailableIndexes, IndexChangeEventId>,
+}
+
+impl<AvailableIndexes> Default for LastEventIds<AvailableIndexes>
+where
+    AvailableIndexes: Eq + Hash,
+{
+    fn default() -> Self {
+        Self {
+            primary_id: Default::default(),
+            secondary_ids: HashMap::new(),
+        }
+    }
+}
+
+impl<AvailableIndexes> LastEventIds<AvailableIndexes>
+where
+    AvailableIndexes: Debug + Hash + Eq,
+{
+    pub fn merge(&mut self, another: Self) {
+        if another.primary_id != IndexChangeEventId::default() {
+            self.primary_id = another.primary_id
+        }
+        for (index, id) in another.secondary_ids {
+            if id != IndexChangeEventId::default() {
+                self.secondary_ids.insert(index, id);
+            }
+        }
+    }
+}
+
+impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys, AvailableIndexes>
+    QueueAnalyzer<PrimaryKeyGenState, PrimaryKey, SecondaryKeys, AvailableIndexes>
 where
     PrimaryKeyGenState: Debug,
     PrimaryKey: Debug,
     SecondaryKeys: Debug,
+    AvailableIndexes: Debug + Copy + Clone + Hash + Eq,
 {
     pub fn new(queue_inner_wt: Arc<QueueInnerWorkTable>) -> Self {
         Self {
             operations: OptimizedVec::with_capacity(256),
             queue_inner_wt,
+            last_events_ids: Default::default(),
+            last_invalid_batch_size: 0,
+            page_limit: MAX_PAGE_AMOUNT,
+            attempts: 0,
         }
     }
@@ -92,18 +138,20 @@ where
     pub async fn collect_batch_from_op_id(
         &mut self,
         op_id: OperationId,
-    ) -> eyre::Result<BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>
+    ) -> eyre::Result<
+        Option<BatchOperation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys, AvailableIndexes>>,
+    >
     where
         PrimaryKeyGenState: Clone,
         PrimaryKey: Clone,
-        SecondaryKeys: Clone,
+        SecondaryKeys: Clone + Default + TableSecondaryIndexEventsOps<AvailableIndexes>,
     {
         let mut ops_set = HashSet::new();
         let mut used_page_ids = HashSet::new();
         let mut next_op_id = op_id;
         let mut no_more_ops = false;
 
-        while used_page_ids.len() < MAX_PAGE_AMOUNT && !no_more_ops {
+        while used_page_ids.len() < self.page_limit && !no_more_ops {
             let ops_rows = self
                 .queue_inner_wt
                 .select_by_operation_id(next_op_id)
@@ -216,7 +264,29 @@ where
             info_wt.update_pos_by_op_id(q, op_id).await?;
         }
 
-        Ok(BatchOperation { ops, info_wt })
+        let mut op = BatchOperation::new(ops, info_wt);
+        let invalid_for_this_batch_ops = op.validate(&self.last_events_ids, self.attempts).await?;
+        if let Some(invalid_for_this_batch_ops) = invalid_for_this_batch_ops {
+            self.extend_from_iter(invalid_for_this_batch_ops.into_iter())?;
+            let last_ids = op.get_last_event_ids();
+            self.last_events_ids.merge(last_ids);
+            self.last_invalid_batch_size = 0;
+            self.page_limit = MAX_PAGE_AMOUNT;
+            self.attempts = 0;
+
+            Ok(Some(op))
+        } else {
+            // can't collect batch for now
+            let ops = op.ops();
+            self.attempts += 1;
+            if self.last_invalid_batch_size == ops.len() {
+                self.page_limit += 8;
+            } else {
+                self.last_invalid_batch_size = ops.len();
+            }
+            self.extend_from_iter(ops.into_iter())?;
+            Ok(None)
+        }
     }
 
     pub fn len(&self) -> usize {
@@ -287,17 +357,18 @@ impl
 }
 
 #[derive(Debug)]
-pub struct PersistenceTask<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
+pub struct PersistenceTask<PrimaryKeyGenState, PrimaryKey, SecondaryKeys, AvailableIndexes> {
     #[allow(dead_code)]
     engine_task_handle: tokio::task::AbortHandle,
     queue: Arc<Queue<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>>,
     analyzer_inner_wt: Arc<QueueInnerWorkTable>,
     analyzer_in_progress: Arc<AtomicBool>,
     progress_notify: Arc<Notify>,
+    phantom_data: PhantomData<AvailableIndexes>,
 }
-impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
-    PersistenceTask<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
+impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys, AvailableIndexes>
+    PersistenceTask<PrimaryKeyGenState, PrimaryKey, SecondaryKeys, AvailableIndexes>
 {
     pub fn apply_operation(&self, op: Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>) {
         self.queue.push(op);
     }
@@ -305,10 +376,19 @@ impl
 
     pub fn run_engine<E>(mut engine: E) -> Self
     where
-        E: PersistenceEngineOps<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> + Send + 'static,
-        SecondaryKeys: Clone + Debug + Send + Sync + 'static,
+        E: PersistenceEngineOps<PrimaryKeyGenState, PrimaryKey, SecondaryKeys, AvailableIndexes>
+            + Send
+            + 'static,
+        SecondaryKeys: Clone
+            + Debug
+            + Default
+            + TableSecondaryIndexEventsOps<AvailableIndexes>
+            + Send
+            + Sync
+            + 'static,
         PrimaryKeyGenState: Clone + Debug + Send + Sync + 'static,
         PrimaryKey: Clone + Debug + Send + Sync + 'static,
+        AvailableIndexes: Copy + Clone + Debug + Hash + Eq + Send + Sync + 'static,
     {
         let queue = Arc::new(Queue::new());
         let progress_notify = Arc::new(Notify::new());
@@ -346,8 +426,7 @@ impl
                     let batch_op = analyzer.collect_batch_from_op_id(op_id).await;
                     if let Err(e) = batch_op {
                         tracing::warn!("Error collecting batch operation: {}", e);
-                    } else {
-                        let batch_op = batch_op.unwrap();
+                    } else if let Some(batch_op) = batch_op.unwrap() {
                         let res = engine.apply_batch_operation(batch_op).await;
                         if let Err(e) = res {
                             tracing::warn!(
@@ -355,6 +434,8 @@ impl
                                 e
                             );
                         }
+                    } else {
+                        tokio::time::sleep(Duration::from_millis(500)).await;
                     }
                 }
             }
@@ -366,6 +447,7 @@ impl
             analyzer_inner_wt,
             analyzer_in_progress,
             progress_notify,
+            phantom_data: PhantomData,
         }
     }
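The new `page_limit`/`attempts` fields implement a simple widening heuristic: when validation rejects a batch of the same size twice in a row, the analyzer assumes the missing events live on pages beyond its current window and raises the limit by 8; a successfully validated batch resets everything back to `MAX_PAGE_AMOUNT`. The same logic isolated into a standalone sketch (the `Backoff` struct is illustrative, not the crate's type):

```rust
const MAX_PAGE_AMOUNT: usize = 16;

struct Backoff {
    page_limit: usize,
    last_invalid_batch_size: usize,
    attempts: usize,
}

impl Backoff {
    // Mirrors the Ok(Some(op)) branch: reset the window after progress.
    fn on_valid_batch(&mut self) {
        self.page_limit = MAX_PAGE_AMOUNT;
        self.last_invalid_batch_size = 0;
        self.attempts = 0;
    }

    // Mirrors the Ok(None) branch: a repeat of the same batch size means
    // more pages are needed before the batch can validate.
    fn on_invalid_batch(&mut self, batch_len: usize) {
        self.attempts += 1;
        if self.last_invalid_batch_size == batch_len {
            self.page_limit += 8;
        } else {
            self.last_invalid_batch_size = batch_len;
        }
    }
}

fn main() {
    let mut b = Backoff { page_limit: MAX_PAGE_AMOUNT, last_invalid_batch_size: 0, attempts: 0 };
    b.on_invalid_batch(100); // size recorded, limit unchanged
    b.on_invalid_batch(100); // stuck at the same size: widen the window
    assert_eq!(b.page_limit, MAX_PAGE_AMOUNT + 8);
    b.on_valid_batch();
    assert_eq!(b.page_limit, MAX_PAGE_AMOUNT);
}
```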
diff --git a/src/table/mod.rs b/src/table/mod.rs
index 3a614d54..bcbe82d7 100644
--- a/src/table/mod.rs
+++ b/src/table/mod.rs
@@ -126,7 +126,7 @@ impl<
 > where
     Row: TableRow<PrimaryKey>,
-    PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash,
+    PrimaryKey: Debug + Clone + Ord + Send + TablePrimaryKey + std::hash::Hash,
     PkNodeType: NodeLike<Pair<PrimaryKey, Link>> + Send + 'static,
     Row: StorableRow,
     <Row as StorableRow>::WrappedRow: RowWrapper<Row>,
diff --git a/src/table/system_info.rs b/src/table/system_info.rs
index 90acfd91..ab43b9b7 100644
--- a/src/table/system_info.rs
+++ b/src/table/system_info.rs
@@ -2,11 +2,11 @@ use data_bucket::Link;
 use indexset::core::node::NodeLike;
 use indexset::core::pair::Pair;
 use prettytable::{format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR, row, Table};
-use std::fmt::{self, Display, Formatter};
+use std::fmt::{self, Debug, Display, Formatter};
 
 use crate::in_memory::{RowWrapper, StorableRow};
 use crate::mem_stat::MemStat;
-use crate::{TableSecondaryIndex, WorkTable};
+use crate::{TableSecondaryIndexInfo, WorkTable};
 
 #[derive(Debug)]
 pub struct SystemInfo {
@@ -50,7 +50,7 @@ impl<
         PrimaryKey,
         AvailableTypes,
         AvailableIndexes,
-        SecondaryIndexes: MemStat + TableSecondaryIndex,
+        SecondaryIndexes,
         LockType,
         PkGen,
         NodeType,
@@ -68,10 +68,11 @@ impl<
         DATA_LENGTH,
     > where
-    PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash,
+    PrimaryKey: Debug + Clone + Ord + Send + 'static + std::hash::Hash,
     Row: StorableRow,
     <Row as StorableRow>::WrappedRow: RowWrapper<Row>,
     NodeType: NodeLike<Pair<PrimaryKey, Link>> + Send + 'static,
+    SecondaryIndexes: MemStat + TableSecondaryIndexInfo,
 {
     pub fn system_info(&self) -> SystemInfo {
         let page_count = self.data.get_page_count();
diff --git a/tests/persistence/concurrent/mod.rs b/tests/persistence/concurrent/mod.rs
new file mode 100644
index 00000000..73ce7101
--- /dev/null
+++ b/tests/persistence/concurrent/mod.rs
@@ -0,0 +1,120 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use tokio::task;
+use worktable::prelude::*;
+use worktable::worktable;
+
+use crate::remove_dir_if_exists;
+
+worktable! (
+    name: TestConcurrent,
+    persist: true,
+    columns: {
+        id: u64 primary_key autoincrement,
+        another: u64,
+        value: u64,
+
+        field_0: f64,
+        field_1: f64,
+        field_2: f64,
+        field_3: f64,
+        field_4: f64,
+        field_5: f64,
+        field_6: f64,
+        field_7: f64,
+        field_8: f64,
+        field_9: f64,
+        field_10: f64,
+        field_11: f64,
+        field_12: f64,
+        field_13: f64,
+        field_14: f64,
+        field_15: f64,
+        field_16: f64,
+        field_17: f64,
+    },
+    indexes: {
+        another_idx: another,
+        value_idx: value unique,
+    },
+    queries: {
+        update: {
+            AnotherById(another) by id,
+        },
+        delete: {
+            ByAnother() by another,
+        }
+    },
+    config: {
+        row_derives: Default,
+    }
+);
+
+#[test]
+fn test_concurrent() {
+    let config = PersistenceConfig::new("tests/data/concurrent/test", "tests/data/concurrent/test");
+
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(2)
+        .enable_io()
+        .enable_time()
+        .build()
+        .unwrap();
+
+    runtime.block_on(async {
+        remove_dir_if_exists("tests/data/concurrent/test".to_string()).await;
+        {
+            let table = Arc::new(
+                TestConcurrentWorkTable::load_from_file(config.clone())
+                    .await
+                    .unwrap(),
+            );
+
+            let total: u64 = 5_000;
+            let tasks = 8;
+            let chunk = total / tasks;
+
+            let mut handles = Vec::with_capacity(tasks as usize);
+            for t in 0..tasks {
+                let start_id = t * chunk;
+                let end_id = if t == tasks - 1 {
+                    total
+                } else {
+                    (t + 1) * chunk
+                };
+                let task_table = table.clone();
+
+                handles.push(task::spawn(async move {
+                    for value in start_id..end_id {
+                        task_table
+                            .insert(TestConcurrentRow {
+                                id: task_table.get_next_pk().into(),
+                                another: value % 1000,
+                                value,
+                                ..Default::default()
+                            })
+                            .unwrap();
+
+                        tokio::time::sleep(Duration::from_millis(50)).await;
+                    }
+                }))
+            }
+
+            // Await all tasks
+            for h in handles {
+                let _ = h.await;
+            }
+
+            table.wait_for_ops().await;
+        }
+        {
+            let table = Arc::new(
+                TestConcurrentWorkTable::load_from_file(config.clone())
+                    .await
+                    .unwrap(),
+            );
+            assert_eq!(table.count(), 5_000)
+        }
+    })
+}
diff --git a/tests/persistence/mod.rs b/tests/persistence/mod.rs
index bae28abd..dc2ebc4d 100644
--- a/tests/persistence/mod.rs
+++ b/tests/persistence/mod.rs
@@ -1,6 +1,7 @@
 use worktable::prelude::*;
 use worktable::worktable;
 
+mod concurrent;
 mod index_page;
 mod read;
 mod space_index;
diff --git a/tests/persistence/space_index/unsized_write.rs b/tests/persistence/space_index/unsized_write.rs
index adb38ca1..9158f3dd 100644
--- a/tests/persistence/space_index/unsized_write.rs
+++ b/tests/persistence/space_index/unsized_write.rs
@@ -26,6 +26,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::CreateNode {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: "Something from someone".to_string(),
                     value: Link {
@@ -65,6 +66,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::CreateNode {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: "Someone from somewhere".to_string(),
                     value: Link {
@@ -104,6 +106,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::RemoveNode {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: "Something from someone".to_string(),
                     value: Link {
@@ -143,6 +146,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::InsertAt {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: "Something from someone".to_string(),
                     value: Link {
@@ -191,6 +195,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::InsertAt {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: "Something from someone".to_string(),
                     value: Link {
@@ -215,6 +220,7 @@ mod run_first {
         for i in (1..100).rev() {
             space_index
                 .process_change_event(ChangeEvent::InsertAt {
+                    event_id: 0.into(),
                     max_value: Pair {
                         key: "Something from someone _100".to_string(),
                         value: Link {
@@ -264,6 +270,7 @@ async fn test_space_index_process_remove_at() {
 
     space_index
         .process_change_event(ChangeEvent::RemoveAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: "Something from someone".to_string(),
                 value: Link {
@@ -312,6 +319,7 @@ async fn test_space_index_process_remove_at_node_id() {
 
     space_index
         .process_change_event(ChangeEvent::RemoveAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: "Something from someone".to_string(),
                 value: Link {
@@ -360,6 +368,7 @@ async fn test_space_index_process_insert_at_with_node_id_update() {
 
     space_index
         .process_change_event(ChangeEvent::InsertAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: "Something from someone".to_string(),
                 value: Link {
@@ -409,6 +418,7 @@ async fn test_space_index_process_insert_at_removed_place() {
 
     space_index
         .process_change_event(ChangeEvent::InsertAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: "Something from someone".to_string(),
                 value: Link {
@@ -431,6 +441,7 @@ async fn test_space_index_process_insert_at_removed_place() {
         .unwrap();
     space_index
         .process_change_event(ChangeEvent::RemoveAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: "Something from someone 1".to_string(),
                 value: Link {
@@ -453,6 +464,7 @@ async fn test_space_index_process_insert_at_removed_place() {
         .unwrap();
     space_index
         .process_change_event(ChangeEvent::InsertAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: "Something from someone 1".to_string(),
                 value: Link {
@@ -502,6 +514,7 @@ async fn test_space_index_process_create_node_after_remove() {
 
     space_index
         .process_change_event(ChangeEvent::CreateNode {
+            event_id: 0.into(),
             max_value: Pair {
                 key: "Something else".to_string(),
                 value: Link {
@@ -540,6 +553,7 @@ async fn test_space_index_process_split_node() {
 
     space_index
         .process_change_event(ChangeEvent::SplitNode {
+            event_id: 0.into(),
             max_value: Pair {
                 key: "Something from someone _100".to_string(),
                 value: Link {
diff --git a/tests/persistence/space_index/write.rs b/tests/persistence/space_index/write.rs
index fcc03cd7..9a30b6cf 100644
--- a/tests/persistence/space_index/write.rs
+++ b/tests/persistence/space_index/write.rs
@@ -24,6 +24,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::CreateNode {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: 5,
                     value: Link {
@@ -63,6 +64,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::CreateNode {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: 15,
                     value: Link {
@@ -99,6 +101,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::InsertAt {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: 5,
                     value: Link {
@@ -147,6 +150,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::InsertAt {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: 5,
                     value: Link {
@@ -171,6 +175,7 @@ mod run_first {
         for i in (6..909).rev() {
             space_index
                 .process_change_event(ChangeEvent::InsertAt {
+                    event_id: 0.into(),
                     max_value: Pair {
                         key: 1000,
                         value: Link {
@@ -218,6 +223,7 @@ mod run_first {
 
         space_index
             .process_change_event(ChangeEvent::RemoveNode {
+                event_id: 0.into(),
                 max_value: Pair {
                     key: 5,
                     value: Link {
@@ -258,6 +264,7 @@ async fn test_space_index_process_insert_at_with_node_id_update() {
 
     space_index
         .process_change_event(ChangeEvent::InsertAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: 5,
                 value: Link {
@@ -303,6 +310,7 @@ async fn test_space_index_process_remove_at() {
 
     space_index
         .process_change_event(ChangeEvent::RemoveAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: 5,
                 value: Link {
@@ -349,6 +357,7 @@ async fn test_space_index_process_remove_at_node_id() {
 
     space_index
         .process_change_event(ChangeEvent::RemoveAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: 5,
                 value: Link {
@@ -397,6 +406,7 @@ async fn test_space_index_process_insert_at_removed_place() {
 
     space_index
         .process_change_event(ChangeEvent::InsertAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: 5,
                 value: Link {
@@ -419,6 +429,7 @@ async fn test_space_index_process_insert_at_removed_place() {
         .unwrap();
     space_index
         .process_change_event(ChangeEvent::RemoveAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: 7,
                 value: Link {
@@ -441,6 +452,7 @@ async fn test_space_index_process_insert_at_removed_place() {
         .unwrap();
     space_index
         .process_change_event(ChangeEvent::InsertAt {
+            event_id: 0.into(),
             max_value: Pair {
                 key: 7,
                 value: Link {
@@ -489,6 +501,7 @@ async fn test_space_index_process_create_node_after_remove() {
 
     space_index
         .process_change_event(ChangeEvent::CreateNode {
+            event_id: 0.into(),
             max_value: Pair {
                 key: 10,
                 value: Link {
@@ -525,6 +538,7 @@ async fn test_space_index_process_split_node() {
 
     space_index
         .process_change_event(ChangeEvent::SplitNode {
+            event_id: 0.into(),
             max_value: Pair {
                 key: 1000,
                 value: Link {