Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur

[package]
name = "worktable"
version = "0.9.0-alpha8"
version = "0.9.0-beta0.1.1"
edition = "2024"
authors = ["Handy-caT"]
license = "MIT"
Expand All @@ -19,15 +19,15 @@ s3-support = ["dep:rusty-s3", "dep:url", "dep:reqwest", "dep:walkdir", "worktabl
[dependencies]
async-trait = "0.1.89"
convert_case = "0.6.0"
data_bucket = "=0.3.12"
data_bucket = "=0.3.13"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" }
# data_bucket = { path = "../DataBucket", version = "0.3.11" }
derive_more = { version = "2.0.1", features = ["from", "error", "display", "debug", "into"] }
eyre = "0.6.12"
fastrand = "2.3.0"
futures = "0.3.30"
indexset = { version = "=0.14.0", features = ["concurrent", "cdc", "multimap"] }
# indexset = { package = "wt-indexset", path = "../indexset", version = "0.12.10", features = ["concurrent", "cdc", "multimap"] }
indexset = { version = "=0.16.0", features = ["concurrent", "cdc", "multimap"] }
# indexset = { path = "../indexset", version = "0.15.0", features = ["concurrent", "cdc", "multimap"] }
# indexset = { package = "wt-indexset", version = "=0.12.12", features = ["concurrent", "cdc", "multimap"] }
lockfree = { version = "0.5.1" }
log = "0.4.29"
Expand All @@ -39,14 +39,14 @@ prettytable-rs = "^0.10"
psc-nanoid = { version = "3.1.1", features = ["rkyv", "packed"] }
rkyv = { version = "0.8.9", features = ["uuid-1"] }
reqwest = { version = "0.12", optional = true, default-features = false, features = ["rustls-tls-webpki-roots", "charset", "http2"] }
rusty-s3 = { version = "0.9.0", optional = true }
rusty-s3 = { package = "rusty-s3-temp", version = "0.9.0", optional = true }
smart-default = "0.7.1"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
url = { version = "2", optional = true }
uuid = { version = "1.10.0", features = ["v4", "v7"] }
walkdir = { version = "2", optional = true }
worktable_codegen = { path = "codegen", version = "=0.9.0-alpha4" }
worktable_codegen = { path = "codegen", version = "=0.9.0-beta0.1.0" }

[dev-dependencies]
chrono = "0.4.43"
Expand Down
5 changes: 3 additions & 2 deletions codegen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "worktable_codegen"
version = "0.9.0-alpha4"
version = "0.9.0-beta0.1.0"
edition = "2024"
license = "MIT"
description = "WorkTable codegeneration crate"
Expand All @@ -18,4 +18,5 @@ rkyv = { version = "0.7.45" }
syn = { version = "2.0.74", features = ["full"] }
quote = "1.0.36"
proc-macro2 = "1.0.86"
convert_case = "0.6.0"
convert_case = "0.6.0"
indexmap = "2"
2 changes: 0 additions & 2 deletions codegen/src/persist_table/generator/space_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ impl Generator {
C: Clone + PersistenceConfig,
{
let mut page_id = 1;
println!("{:?}", self.data_info.inner);

let data = self.data.into_iter().map(|p| {
let mut data = Data::from_data_page(p);
data.set_page_id(page_id.into());
Expand Down
151 changes: 113 additions & 38 deletions codegen/src/worktable/generator/index/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ impl Generator {
let save_row_cdc = self.gen_save_row_cdc_index_fn();
let reinsert_row_cdc = self.gen_reinsert_row_cdc_index_fn();
let delete_row_cdc = self.gen_delete_row_cdc_index_fn();
let delete_from_indexes_cdc = self.gen_delete_from_indexes_cdc_index_fn();
let process_difference_insert_cdc = self.gen_process_difference_insert_cdc_index_fn();
let process_difference_remove_cdc = self.gen_process_difference_remove_cdc_index_fn();

Expand All @@ -25,6 +26,7 @@ impl Generator {
#reinsert_row_cdc
#save_row_cdc
#delete_row_cdc
#delete_from_indexes_cdc
#process_difference_insert_cdc
#process_difference_remove_cdc
}
Expand All @@ -50,13 +52,16 @@ impl Generator {
let index_variant: TokenStream = camel_case_name.parse().unwrap();

quote! {
partial_events.#index_field_name = vec![];
let #index_field_name = if let Some(events) = self.#index_field_name.insert_checked_cdc(row.#i.clone(), link) {
events.into_iter().map(|ev| ev.into()).collect()
let evs: Vec<_> = events.into_iter().map(|ev| ev.into()).collect();
partial_events.#index_field_name = evs.clone();
evs
} else {
return Err(IndexError::AlreadyExists {
return (partial_events, Err(IndexError::AlreadyExists {
at: #available_index_ident::#index_variant,
inserted_already: inserted_indexes.clone(),
});
}));
};
inserted_indexes.push(#available_index_ident::#index_variant);
}
Expand All @@ -70,15 +75,14 @@ impl Generator {
.collect::<Vec<_>>();

quote! {
fn save_row_cdc(&self, row: #row_type_ident, link: Link) -> Result<#events_ident, IndexError<#available_index_ident>> {
fn save_row_cdc(&self, row: #row_type_ident, link: Link) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) {
let mut inserted_indexes: Vec<#available_index_ident> = vec![];
let mut partial_events = #events_ident::default();

#(#save_rows)*
core::result::Result::Ok(
#events_ident {
#(#idents,)*
}
)
(#events_ident {
#(#idents,)*
}, Ok(()))
}
}
}
Expand Down Expand Up @@ -122,13 +126,16 @@ impl Generator {
let insert = if idx.is_unique {
quote! {
let mut #index_field_name = if row_new.#i != row_old.#i {
partial_events.#index_field_name = vec![];
let #index_field_name: Vec<_> = if let Some(events) = self.#index_field_name.insert_checked_cdc(row_new.#i.clone(), link_new) {
events.into_iter().map(|ev| ev.into()).collect()
let evs: Vec<_> = events.into_iter().map(|ev| ev.into()).collect();
partial_events.#index_field_name = evs.clone();
evs
} else {
return Err(IndexError::AlreadyExists {
return (partial_events, Err(IndexError::AlreadyExists {
at: #available_index_ident::#index_variant,
inserted_already: inserted_indexes.clone(),
});
}));
};
inserted_indexes.push(#available_index_ident::#index_variant);

Expand Down Expand Up @@ -159,16 +166,15 @@ impl Generator {
link_old: Link,
row_new: #row_type_ident,
link_new: Link
) -> Result<#events_ident, IndexError<#available_index_ident>> {
) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) {
let mut inserted_indexes: Vec<#available_index_ident> = vec![];
let mut partial_events = #events_ident::default();

#(#insert_rows)*
#(#remove_rows)*
core::result::Result::Ok(
#events_ident {
#(#idents,)*
}
)
(#events_ident {
#(#idents,)*
}, Ok(()))
}
}
}
Expand Down Expand Up @@ -199,13 +205,82 @@ impl Generator {
.collect::<Vec<_>>();

quote! {
fn delete_row_cdc(&self, row: #row_type_ident, link: Link) -> Result<#events_ident, IndexError<#available_index_ident>> {
fn delete_row_cdc(&self, row: #row_type_ident, link: Link) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) {
#(#delete_rows)*
core::result::Result::Ok(
#events_ident {
#(#idents,)*
(#events_ident {
#(#idents,)*
}, Ok(()))
}
}
}

fn gen_delete_from_indexes_cdc_index_fn(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let row_type_ident = name_generator.get_row_type_ident();
let events_ident = name_generator.get_space_secondary_index_events_ident();
let available_index_ident = name_generator.get_available_indexes_ident();

let matches = self
.columns
.indexes
.iter()
.map(|(i, idx)| {
let index_field_name = &idx.name;
let camel_case_name = index_field_name
.to_string()
.from_case(Case::Snake)
.to_case(Case::Pascal);
let index_variant: TokenStream = camel_case_name.parse().unwrap();
let type_str = self.columns
.columns_map
.get(i)
.unwrap()
.to_string();
let row = if is_float(type_str.as_str()) {
quote! {
OrderedFloat(row.#i)
}
)
} else if type_str == "String" {
quote! {
row.#i.clone()
}
} else {
quote! {
row.#i
}
};

quote! {
#available_index_ident::#index_variant => {
let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, #row, link);
partial_events.#index_field_name = events.into_iter().map(|ev| ev.into()).collect();
},
}
})
.collect::<Vec<_>>();

let inner = if matches.is_empty() {
quote! {}
} else {
quote! {
for index in indexes {
match index {
#(#matches)*
}
}
}
};

quote! {
fn delete_from_indexes_cdc(
&self,
row: #row_type_ident,
link: Link,
indexes: Vec<#available_index_ident>,
) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) {
let mut partial_events = #events_ident::default();
#inner
(partial_events, Ok(()))
}
}
}
Expand Down Expand Up @@ -261,13 +336,11 @@ impl Generator {
&self,
link: Link,
difference: std::collections::HashMap<&str, Difference<#avt_type_ident>>
) -> Result<#events_ident, IndexError<#available_index_ident>> {
) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) {
#(#process_difference_rows)*
core::result::Result::Ok(
#events_ident {
#(#idents,)*
}
)
(#events_ident {
#(#idents,)*
}, Ok(()))
}
}
}
Expand Down Expand Up @@ -304,13 +377,16 @@ impl Generator {
let mut events = vec![];
if let #avt_type_ident::#variant_ident(new) = &diff.new {
let key_new = #new_value_expr;
if let Some(evs) = TableIndexCdc::insert_checked_cdc(&self.#index_field_name, key_new, link) {
partial_events.#index_field_name = vec![];
if let Some(evs) = TableIndexCdc::insert_checked_cdc(&self.#index_field_name, key_new, link) {
let evs: Vec<_> = evs.into_iter().collect();
partial_events.#index_field_name = evs.clone();
events.extend_from_slice(evs.as_ref());
} else {
return Err(IndexError::AlreadyExists {
return (partial_events, Err(IndexError::AlreadyExists {
at: #available_index_ident::#index_variant,
inserted_already: inserted_indexes.clone(),
});
}));
}
inserted_indexes.push(#available_index_ident::#index_variant);
}
Expand All @@ -335,15 +411,14 @@ impl Generator {
&self,
link: Link,
difference: std::collections::HashMap<&str, Difference<#avt_type_ident>>
) -> Result<#events_ident, IndexError<#available_index_ident>> {
) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) {
let mut inserted_indexes: Vec<#available_index_ident> = vec![];
let mut partial_events = #events_ident::default();

#(#process_difference_insert_rows)*
core::result::Result::Ok(
#events_ident {
#(#idents,)*
}
)
(#events_ident {
#(#idents,)*
}, Ok(()))
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion codegen/src/worktable/generator/queries/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ impl Generator {

let process = if self.is_persist {
quote! {
let secondary_keys_events = self.0.indexes.delete_row_cdc(row, link)?;
let (secondary_keys_events, res) = self.0.indexes.delete_row_cdc(row, link);
res?;
let (_, primary_key_events) = self.0.primary_index.remove_cdc(pk.clone(), link);
self.0.data.delete(link).map_err(WorkTableError::PagesError)?;
let mut op: Operation<
Expand Down
30 changes: 24 additions & 6 deletions codegen/src/worktable/generator/queries/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,28 +351,45 @@ impl Generator {
};

let process_difference = if self.is_persist {
let secondary_events_ident = name_generator.get_space_secondary_index_events_ident();
if idx_idents.is_some() {
quote! {
let indexes_res = self.0.indexes.process_difference_insert_cdc(link, diffs.clone());
let (secondary_events, indexes_res): (#secondary_events_ident, _) = self.0.indexes.process_difference_insert_cdc(link, diffs.clone());
if let Err(e) = indexes_res {
return match e {
IndexError::AlreadyExists {
at,
inserted_already,
} => {
self.0.indexes
.delete_from_indexes(row_new.merge(row_old.clone()), link, inserted_already)?;
// Generate rollback CDC events for secondary indexes
let (rollback_secondary_events, _): (#secondary_events_ident, _) = self.0.indexes.delete_from_indexes_cdc(
row_new.merge(row_old.clone()),
link,
inserted_already
);

// Merge original partial insert events with rollback events
let mut merged_events = secondary_events.clone();
merged_events.extend(rollback_secondary_events);

// Create AcknowledgeOperation with all events
let ack_op = Operation::Acknowledge(AcknowledgeOperation {
id: OperationId::Single(uuid::Uuid::now_v7()),
primary_key_events: vec![], // Updates don't modify primary key
secondary_keys_events: merged_events,
});
self.1.apply_operation(ack_op);

Err(WorkTableError::AlreadyExists(at.to_string_value()))
}
IndexError::NotFound => Err(WorkTableError::NotFound),
};
}
let mut secondary_keys_events = indexes_res.expect("was just checked for correctness");
let mut secondary_keys_events = secondary_events;
}
} else {
quote! {
let secondary_keys_events = core::default::Default::default();
let secondary_keys_events: #secondary_events_ident = core::default::Default::default();
}
}
} else if idx_idents.is_some() {
Expand Down Expand Up @@ -408,7 +425,8 @@ impl Generator {
let process_difference = if self.is_persist {
if idx_idents.is_some() {
quote! {
let secondary_keys_events_remove = self.0.indexes.process_difference_remove_cdc(link, diffs)?;
let (secondary_keys_events_remove, res) = self.0.indexes.process_difference_remove_cdc(link, diffs);
res?;
op.extend_secondary_key_events(secondary_keys_events_remove);
}
} else {
Expand Down
Loading
Loading