Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur

[package]
name = "worktable"
version = "0.5.6"
version = "0.6.1"
edition = "2024"
authors = ["Handy-caT"]
license = "MIT"
Expand All @@ -22,12 +22,12 @@ tokio = { version = "1", features = ["full"] }
tracing = "0.1"
rkyv = { version = "0.8.9", features = ["uuid-1"] }
lockfree = { version = "0.5.1" }
worktable_codegen = { path = "codegen", version = "0.5.5" }
worktable_codegen = { path = "codegen", version = "0.6.0" }
futures = "0.3.30"
uuid = { version = "1.10.0", features = ["v4"] }
#data_bucket = "0.2.3"
data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" }
# data_bucket = { path = "../DataBucket", version = "0.2.2" }
uuid = { version = "1.10.0", features = ["v4", "v7"] }
data_bucket = "0.2.4"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" }
# data_bucket = { path = "../DataBucket", version = "0.2.3" }
performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true }
performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true }
indexset = { version = "0.12.2", features = ["concurrent", "cdc", "multimap"] }
Expand Down
2 changes: 1 addition & 1 deletion codegen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "worktable_codegen"
version = "0.5.5"
version = "0.6.0"
edition = "2024"
license = "MIT"
description = "WorkTable codegeneration crate"
Expand Down
4 changes: 2 additions & 2 deletions codegen/src/persist_index/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ impl Generator {
if is_unsized(&t.to_string()) {
let const_size = name_generator.get_page_inner_size_const_ident();
quote! {
#i: (Vec<GeneralPage<TableOfContentsPage<#t>>>, Vec<GeneralPage<UnsizedIndexPage<#t, {#const_size as u32}>>>),
#i: (Vec<GeneralPage<TableOfContentsPage<(#t, Link)>>>, Vec<GeneralPage<UnsizedIndexPage<#t, {#const_size as u32}>>>),
}
} else {
quote! {
#i: (Vec<GeneralPage<TableOfContentsPage<#t>>>, Vec<GeneralPage<IndexPage<#t>>>),
#i: (Vec<GeneralPage<TableOfContentsPage<(#t, Link)>>>, Vec<GeneralPage<IndexPage<#t>>>),
}
}
})
Expand Down
52 changes: 51 additions & 1 deletion codegen/src/persist_index/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ impl Generator {
let secondary_index = self.gen_space_secondary_index_type();
let secondary_impl = self.gen_space_secondary_index_impl_space_index();
let secondary_index_events = self.gen_space_secondary_index_events_type();
let secondary_index_events_impl = self.gen_space_secondary_index_events_impl();

quote! {
#secondary_index_events
#secondary_index_events_impl
#secondary_index
#secondary_impl
}
Expand All @@ -34,13 +36,36 @@ impl Generator {
.collect();

quote! {
#[derive(Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct #ident {
#(#fields)*
}
}
}

fn gen_space_secondary_index_events_impl(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
let ident = name_generator.get_space_secondary_index_events_ident();

let fields: Vec<_> = self
.field_types
.keys()
.map(|i| {
quote! {
self.#i.extend(another.#i);
}
})
.collect();

quote! {
impl TableSecondaryIndexEventsOps for #ident {
fn extend(&mut self, another: #ident) {
#(#fields)*
}
}
}
}

fn gen_space_secondary_index_type(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
let ident = name_generator.get_space_secondary_index_ident();
Expand Down Expand Up @@ -78,11 +103,14 @@ impl Generator {
let from_table_files_path_fn = self.gen_space_secondary_index_from_table_files_path_fn();
let index_process_change_events_fn =
self.gen_space_secondary_index_process_change_events_fn();
let index_process_change_event_batch_fn =
self.gen_space_secondary_index_process_change_event_batch_fn();

quote! {
impl SpaceSecondaryIndexOps<#events_ident> for #ident {
#from_table_files_path_fn
#index_process_change_events_fn
#index_process_change_event_batch_fn
}
}
}
Expand Down Expand Up @@ -138,4 +166,26 @@ impl Generator {
}
}
}

