Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur

[package]
name = "worktable"
version = "0.6.10"
version = "0.6.11"
edition = "2024"
authors = ["Handy-caT"]
license = "MIT"
Expand All @@ -22,18 +22,18 @@ tokio = { version = "1", features = ["full"] }
tracing = "0.1"
rkyv = { version = "0.8.9", features = ["uuid-1"] }
lockfree = { version = "0.5.1" }
worktable_codegen = { path = "codegen", version = "0.6.9" }
worktable_codegen = { path = "codegen", version = "0.6.11" }
fastrand = "2.3.0"
futures = "0.3.30"
uuid = { version = "1.10.0", features = ["v4", "v7"] }
data_bucket = "0.2.8"
data_bucket = "0.2.9"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" }
# data_bucket = { path = "../DataBucket", version = "0.2.7" }
performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true }
performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true }
# indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] }
# indexset = { path = "../indexset", version = "0.12.3", features = ["concurrent", "cdc", "multimap"] }
indexset = { package = "wt-indexset", version = "0.12.4", features = ["concurrent", "cdc", "multimap"] }
indexset = { package = "wt-indexset", version = "0.12.5", features = ["concurrent", "cdc", "multimap"] }
convert_case = "0.6.0"
ordered-float = "5.0.0"
parking_lot = "0.12.3"
Expand Down
2 changes: 1 addition & 1 deletion codegen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "worktable_codegen"
version = "0.6.9"
version = "0.6.11"
edition = "2024"
license = "MIT"
description = "WorkTable codegeneration crate"
Expand Down
2 changes: 1 addition & 1 deletion codegen/src/persist_table/generator/space_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl Generator {
pk_map,
indexes,
pk_gen: PrimaryKeyGeneratorState::from_state(self.data_info.inner.pk_gen_state),
lock_map: LockMap::new(),
lock_map: LockMap::default(),
table_name: "",
pk_phantom: std::marker::PhantomData,
};
Expand Down
42 changes: 42 additions & 0 deletions codegen/src/worktable/generator/index/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ impl Generator {
let available_index_ident = name_generator.get_available_indexes_ident();

let save_row_cdc = self.gen_save_row_cdc_index_fn();
let reinsert_row_cdc = self.gen_reinsert_row_cdc_index_fn();
let delete_row_cdc = self.gen_delete_row_cdc_index_fn();
let process_diff_cdc = self.gen_process_diff_cdc_index_fn();

quote! {
impl TableSecondaryIndexCdc<#row_type_ident, #available_types_ident, #events_ident, #available_index_ident> for #index_type_ident {
#reinsert_row_cdc
#save_row_cdc
#delete_row_cdc

Expand Down Expand Up @@ -80,6 +82,46 @@ impl Generator {
}
}

fn gen_reinsert_row_cdc_index_fn(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let row_type_ident = name_generator.get_row_type_ident();
let events_ident = name_generator.get_space_secondary_index_events_ident();

let reinsert_rows = self
.columns
.indexes
.iter()
.map(|(i, idx)| {
let index_field_name = &idx.name;
quote! {
let (_, events) = self.#index_field_name.insert_cdc(row_new.#i.clone(), link_new);
let mut #index_field_name: Vec<_> = events.into_iter().map(|ev| ev.into()).collect();
if row_new.#i != row_old.#i {
let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, row_old.#i.clone(), link_old);
#index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::<Vec<_>>());
}
}
})
.collect::<Vec<_>>();
let idents = self
.columns
.indexes
.values()
.map(|idx| &idx.name)
.collect::<Vec<_>>();

quote! {
fn reinsert_row_cdc(&self, row_old: #row_type_ident, link_old: Link, row_new: #row_type_ident, link_new: Link) -> eyre::Result<#events_ident> {
#(#reinsert_rows)*
core::result::Result::Ok(
#events_ident {
#(#idents,)*
}
)
}
}
}