fn gen_space_secondary_index_process_change_event_batch_fn(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_index_ident(&self.struct_def.ident);
let events_ident = name_generator.get_space_secondary_index_events_ident();

let process: Vec<_> = self
.field_types
.keys()
.map(|i| {
quote! {
self.#i.process_change_event_batch(events.#i).await?;
}
})
.collect();

quote! {
async fn process_change_event_batch(&mut self, events: #events_ident) -> eyre::Result<()> {
#(#process)*
core::result::Result::Ok(())
}
}
}
}
4 changes: 3 additions & 1 deletion codegen/src/persist_table/generator/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl Generator {
let ident = name_generator.get_persistence_engine_ident();
let primary_key_type = name_generator.get_primary_key_type_ident();
let inner_const_name = name_generator.get_page_inner_size_const_ident();
let const_name = name_generator.get_page_size_const_ident();
let space_secondary_indexes = name_generator.get_space_secondary_index_ident();
let space_secondary_indexes_events =
name_generator.get_space_secondary_index_events_ident();
Expand All @@ -42,7 +43,8 @@ impl Generator {
pub type #ident = PersistenceEngine<
SpaceData<
<<#primary_key_type as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State,
{ #inner_const_name as u32 }
{ #inner_const_name},
{ #const_name as u32 }
>,
#space_index_type
#space_secondary_indexes,
Expand Down
6 changes: 3 additions & 3 deletions codegen/src/persist_table/generator/space_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ impl Generator {
let space_file_ident = name_generator.get_space_file_ident();
let primary_index = if self.attributes.pk_unsized {
quote! {
pub primary_index: (Vec<GeneralPage<TableOfContentsPage<#pk_type>>>, Vec<GeneralPage<UnsizedIndexPage<#pk_type, {#inner_const_name as u32}>>>),
pub primary_index: (Vec<GeneralPage<TableOfContentsPage<(#pk_type, Link)>>>, Vec<GeneralPage<UnsizedIndexPage<#pk_type, {#inner_const_name as u32}>>>),
}
} else {
quote! {
pub primary_index: (Vec<GeneralPage<TableOfContentsPage<#pk_type>>>, Vec<GeneralPage<IndexPage<#pk_type>>>),
pub primary_index: (Vec<GeneralPage<TableOfContentsPage<(#pk_type, Link)>>>, Vec<GeneralPage<IndexPage<#pk_type>>>),
}
};

Expand Down Expand Up @@ -260,7 +260,7 @@ impl Generator {
let file_length = data_file.metadata().await?.len();
let count = file_length / (#inner_const_name as u64 + GENERAL_HEADER_SIZE as u64);
for page_id in 1..=count {
let index = parse_data_page::<{ #page_const_name }, { #inner_const_name }>(&mut data_file, page_id as u32).await?;
let index = parse_data_page::<{ #page_const_name as u32}, { #inner_const_name as usize }>(&mut data_file, page_id as u32).await?;
data.push(index);
}
(data, info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Generator {
let const_name = name_generator.get_page_inner_size_const_ident();
if self.attributes.pk_unsized {
quote! {
pub fn get_peristed_primary_key_with_toc(&self) -> (Vec<GeneralPage<TableOfContentsPage<#pk_type>>>, Vec<GeneralPage<UnsizedIndexPage<#pk_type, {#const_name as u32}>>>) {
pub fn get_peristed_primary_key_with_toc(&self) -> (Vec<GeneralPage<TableOfContentsPage<(#pk_type, Link)>>>, Vec<GeneralPage<UnsizedIndexPage<#pk_type, {#const_name as u32}>>>) {
let mut pages = vec![];
for node in self.0.pk_map.iter_nodes() {
let page = UnsizedIndexPage::from_node(node.lock_arc().as_ref());
Expand All @@ -115,7 +115,7 @@ impl Generator {
}
} else {
quote! {
pub fn get_peristed_primary_key_with_toc(&self) -> (Vec<GeneralPage<TableOfContentsPage<#pk_type>>>, Vec<GeneralPage<IndexPage<#pk_type>>>) {
pub fn get_peristed_primary_key_with_toc(&self) -> (Vec<GeneralPage<TableOfContentsPage<(#pk_type, Link)>>>, Vec<GeneralPage<IndexPage<#pk_type>>>) {
let size = get_index_page_size_from_data_length::<#pk_type>(#const_name);
let mut pages = vec![];
for node in self.0.pk_map.iter_nodes() {
Expand Down
6 changes: 2 additions & 4 deletions codegen/src/worktable/generator/index/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Generator {
let index_field_name = &idx.name;
let diff_key = Literal::string(i.to_string().as_str());

let match_arm = if let Some(t) = self.columns.columns_map.get(&idx.field) {
if let Some(t) = self.columns.columns_map.get(&idx.field) {
let type_str = t.to_string();
let variant_ident = Ident::new(&map_to_uppercase(&type_str), Span::mixed_site());

Expand Down Expand Up @@ -156,9 +156,7 @@ impl Generator {
}
} else {
quote! {}
};

match_arm
}
});
let idents = self
.columns
Expand Down
6 changes: 2 additions & 4 deletions codegen/src/worktable/generator/index/usual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl Generator {
let index_field_name = &idx.name;
let diff_key = Literal::string(i.to_string().as_str());

let match_arm = if let Some(t) = self.columns.columns_map.get(&idx.field) {
if let Some(t) = self.columns.columns_map.get(&idx.field) {
let type_str = t.to_string();
let variant_ident = Ident::new(&map_to_uppercase(&type_str), Span::mixed_site());

Expand All @@ -174,9 +174,7 @@ impl Generator {
}
} else {
quote! {}
};

match_arm
}
});

quote! {
Expand Down
5 changes: 3 additions & 2 deletions codegen/src/worktable/generator/queries/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl Generator {
let delete_logic = self.gen_delete_logic();

quote! {
pub async fn delete_without_lock(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> {
pub fn delete_without_lock(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> {
#delete_logic
core::result::Result::Ok(())
}
Expand All @@ -87,9 +87,10 @@ impl Generator {
#pk_ident,
#secondary_events_ident
> = Operation::Delete(DeleteOperation {
id: Default::default(),
id: uuid::Uuid::now_v7().into(),
secondary_keys_events,
primary_key_events,
link,
});
self.2.apply_operation(op);
}
Expand Down
2 changes: 1 addition & 1 deletion codegen/src/worktable/generator/queries/type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Generator {

if !rows.is_empty() {
Ok(quote! {
#[derive(Clone, Debug, derive_more::Display, From, PartialEq)]
#[derive(Clone, Debug, From, PartialEq)]
#[non_exhaustive]
pub enum #avt_type_ident {
#(#rows)*
Expand Down
12 changes: 8 additions & 4 deletions codegen/src/worktable/generator/queries/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Generator {
} else {
quote! {
if bytes.len() >= link.length as usize {
self.delete_without_lock(pk.clone()).await?;
self.delete_without_lock(pk.clone())?;
self.insert(row)?;

lock.unlock(); // Releases locks
Expand Down Expand Up @@ -94,6 +94,7 @@ impl Generator {

let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#row_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() };

let op_id = OperationId::Single(uuid::Uuid::now_v7());
#diff_process
#persist_op

Expand Down Expand Up @@ -243,7 +244,7 @@ impl Generator {
if need_to_reinsert {
let mut row_old = self.select(pk.clone()).unwrap();
#(#row_updates)*
self.delete_without_lock(pk.clone()).await?;
self.delete_without_lock(pk.clone())?;
self.insert(row_old)?;

lock.unlock(); // Releases locks
Expand All @@ -269,7 +270,7 @@ impl Generator {
#primary_key_ident,
#secondary_events_ident
> = Operation::Update(UpdateOperation {
id: Default::default(),
id: op_id,
secondary_keys_events,
bytes: updated_bytes,
link,
Expand Down Expand Up @@ -406,6 +407,7 @@ impl Generator {
.map(|v| v.get().value)
.ok_or(WorkTableError::NotFound)?;

let op_id = OperationId::Single(uuid::Uuid::now_v7());
#size_check
#diff_process
#persist_op
Expand Down Expand Up @@ -486,7 +488,7 @@ impl Generator {
if need_to_reinsert {
let mut row_old = self.select(pk.clone()).unwrap();
#(#row_updates)*
self.delete_without_lock(pk.clone()).await?;
self.delete_without_lock(pk.clone())?;
self.insert(row_old)?;

let lock = self.0.lock_map.get(&pk).expect("was inserted before and not deleted");
Expand Down Expand Up @@ -532,6 +534,7 @@ impl Generator {
}

let mut links_to_unlock = vec![];
let op_id = OperationId::Multi(uuid::Uuid::now_v7());
for link in links.into_iter() {
let pk = self.0.data.select(link)?.get_primary_key().clone();
let mut bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&row)
Expand Down Expand Up @@ -638,6 +641,7 @@ impl Generator {
let lock = std::sync::Arc::new(lock);
self.0.lock_map.insert(pk.clone(), lock.clone());

let op_id = OperationId::Single(uuid::Uuid::now_v7());
#size_check
#diff_process
#persist_op
Expand Down
30 changes: 26 additions & 4 deletions codegen/src/worktable/generator/table/impls.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use proc_macro2::TokenStream;
use quote::quote;

use crate::name_generator::WorktableNameGenerator;
use crate::name_generator::{is_unsized_vec, WorktableNameGenerator};
use crate::worktable::generator::Generator;
use crate::worktable::model::GeneratorType;

Expand Down Expand Up @@ -47,11 +47,33 @@ impl Generator {
let const_name = name_generator.get_page_inner_size_const_ident();

if self.is_persist {
let pk_types = &self
.columns
.primary_keys
.iter()
.map(|i| {
self.columns
.columns_map
.get(i)
.expect("should exist as got from definition")
.to_string()
})
.collect::<Vec<_>>();
let pk_types_unsized = is_unsized_vec(pk_types);
let index_size = if pk_types_unsized {
quote! {
let size = #const_name;
}
} else {
quote! {
let size = get_index_page_size_from_data_length::<#pk_type>(#const_name);
}
};
quote! {
pub async fn new(config: PersistenceConfig) -> eyre::Result<Self> {
let mut inner = WorkTable::default();
inner.table_name = #table_name;
let size = get_index_page_size_from_data_length::<#pk_type>(#const_name);
#index_size
inner.pk_map = IndexMap::with_maximum_node_size(size);
let table_files_path = format!("{}/{}", config.tables_path, #dir_name);
let engine: #engine = PersistenceEngine::from_table_files_path(table_files_path).await?;
Expand Down Expand Up @@ -222,9 +244,9 @@ impl Generator {

fn gen_table_count_fn(&self) -> TokenStream {
quote! {
pub fn count(&self) -> Option<usize> {
pub fn count(&self) -> usize {
let count = self.0.pk_map.len();
(count > 0).then_some(count)
count
}
}
}
Expand Down
Loading
Loading