fn gen_delete_row_cdc_index_fn(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let row_type_ident = name_generator.get_row_type_ident();
Expand Down
50 changes: 50 additions & 0 deletions codegen/src/worktable/generator/index/usual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ impl Generator {
let avt_index_ident = name_generator.get_available_indexes_ident();

let save_row_fn = self.gen_save_row_index_fn();
let reinsert_row_fn = self.gen_reinsert_row_index_fn();
let delete_row_fn = self.gen_delete_row_index_fn();
let process_difference_fn = self.gen_process_difference_index_fn();
let delete_from_indexes = self.gen_index_delete_from_indexes_fn();

quote! {
impl TableSecondaryIndex<#row_type_ident, #avt_type_ident, #avt_index_ident> for #index_type_ident {
#save_row_fn
#reinsert_row_fn
#delete_row_fn
#process_difference_fn
#delete_from_indexes
Expand Down Expand Up @@ -87,6 +89,54 @@ impl Generator {
}
}

fn gen_reinsert_row_index_fn(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let row_type_ident = name_generator.get_row_type_ident();

let reinsert_rows = self
.columns
.indexes
.iter()
.map(|(i, idx)| {
let index_field_name = &idx.name;
let row = if is_float(
self.columns
.columns_map
.get(i)
.unwrap()
.to_string()
.as_str(),
) {
quote! {
OrderedFloat(row.#i)
}
} else {
quote! {
row.#i
}
};
quote! {
let row = &row_new;
let val_new = #row.clone();
self.#index_field_name.insert(val_new.clone(), link_new);

let row = &row_old;
let val_old = #row.clone();
if val_new != val_old {
TableIndex::remove(&self.#index_field_name, val_old, link_old);
}
}
})
.collect::<Vec<_>>();

quote! {
fn reinsert_row(&self, row_old: #row_type_ident, link_old: Link, row_new: #row_type_ident, link_new: Link) -> eyre::Result<()> {
#(#reinsert_rows)*
core::result::Result::Ok(())
}
}
}

/// Generates `delete_row` function of `TableIndex` trait for index. It removes `Link` from all secondary indexes.
/// Logic varies on index uniqueness. For unique index we can just delete `Link` from index, but for non-unique we
/// need to get set from index first and then delete `Link` from set.
Expand Down
116 changes: 76 additions & 40 deletions codegen/src/worktable/generator/locks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ impl Generator {
quote! {
#type_
#impl_

}
}

Expand All @@ -32,8 +31,6 @@ impl Generator {
quote! {
#[derive(Debug, Clone)]
pub struct #lock_ident {
id: u16,
lock: Option<std::sync::Arc<Lock>>,
#(#rows)*
}
}
Expand All @@ -44,16 +41,50 @@ impl Generator {
let lock_ident = name_generator.get_lock_type_ident();

let new_fn = self.gen_new_fn();
let with_lock_fn = self.gen_with_lock_fn();
let lock_await_fn = self.gen_lock_await_fn();
let unlock_fn = self.gen_unlock_fn();
let row_impl = self.gen_lock_row_impl();

quote! {
impl #lock_ident {
#new_fn
}

#row_impl
}
}

fn gen_lock_row_impl(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let lock_ident = name_generator.get_lock_type_ident();

let is_locked_fn = self.gen_is_locked_fn();
let with_lock_fn = self.gen_with_lock_fn();
let lock_fn = self.gen_lock_fn();
let merge_fn = self.gen_merge_fn();

quote! {
impl RowLock for #lock_ident {
#is_locked_fn
#lock_fn
#with_lock_fn
#lock_await_fn
#unlock_fn
#merge_fn
}
}
}

fn gen_is_locked_fn(&self) -> TokenStream {
let rows: Vec<_> = self
.columns
.columns_map
.keys()
.map(|i| {
let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site());
quote! { self.#col.as_ref().map(|l| l.is_locked()).unwrap_or(false) }
})
.collect();

quote! {
fn is_locked(&self) -> bool {
#(#rows) ||*
}
}
}
Expand All @@ -70,10 +101,8 @@ impl Generator {
.collect();

quote! {
pub fn new(lock_id: u16) -> Self {
pub fn new() -> Self {
Self {
id: lock_id,
lock: None,
#(#rows),*
}
}
Expand All @@ -87,71 +116,78 @@ impl Generator {
.keys()
.map(|i| {
let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site());
quote! { #col: Some(std::sync::Arc::new(Lock::new())) }
quote! { #col: Some(lock.clone()) }
})
.collect();

quote! {
pub fn with_lock(lock_id: u16) -> Self {
Self {
id: lock_id,
lock: Some(std::sync::Arc::new(Lock::new())),
#(#rows),*
}
fn with_lock(id: u16) -> (Self, std::sync::Arc<Lock>) {
let lock = std::sync::Arc::new(Lock::new(id));
(
Self {
#(#rows),*
},
lock
)
}
}
}

fn gen_lock_await_fn(&self) -> TokenStream {
fn gen_lock_fn(&self) -> TokenStream {
let rows: Vec<_> = self
.columns
.columns_map
.keys()
.map(|col| {
let col = Ident::new(format!("{col}_lock").as_str(), Span::mixed_site());
.map(|i| {
let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site());
quote! {
if let Some(lock) = &self.#col {
futures.push(lock.as_ref());
}
if let Some(lock) = &self.#col {
set.insert(lock.clone());
}
self.#col = Some(lock.clone());
}
})
.collect();
quote! {
pub async fn lock_await(&self) {
let mut futures = Vec::new();

if let Some(lock) = &self.lock {
futures.push(lock.as_ref());
}
quote! {
#[allow(clippy::mutable_key_type)]
fn lock(&mut self, id: u16) -> (std::collections::HashSet<std::sync::Arc<Lock>>, std::sync::Arc<Lock>) {
let mut set = std::collections::HashSet::new();
let lock = std::sync::Arc::new(Lock::new(id));
#(#rows)*
futures::future::join_all(futures).await;

(set, lock)
}
}
}

fn gen_unlock_fn(&self) -> TokenStream {
fn gen_merge_fn(&self) -> TokenStream {
let rows: Vec<_> = self
.columns
.columns_map
.keys()
.map(|col| {
let col = Ident::new(format!("{col}_lock").as_str(), Span::mixed_site());
quote! {
if let Some(#col) = &self.#col {
#col.unlock();
}
if let Some(#col) = &other.#col {
if self.#col.is_none() {
self.#col = Some(#col.clone());
} else {
set.insert(#col.clone());
}
}
other.#col = self.#col.clone();
}
})
.collect();

quote! {
pub fn unlock(&self) {
if let Some(lock) = &self.lock {
lock.unlock();
}
#[allow(clippy::mutable_key_type)]
fn merge(&mut self, other: &mut Self) -> std::collections::HashSet<std::sync::Arc<Lock>> {
let mut set = std::collections::HashSet::new();
#(#rows)*
set
}

}
}
}
Loading
Loading