From 1b51eb2a21971f7a94fa15990c23a2ab9da9c683 Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Wed, 19 Feb 2025 06:49:40 +0300 Subject: [PATCH 01/11] Wip --- src/lock/set.rs | 25 ++++++++++++++++--------- src/table/mod.rs | 32 ++++++++++++++++++++++++-------- 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/src/lock/set.rs b/src/lock/set.rs index 0022a43a..d74116d5 100644 --- a/src/lock/set.rs +++ b/src/lock/set.rs @@ -3,22 +3,29 @@ use std::sync::Arc; use lockfree::map::Map; -use crate::lock::{Lock, LockId}; - #[derive(Debug)] -pub struct LockMap { - set: Map>, +pub struct LockMap +where + PrimaryKey: std::hash::Hash + std::cmp::Ord, +{ + set: Map>, next_id: AtomicU16, } -impl Default for LockMap { +impl Default for LockMap +where + PrimaryKey: std::hash::Hash + std::cmp::Ord, +{ fn default() -> Self { Self::new() } } -impl LockMap { +impl LockMap +where + PrimaryKey: std::hash::Hash + std::cmp::Ord, +{ pub fn new() -> Self { Self { set: Map::new(), @@ -26,15 +33,15 @@ impl LockMap { } } - pub fn insert(&self, id: LockId, lock: Arc) { + pub fn insert(&self, id: PrimaryKey, lock: Arc) { self.set.insert(id, lock); } - pub fn get(&self, id: &LockId) -> Option> { + pub fn get(&self, id: &PrimaryKey) -> Option> { self.set.get(id).map(|v| v.val().clone()) } - pub fn remove(&self, id: &LockId) { + pub fn remove(&self, id: &PrimaryKey) { self.set.remove(id); } diff --git a/src/table/mod.rs b/src/table/mod.rs index 3075310a..6684b19d 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -25,10 +25,11 @@ pub struct WorkTable< PrimaryKey, AvailableTypes = (), SecondaryIndexes = (), + LockType = (), PkGen = ::Generator, const DATA_LENGTH: usize = INNER_PAGE_SIZE, > where - PrimaryKey: Clone + Ord + Send + 'static, + PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, Row: StorableRow, { pub data: DataPages, @@ -39,7 +40,7 @@ pub struct WorkTable< pub pk_gen: PkGen, - pub lock_map: LockMap, + pub lock_map: LockMap, pub table_name: &'static str, @@ -49,10 +50,18 @@ pub struct WorkTable< } // Manual implementations to avoid unneeded trait bounds. -impl Default - for WorkTable +impl< + Row, + PrimaryKey, + AvailableTypes, + SecondaryIndexes, + LockType, + PkGen, + const DATA_LENGTH: usize, + > Default + for WorkTable where - PrimaryKey: Clone + Ord + Send + TablePrimaryKey, + PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, SecondaryIndexes: Default, PkGen: Default, Row: StorableRow, @@ -72,11 +81,18 @@ where } } -impl - WorkTable +impl< + Row, + PrimaryKey, + AvailableTypes, + SecondaryIndexes, + LockType, + PkGen, + const DATA_LENGTH: usize, + > WorkTable where Row: TableRow, - PrimaryKey: Clone + Ord + Send + TablePrimaryKey, + PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, Row: StorableRow, ::WrappedRow: RowWrapper, { From 619016b0a65e5e9dc368756d22651e4214f4b687 Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Wed, 19 Feb 2025 09:22:36 +0300 Subject: [PATCH 02/11] WIP lockmap --- codegen/src/name_generator.rs | 4 ++ .../src/worktable/generator/primary_key.rs | 3 +- codegen/src/worktable/generator/table/mod.rs | 5 +- codegen/src/worktable/generator/wrapper.rs | 55 +++++++++++++++++++ codegen/src/worktable/mod.rs | 2 + src/in_memory/mod.rs | 2 +- src/in_memory/row.rs | 10 ++++ src/lib.rs | 2 +- src/lock/mod.rs | 2 +- src/lock/set.rs | 12 +--- src/table/mod.rs | 5 +- 11 files changed, 85 insertions(+), 17 deletions(-) diff --git a/codegen/src/name_generator.rs b/codegen/src/name_generator.rs index cdc3fe96..52042731 100644 --- a/codegen/src/name_generator.rs +++ b/codegen/src/name_generator.rs @@ -54,6 +54,10 @@ impl WorktableNameGenerator { Ident::new(format!("{}Wrapper", self.name).as_str(), Span::mixed_site()) } + pub fn get_lock_type_ident(&self) -> Ident { + Ident::new(format!("{}Lock", self.name).as_str(), Span::mixed_site()) + } + pub fn get_index_type_ident(&self) -> Ident { Ident::new(format!("{}Index", self.name).as_str(), Span::mixed_site()) } diff --git a/codegen/src/worktable/generator/primary_key.rs b/codegen/src/worktable/generator/primary_key.rs index 348590c7..e951a62d 100644 --- a/codegen/src/worktable/generator/primary_key.rs +++ b/codegen/src/worktable/generator/primary_key.rs @@ -69,7 +69,8 @@ impl Generator { Into, PartialEq, PartialOrd, - Ord + Hash, + Ord, )] pub struct #ident(#(#types),*); } diff --git a/codegen/src/worktable/generator/table/mod.rs b/codegen/src/worktable/generator/table/mod.rs index e5dba507..54861711 100644 --- a/codegen/src/worktable/generator/table/mod.rs +++ b/codegen/src/worktable/generator/table/mod.rs @@ -57,6 +57,7 @@ impl Generator { let index_type = name_generator.get_index_type_ident(); let inner_const_name = name_generator.get_page_inner_size_const_ident(); let avt_type_ident = name_generator.get_available_type_ident(); + let lock_ident = name_generator.get_lock_type_ident(); let derive = if self.is_persist { quote! { @@ -84,6 +85,7 @@ impl Generator { #primary_key_type, #avt_type_ident, #index_type, + #lock_ident, <#primary_key_type as TablePrimaryKey>::Generator, #inner_const_name > @@ -98,7 +100,8 @@ impl Generator { #row_type, #primary_key_type, #avt_type_ident, - #index_type + #index_type, + #lock_ident, > #persist_type_part ); diff --git a/codegen/src/worktable/generator/wrapper.rs b/codegen/src/worktable/generator/wrapper.rs index 0d10670c..1c5f9a56 100644 --- a/codegen/src/worktable/generator/wrapper.rs +++ b/codegen/src/worktable/generator/wrapper.rs @@ -10,6 +10,9 @@ impl Generator { let archived_impl = self.get_wrapper_archived_impl(); let storable_impl = self.get_wrapper_storable_impl(); + println!("!TYPE {}", type_); + println!("!Archived {}", archived_impl); + quote! { #type_ #impl_ @@ -22,6 +25,7 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let row_ident = name_generator.get_row_type_ident(); let wrapper_ident = name_generator.get_wrapper_type_ident(); + let lock_ident = name_generator.get_lock_type_ident(); let row_locks = self .columns @@ -34,6 +38,18 @@ impl Generator { } }) .collect::>(); + + let row_locks2 = self + .columns + .columns_map + .keys() + .map(|i| { + let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { + #name: Option>, + } + }) + .collect::>(); quote! { #[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] #[repr(C)] @@ -43,6 +59,11 @@ impl Generator { lock: u16, #(#row_locks)* } + #[derive(Debug)] + pub struct #lock_ident { + lock: Option>, + #(#row_locks2)* + } } } @@ -64,6 +85,7 @@ impl Generator { .collect::>(); quote! { + impl RowWrapper<#row_ident> for #wrapper_ident { fn get_inner(self) -> #row_ident { self.inner @@ -84,6 +106,8 @@ impl Generator { fn get_wrapper_archived_impl(&self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let wrapper_ident = name_generator.get_wrapper_type_ident(); + let lock_ident = name_generator.get_lock_type_ident(); + let archived_wrapper_ident = Ident::new( format!("Archived{}", &wrapper_ident).as_str(), Span::mixed_site(), @@ -102,7 +126,38 @@ impl Generator { }) .collect::>(); + let checks2 = self + .columns + .columns_map + .keys() + .map(|i| { + let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { + if let Some(#name) = &self.#name { + if #name.locked.load(std::sync::atomic::Ordering::Acquire) { + return true; + } + } + } + }) + .collect::>(); + quote! { + + impl Lockable for #lock_ident { + fn is_locked(&self) -> bool { + if let Some(lock) = &self.lock { + if lock.locked.load(std::sync::atomic::Ordering::Acquire) { + return true; + } + } + + #(#checks2)* + + false + } + } + impl ArchivedRow for #archived_wrapper_ident { fn is_locked(&self) -> Option { if self.lock != 0 { diff --git a/codegen/src/worktable/mod.rs b/codegen/src/worktable/mod.rs index e67c1f0c..e42fb60c 100644 --- a/codegen/src/worktable/mod.rs +++ b/codegen/src/worktable/mod.rs @@ -59,6 +59,8 @@ pub fn expand(input: TokenStream) -> syn::Result { let update_impls = generator.gen_query_update_impl()?; let delete_impls = generator.gen_query_delete_impl()?; + println!("{}", table_def); + Ok(quote! { #pk_def #row_def diff --git a/src/in_memory/mod.rs b/src/in_memory/mod.rs index ab18d8c8..2a49b519 100644 --- a/src/in_memory/mod.rs +++ b/src/in_memory/mod.rs @@ -4,4 +4,4 @@ mod row; pub use data::{Data, ExecutionError as DataExecutionError, DATA_INNER_LENGTH}; pub use pages::{DataPages, ExecutionError as PagesExecutionError}; -pub use row::{ArchivedRow, RowWrapper, StorableRow}; +pub use row::{ArchivedRow, Lockable, RowWrapper, StorableRow}; diff --git a/src/in_memory/row.rs b/src/in_memory/row.rs index ec6a1e57..154c03aa 100644 --- a/src/in_memory/row.rs +++ b/src/in_memory/row.rs @@ -21,6 +21,16 @@ pub trait ArchivedRow { fn is_locked(&self) -> Option; } +pub trait Lockable { + fn is_locked(&self) -> bool; +} + +impl Lockable for () { + fn is_locked(&self) -> bool { + false + } +} + /// General `Row` wrapper that is used to append general data for every `Inner` /// `Row`. #[derive(Archive, Deserialize, Debug, Serialize)] diff --git a/src/lib.rs b/src/lib.rs index 725ada04..3e4cd634 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ pub use worktable_codegen::worktable; pub mod prelude { pub use crate::database::DatabaseManager; - pub use crate::in_memory::{ArchivedRow, Data, DataPages, RowWrapper, StorableRow}; + pub use crate::in_memory::{ArchivedRow, Data, DataPages, Lockable, RowWrapper, StorableRow}; pub use crate::lock::LockMap; pub use crate::primary_key::{PrimaryKeyGenerator, PrimaryKeyGeneratorState, TablePrimaryKey}; pub use crate::table::select::{ diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 51b26194..ad0ef0ce 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -18,7 +18,7 @@ pub struct LockId(u16); #[derive(Debug)] pub struct Lock { - locked: AtomicBool, + pub locked: AtomicBool, waker: AtomicWaker, } diff --git a/src/lock/set.rs b/src/lock/set.rs index d74116d5..fefd0eba 100644 --- a/src/lock/set.rs +++ b/src/lock/set.rs @@ -1,4 +1,3 @@ -use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use lockfree::map::Map; @@ -9,8 +8,6 @@ where PrimaryKey: std::hash::Hash + std::cmp::Ord, { set: Map>, - - next_id: AtomicU16, } impl Default for LockMap @@ -27,10 +24,7 @@ where PrimaryKey: std::hash::Hash + std::cmp::Ord, { pub fn new() -> Self { - Self { - set: Map::new(), - next_id: AtomicU16::default(), - } + Self { set: Map::new() } } pub fn insert(&self, id: PrimaryKey, lock: Arc) { @@ -44,8 +38,4 @@ where pub fn remove(&self, id: &PrimaryKey) { self.set.remove(id); } - - pub fn next_id(&self) -> u16 { - self.next_id.fetch_add(1, Ordering::Relaxed) - } } diff --git a/src/table/mod.rs b/src/table/mod.rs index 6684b19d..008d2c14 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -14,7 +14,7 @@ use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; -use crate::in_memory::{DataPages, RowWrapper, StorableRow}; +use crate::in_memory::{DataPages, Lockable, RowWrapper, StorableRow}; use crate::lock::LockMap; use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey}; use crate::{in_memory, IndexMap, TableRow, TableSecondaryIndex}; @@ -63,6 +63,7 @@ impl< where PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, SecondaryIndexes: Default, + LockType: Lockable, PkGen: Default, Row: StorableRow, ::WrappedRow: RowWrapper, @@ -110,6 +111,7 @@ where )] pub fn select(&self, pk: PrimaryKey) -> Option where + LockType: 'static, Row: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, @@ -139,6 +141,7 @@ where PrimaryKey: Clone, AvailableTypes: 'static, SecondaryIndexes: TableSecondaryIndex, + LockType: 'static, { let pk = row.get_primary_key().clone(); let link = self From 0887d7eee4f266dd87c6d77d0c5dc0b346a0d1d1 Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Wed, 19 Feb 2025 13:38:03 +0300 Subject: [PATCH 03/11] WIP Locks outside wrapper --- .../src/worktable/generator/queries/delete.rs | 21 +-- .../src/worktable/generator/queries/locks.rs | 100 ++--------- .../src/worktable/generator/queries/update.rs | 49 +++--- codegen/src/worktable/generator/row.rs | 2 + codegen/src/worktable/generator/wrapper.rs | 166 +++++++++--------- codegen/src/worktable/mod.rs | 3 +- examples/src/main.rs | 65 ++++--- src/in_memory/mod.rs | 2 +- src/in_memory/row.rs | 23 --- src/lib.rs | 2 +- src/lock/mod.rs | 43 ++--- src/lock/set.rs | 6 +- src/table/mod.rs | 3 +- 13 files changed, 191 insertions(+), 294 deletions(-) diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index f3abf84a..870b8396 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -36,26 +36,7 @@ impl Generator { quote! { pub async fn delete(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> { - let link = self.0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; - - let id = self.0.data.with_ref(link, |archived| { - archived.is_locked() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } - } - let row = self.select(pk.clone()).unwrap(); - self.0.indexes.delete_row(row, link)?; - self.0.pk_map.remove(&pk); - self.0.data.delete(link).map_err(WorkTableError::PagesError)?; - - core::result::Result::Ok(()) + Ok(()) } } } diff --git a/codegen/src/worktable/generator/queries/locks.rs b/codegen/src/worktable/generator/queries/locks.rs index fd0ac853..6d987d62 100644 --- a/codegen/src/worktable/generator/queries/locks.rs +++ b/codegen/src/worktable/generator/queries/locks.rs @@ -8,11 +8,7 @@ impl Generator { pub fn gen_query_locks_impl(&mut self) -> syn::Result { if let Some(q) = &self.queries { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); - let wrapper_name = name_generator.get_wrapper_type_ident(); - let archived_wrapper = Ident::new( - format!("Archived{}", &wrapper_name).as_str(), - Span::mixed_site(), - ); + let lock_type_ident = name_generator.get_lock_type_ident(); let fns = q .updates @@ -23,32 +19,12 @@ impl Generator { .from_case(Case::Pascal) .to_case(Case::Snake); - let check_ident = Ident::new( - format!("check_{snake_case_name}_lock").as_str(), + let lock_await_ident = Ident::new( + format!("lock_await_{snake_case_name}").as_str(), Span::mixed_site(), ); - let checks = q - .updates - .get(name) - .expect("exists") - .columns - .iter() - .map(|col| { - let col = - Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); - quote! { - if self.#col != 0 { - return Some(self.#col.into()); - } - } - }) - .collect::>(); - let lock_ident = Ident::new( - format!("lock_{snake_case_name}").as_str(), - Span::mixed_site(), - ); - let locks = q + let locks_await = q .updates .get(name) .expect("exists") @@ -58,78 +34,32 @@ impl Generator { let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); quote! { - self.#col = id.into(); + if let Some(lock) = &self.#col { + futures.push(lock.as_ref()); + } } }) .collect::>(); - let unlock_ident = Ident::new( - format!("unlock_{snake_case_name}").as_str(), - Span::mixed_site(), - ); - let unlocks = q - .updates - .get(name) - .expect("exists") - .columns - .iter() - .map(|col| { - let col = - Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); - quote! { - self.#col = 0u16.into(); - } - }) - .collect::>(); + quote! { - let verify_ident = Ident::new( - format!("verify_{snake_case_name}_lock").as_str(), - Span::mixed_site(), - ); - let verify = q - .updates - .get(name) - .expect("exists") - .columns - .iter() - .map(|col| { - let col = - Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); - quote! { - if self.#col != id { - return false; - } - } - }) - .collect::>(); + pub async fn #lock_await_ident(&self) { + let mut futures = Vec::new(); - quote! { - pub fn #check_ident(&self) -> Option { - if self.lock != 0 { - return Some(self.lock.into()); + if let Some(lock) = &self.lock { + futures.push(lock.as_ref()); } - #(#checks)* - None - } - pub unsafe fn #lock_ident(&mut self, id: u16) { - #(#locks)* - } - - pub unsafe fn #unlock_ident(&mut self) { - #(#unlocks)* - } - pub fn #verify_ident(&self, id: u16) -> bool { - #(#verify)* - true + #(#locks_await)* + futures::future::join_all(futures).await; } } }) .collect::>(); Ok(quote! { - impl #archived_wrapper { + impl #lock_type_ident { #(#fns)* } }) diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 975b3767..9cf95872 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -21,6 +21,8 @@ impl Generator { }; let full_row_update = self.gen_full_row_update(); + println!("{}", full_row_update); + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let table_ident = name_generator.get_work_table_ident(); Ok(quote! { @@ -34,6 +36,7 @@ impl Generator { fn gen_full_row_update(&mut self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let row_ident = name_generator.get_row_type_ident(); + let lock_ident = name_generator.get_lock_type_ident(); let row_updates = self .columns @@ -49,9 +52,14 @@ impl Generator { quote! { pub async fn update(&self, row: #row_ident) -> core::result::Result<(), WorkTableError> { let pk = row.get_primary_key(); - let op_id = self.0.lock_map.next_id(); - let lock = std::sync::Arc::new(Lock::new()); - self.0.lock_map.insert(op_id.into(), lock.clone()); + + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.lock_await(); // Waiting for all locks released + } + + let lock = std::sync::Arc::new(#lock_ident::new()); //Creates new LockType with None + lock.lock(); // Locks all fields + self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; let mut row = unsafe { rkyv::access_unchecked_mut::<<#row_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; @@ -61,27 +69,12 @@ impl Generator { .map(|v| v.get().value) .ok_or(WorkTableError::NotFound)?; - let id = self.0.data.with_ref(link, |archived| { - archived.is_locked() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } - } - unsafe { self.0.data.with_mut_ref(link, |archived| { - archived.lock = op_id.into(); - }).map_err(WorkTableError::PagesError)? }; unsafe { self.0.data.with_mut_ref(link, move |archived| { #(#row_updates)* }).map_err(WorkTableError::PagesError)? }; - unsafe { self.0.data.with_mut_ref(link, |archived| { - unsafe { - archived.lock = 0u16.into(); - } - }).map_err(WorkTableError::PagesError)? }; - lock.unlock(); - self.0.lock_map.remove(&op_id.into()); + + lock.unlock(); // Releases locks + self.0.lock_map.remove(&pk); // Removes locks core::result::Result::Ok(()) } } @@ -120,11 +113,21 @@ impl Generator { if index.is_unique { self.gen_unique_update(snake_case_name, name, index_name, idents) } else { - self.gen_non_unique_update(snake_case_name, name, index_name, idents) + let t = + self.gen_non_unique_update(snake_case_name, name, index_name, idents); + println!("!non-unique {}", t); + t } } else if self.columns.primary_keys.len() == 1 { if *self.columns.primary_keys.first().unwrap() == op.by { - self.gen_pk_update(snake_case_name, name, idents, indexes_columns.as_ref()) + let t = self.gen_pk_update( + snake_case_name, + name, + idents, + indexes_columns.as_ref(), + ); + println!("!gen_pk {}", t); + t } else { todo!() } diff --git a/codegen/src/worktable/generator/row.rs b/codegen/src/worktable/generator/row.rs index b2b98d2c..d07eba56 100644 --- a/codegen/src/worktable/generator/row.rs +++ b/codegen/src/worktable/generator/row.rs @@ -9,6 +9,8 @@ impl Generator { let def = self.gen_row_type(); let table_row_impl = self.gen_row_table_row_impl(); + println!("!def {}", def); + quote! { #def #table_row_impl diff --git a/codegen/src/worktable/generator/wrapper.rs b/codegen/src/worktable/generator/wrapper.rs index 1c5f9a56..552a18f7 100644 --- a/codegen/src/worktable/generator/wrapper.rs +++ b/codegen/src/worktable/generator/wrapper.rs @@ -7,16 +7,13 @@ impl Generator { pub fn gen_wrapper_def(&self) -> TokenStream { let type_ = self.gen_wrapper_type(); let impl_ = self.gen_wrapper_impl(); - let archived_impl = self.get_wrapper_archived_impl(); let storable_impl = self.get_wrapper_storable_impl(); println!("!TYPE {}", type_); - println!("!Archived {}", archived_impl); quote! { #type_ #impl_ - #archived_impl #storable_impl } } @@ -34,137 +31,132 @@ impl Generator { .map(|i| { let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); quote! { - #name: u16, + #name: Option>, } }) .collect::>(); - let row_locks2 = self + let row_new = self .columns .columns_map .keys() .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); quote! { - #name: Option>, + #col: None } }) .collect::>(); - quote! { - #[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] - #[repr(C)] - pub struct #wrapper_ident { - inner: #row_ident, - is_deleted: bool, - lock: u16, - #(#row_locks)* - } - #[derive(Debug)] - pub struct #lock_ident { - lock: Option>, - #(#row_locks2)* - } - } - } - - pub fn gen_wrapper_impl(&self) -> TokenStream { - let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); - let wrapper_ident = name_generator.get_wrapper_type_ident(); - let row_ident = name_generator.get_row_type_ident(); - let row_defaults = self + let lock_await = self .columns .columns_map .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + .map(|col| { + let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); quote! { - #name: Default::default(), + if let Some(lock) = &self.#col { + futures.push(lock.as_ref()); + } } }) .collect::>(); - quote! { - - impl RowWrapper<#row_ident> for #wrapper_ident { - fn get_inner(self) -> #row_ident { - self.inner - } - - fn from_inner(inner: #row_ident) -> Self { - Self { - inner, - is_deleted: Default::default(), - lock: Default::default(), - #(#row_defaults)* - } - } - } - } - } - - fn get_wrapper_archived_impl(&self) -> TokenStream { - let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); - let wrapper_ident = name_generator.get_wrapper_type_ident(); - let lock_ident = name_generator.get_lock_type_ident(); - - let archived_wrapper_ident = Ident::new( - format!("Archived{}", &wrapper_ident).as_str(), - Span::mixed_site(), - ); - let checks = self + let row_unlock = self .columns .columns_map .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + .map(|col| { + let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); quote! { - if self.#name != 0 { - return Some(self.#name.into()); - } + if let Some(#col) = &self.#col { + #col.unlock(); + } } }) .collect::>(); - let checks2 = self + let row_lock = self .columns .columns_map .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + .map(|col| { + let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); quote! { - if let Some(#name) = &self.#name { - if #name.locked.load(std::sync::atomic::Ordering::Acquire) { - return true; - } - } + if let Some(#col) = &self.#col { + #col.lock(); + } } }) .collect::>(); quote! { + #[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] + #[repr(C)] + pub struct #wrapper_ident { + inner: #row_ident, + is_deleted: bool, + } + + #[derive(Debug)] + pub struct #lock_ident { + lock: Option>, + #(#row_locks)* + } + + impl #lock_ident { + pub fn new() -> Self { + Self { + lock: None, + #(#row_new),* + } + } + + pub fn lock(&self) { + if let Some(lock) = &self.lock { + lock.lock(); + } + #(#row_lock)* + } + - impl Lockable for #lock_ident { - fn is_locked(&self) -> bool { + pub fn unlock(&self) { if let Some(lock) = &self.lock { - if lock.locked.load(std::sync::atomic::Ordering::Acquire) { - return true; - } + lock.unlock(); } + #(#row_unlock)* + } - #(#checks2)* + pub async fn lock_await(&self) { + let mut futures = Vec::new(); - false + if let Some(lock) = &self.lock { + futures.push(lock.as_ref()); + } + #(#lock_await)* + futures::future::join_all(futures).await; } } + } + } + + pub fn gen_wrapper_impl(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let wrapper_ident = name_generator.get_wrapper_type_ident(); + let row_ident = name_generator.get_row_type_ident(); - impl ArchivedRow for #archived_wrapper_ident { - fn is_locked(&self) -> Option { - if self.lock != 0 { - return Some(self.lock.into()); + quote! { + + impl RowWrapper<#row_ident> for #wrapper_ident { + fn get_inner(self) -> #row_ident { + self.inner + } + + fn from_inner(inner: #row_ident) -> Self { + Self { + inner, + is_deleted: Default::default(), } - #(#checks)* - None } } } diff --git a/codegen/src/worktable/mod.rs b/codegen/src/worktable/mod.rs index e42fb60c..137e6e1e 100644 --- a/codegen/src/worktable/mod.rs +++ b/codegen/src/worktable/mod.rs @@ -59,7 +59,8 @@ pub fn expand(input: TokenStream) -> syn::Result { let update_impls = generator.gen_query_update_impl()?; let delete_impls = generator.gen_query_delete_impl()?; - println!("{}", table_def); + //println!("{}", table_def); + println!("{}", query_locks_impls); Ok(quote! { #pk_def diff --git a/examples/src/main.rs b/examples/src/main.rs index 45854cad..cabe1d8d 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -20,14 +20,11 @@ fn main() { }, queries: { update: { - ValById(val) by id, - AllAttrById(attr, attr2) by id, - UpdateOptionalById(test) by id, + // ValById(val) by id, + // AllAttrById(attr, attr2) by id, + // UpdateTestByIdx(test) by attr, }, - delete: { - ByAttr() by attr, - ById() by id, - } + } ); @@ -43,31 +40,43 @@ fn main() { id: 0, }; + let row2 = MyRow { + val: 444, + attr: "Attri".to_string(), + attr2: 3456, + test: 22, + id: 0, + }; + // insert let pk: MyPrimaryKey = my_table.insert(row).expect("primary key"); - // Select ALL records from WT - let select_all = my_table.select_all().execute(); - println!("Select All {:?}", select_all); + let upd = my_table.update(row2); + let _ = block_on(upd); - // Select All records with attribute TEST + // + //// Select ALL records from WT let select_all = my_table.select_all().execute(); println!("Select All {:?}", select_all); - - // Select by Idx - let select_by_attr = my_table.select_by_attr("Attribute1".to_string()); - println!("Select by idx {:?}", select_by_attr.unwrap().vals); - - // Update Value query - let update = my_table.update_val_by_id(ValByIdQuery { val: 1337 }, pk.clone()); - let _ = block_on(update); - - let select_all = my_table.select_all().execute(); - println!("Select after update val {:?}", select_all); - - let delete = my_table.delete(pk); - let _ = block_on(delete); - - let select_all = my_table.select_all().execute(); - println!("Select after delete {:?}", select_all); + // + //// Select All records with attribute TEST + //let select_all = my_table.select_all().execute(); + //println!("Select All {:?}", select_all); + // + //// Select by Idx + //let select_by_attr = my_table.select_by_attr("Attribute1".to_string()); + //println!("Select by idx {:?}", select_by_attr.unwrap().vals); + // + //// Update Value query + //let update = my_table.update_val_by_id(ValByIdQuery { val: 1337 }, pk.clone()); + //let _ = block_on(update); + // + //let select_all = my_table.select_all().execute(); + //println!("Select after update val {:?}", select_all); + // + //let delete = my_table.delete(pk); + //let _ = block_on(delete); + // + //let select_all = my_table.select_all().execute(); + //println!("Select after delete {:?}", select_all); } diff --git a/src/in_memory/mod.rs b/src/in_memory/mod.rs index 2a49b519..126cdeb7 100644 --- a/src/in_memory/mod.rs +++ b/src/in_memory/mod.rs @@ -4,4 +4,4 @@ mod row; pub use data::{Data, ExecutionError as DataExecutionError, DATA_INNER_LENGTH}; pub use pages::{DataPages, ExecutionError as PagesExecutionError}; -pub use row::{ArchivedRow, Lockable, RowWrapper, StorableRow}; +pub use row::{RowWrapper, StorableRow}; diff --git a/src/in_memory/row.rs b/src/in_memory/row.rs index 154c03aa..7586dec0 100644 --- a/src/in_memory/row.rs +++ b/src/in_memory/row.rs @@ -17,20 +17,6 @@ pub trait RowWrapper { fn from_inner(inner: Inner) -> Self; } -pub trait ArchivedRow { - fn is_locked(&self) -> Option; -} - -pub trait Lockable { - fn is_locked(&self) -> bool; -} - -impl Lockable for () { - fn is_locked(&self) -> bool { - false - } -} - /// General `Row` wrapper that is used to append general data for every `Inner` /// `Row`. #[derive(Archive, Deserialize, Debug, Serialize)] @@ -56,12 +42,3 @@ impl RowWrapper for GeneralRow { } } } - -impl ArchivedRow for ArchivedGeneralRow -where - Inner: Archive, -{ - fn is_locked(&self) -> Option { - None - } -} diff --git a/src/lib.rs b/src/lib.rs index 3e4cd634..6696619d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ pub use worktable_codegen::worktable; pub mod prelude { pub use crate::database::DatabaseManager; - pub use crate::in_memory::{ArchivedRow, Data, DataPages, Lockable, RowWrapper, StorableRow}; + pub use crate::in_memory::{Data, DataPages, RowWrapper, StorableRow}; pub use crate::lock::LockMap; pub use crate::primary_key::{PrimaryKeyGenerator, PrimaryKeyGeneratorState, TablePrimaryKey}; pub use crate::table::select::{ diff --git a/src/lock/mod.rs b/src/lock/mod.rs index ad0ef0ce..2b0f53e0 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -7,21 +7,38 @@ use std::task::{Context, Poll}; use derive_more::From; use futures::task::AtomicWaker; -use rkyv::{Archive, Deserialize, Serialize}; pub use set::LockMap; -#[derive( - Archive, Clone, Copy, Deserialize, Debug, Eq, From, Hash, Ord, Serialize, PartialEq, PartialOrd, -)] -pub struct LockId(u16); - #[derive(Debug)] pub struct Lock { pub locked: AtomicBool, waker: AtomicWaker, } +impl Lock { + pub fn new() -> Self { + Self { + locked: AtomicBool::from(true), + waker: AtomicWaker::new(), + } + } + + pub fn unlock(&self) { + self.locked.store(false, Ordering::Relaxed); + self.waker.wake() + } + + pub fn lock(&self) { + self.locked.store(true, Ordering::Relaxed); + self.waker.wake() + } + + pub fn is_locked(&self) -> bool { + self.locked.load(Ordering::Acquire) + } +} + impl Future for &Lock { type Output = (); @@ -40,17 +57,3 @@ impl Default for Lock { Self::new() } } - -impl Lock { - pub fn new() -> Self { - Self { - locked: AtomicBool::from(true), - waker: AtomicWaker::new(), - } - } - - pub fn unlock(&self) { - self.locked.store(false, Ordering::Relaxed); - self.waker.wake() - } -} diff --git a/src/lock/set.rs b/src/lock/set.rs index fefd0eba..e2af9e05 100644 --- a/src/lock/set.rs +++ b/src/lock/set.rs @@ -7,7 +7,7 @@ pub struct LockMap where PrimaryKey: std::hash::Hash + std::cmp::Ord, { - set: Map>, + set: Map>>, } impl Default for LockMap @@ -28,11 +28,11 @@ where } pub fn insert(&self, id: PrimaryKey, lock: Arc) { - self.set.insert(id, lock); + self.set.insert(id, Some(lock)); } pub fn get(&self, id: &PrimaryKey) -> Option> { - self.set.get(id).map(|v| v.val().clone()) + self.set.get(id).map(|v| v.val().clone())? } pub fn remove(&self, id: &PrimaryKey) { diff --git a/src/table/mod.rs b/src/table/mod.rs index 008d2c14..a65f141d 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -14,7 +14,7 @@ use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; -use crate::in_memory::{DataPages, Lockable, RowWrapper, StorableRow}; +use crate::in_memory::{DataPages, RowWrapper, StorableRow}; use crate::lock::LockMap; use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey}; use crate::{in_memory, IndexMap, TableRow, TableSecondaryIndex}; @@ -63,7 +63,6 @@ impl< where PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, SecondaryIndexes: Default, - LockType: Lockable, PkGen: Default, Row: StorableRow, ::WrappedRow: RowWrapper, From 779a7deb011b48b1b71b4a7e583742cbae2a1f88 Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Sun, 23 Feb 2025 11:08:17 +0300 Subject: [PATCH 04/11] Wip add delete --- .../src/worktable/generator/queries/delete.rs | 37 ++++++++++++++++++- .../src/worktable/generator/queries/update.rs | 3 +- codegen/src/worktable/generator/wrapper.rs | 22 ++++++++++- examples/src/main.rs | 18 +++++++-- 4 files changed, 72 insertions(+), 8 deletions(-) diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index 870b8396..bf79e786 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -34,9 +34,44 @@ impl Generator { fn gen_full_row_delete(&mut self) -> TokenStream { let pk_ident = &self.pk.as_ref().unwrap().ident; + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_ident = name_generator.get_lock_type_ident(); + quote! { pub async fn delete(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> { - Ok(()) + + if let Some(lock) = self.0.lock_map.get(&pk) { + println!("Lock await {:?}", lock ); + lock.lock_await(); // Waiting for all locks released + } + + let lock = std::sync::Arc::new(#lock_ident::with_lock()); //Creates new LockType with None + println!("Lock.lock {:?}", lock ); + + self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap + + + let link = self.0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound)?; + + + let row = self.select(pk.clone()).unwrap(); + self.0.indexes.delete_row(row, link)?; + self.0.pk_map.remove(&pk); + self.0.data.delete(link).map_err(WorkTableError::PagesError)?; + + lock.unlock(); // Releases locks + println!("Lock unlock {:?}", lock ); + + self.0.lock_map.remove(&pk); // Removes locks + + println!("Lock remove {:?}", self.0.lock_map ); + + + core::result::Result::Ok(()) } } } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 9cf95872..fd437e18 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -57,8 +57,7 @@ impl Generator { lock.lock_await(); // Waiting for all locks released } - let lock = std::sync::Arc::new(#lock_ident::new()); //Creates new LockType with None - lock.lock(); // Locks all fields + let lock = std::sync::Arc::new(#lock_ident::with_lock()); //Creates new LockType with None self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; diff --git a/codegen/src/worktable/generator/wrapper.rs b/codegen/src/worktable/generator/wrapper.rs index 552a18f7..5e47a0fe 100644 --- a/codegen/src/worktable/generator/wrapper.rs +++ b/codegen/src/worktable/generator/wrapper.rs @@ -48,6 +48,18 @@ impl Generator { }) .collect::>(); + let row_with_lock = self + .columns + .columns_map + .keys() + .map(|i| { + let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { + #col: Some(std::sync::Arc::new(Lock::new())) + } + }) + .collect::>(); + let lock_await = self .columns .columns_map @@ -112,13 +124,21 @@ impl Generator { } } + pub fn with_lock() -> Self { + Self { + lock: Some(std::sync::Arc::new(Lock::new())), + #(#row_with_lock),* + + } + } + pub fn lock(&self) { if let Some(lock) = &self.lock { lock.lock(); } #(#row_lock)* - } + } pub fn unlock(&self) { if let Some(lock) = &self.lock { diff --git a/examples/src/main.rs b/examples/src/main.rs index cabe1d8d..046708d7 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -48,9 +48,19 @@ fn main() { id: 0, }; + let row3 = MyRow { + val: 7777, + attr: "Attribute1".to_string(), + attr2: 345, + test: 1, + id: 1, + }; + // insert let pk: MyPrimaryKey = my_table.insert(row).expect("primary key"); + let pk2: MyPrimaryKey = my_table.insert(row3).expect("primary key"); + let upd = my_table.update(row2); let _ = block_on(upd); @@ -74,9 +84,9 @@ fn main() { //let select_all = my_table.select_all().execute(); //println!("Select after update val {:?}", select_all); // - //let delete = my_table.delete(pk); - //let _ = block_on(delete); + let delete = my_table.delete(pk); + let _ = block_on(delete); // - //let select_all = my_table.select_all().execute(); - //println!("Select after delete {:?}", select_all); + let select_all = my_table.select_all().execute(); + println!("Select after delete {:?}", select_all); } From c04877e9c472cbbbffe63e3c4f0739f497f940fb Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Sun, 23 Feb 2025 13:03:27 +0300 Subject: [PATCH 05/11] WIP --- codegen/src/worktable/generator/locks.rs | 160 ++++++++++++++ codegen/src/worktable/generator/mod.rs | 1 + .../src/worktable/generator/queries/delete.rs | 10 +- .../src/worktable/generator/queries/locks.rs | 62 +++++- .../src/worktable/generator/queries/update.rs | 198 ++++++++---------- codegen/src/worktable/generator/wrapper.rs | 131 +----------- codegen/src/worktable/mod.rs | 6 +- examples/src/main.rs | 27 ++- tests/worktable/custom_pk.rs | 1 + 9 files changed, 332 insertions(+), 264 deletions(-) create mode 100644 codegen/src/worktable/generator/locks.rs diff --git a/codegen/src/worktable/generator/locks.rs b/codegen/src/worktable/generator/locks.rs new file mode 100644 index 00000000..2ad05631 --- /dev/null +++ b/codegen/src/worktable/generator/locks.rs @@ -0,0 +1,160 @@ +use crate::name_generator::WorktableNameGenerator; +use crate::worktable::generator::Generator; +use proc_macro2::{Ident, Span, TokenStream}; +use quote::quote; + +impl Generator { + pub fn gen_locks_def(&self) -> TokenStream { + let type_ = self.gen_locks_type(); + let impl_ = self.gen_locks_impl(); + + println!("!TYPE {}", type_); + println!("!IMPL {}", impl_); + + quote! { + #type_ + #impl_ + } + } + + pub fn gen_locks_type(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_ident = name_generator.get_lock_type_ident(); + + let row_locks = self + .columns + .columns_map + .keys() + .map(|i| { + let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { + #name: Option>, + } + }) + .collect::>(); + + quote! { + #[derive(Debug, Clone)] + pub struct #lock_ident { + lock: Option>, + #(#row_locks)* + } + } + } + + fn gen_locks_impl(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_ident = name_generator.get_lock_type_ident(); + + let row_new = self + .columns + .columns_map + .keys() + .map(|i| { + let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { + #col: None + } + }) + .collect::>(); + + let row_with_lock = self + .columns + .columns_map + .keys() + .map(|i| { + let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { + #col: Some(std::sync::Arc::new(Lock::new())) + } + }) + .collect::>(); + + let row_lock_await = self + .columns + .columns_map + .keys() + .map(|col| { + let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); + quote! { + if let Some(lock) = &self.#col { + futures.push(lock.as_ref()); + } + } + }) + .collect::>(); + + let row_unlock = self + .columns + .columns_map + .keys() + .map(|col| { + let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); + quote! { + if let Some(#col) = &self.#col { + #col.unlock(); + } + } + }) + .collect::>(); + + let row_lock = self + .columns + .columns_map + .keys() + .map(|col| { + let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); + quote! { + if let Some(#col) = &self.#col { + #col.lock(); + } + } + }) + .collect::>(); + + quote! { + + impl #lock_ident { + pub fn new() -> Self { + Self { + lock: None, + #(#row_new),* + } + } + + pub fn with_lock() -> Self { + Self { + lock: Some(std::sync::Arc::new(Lock::new())), + #(#row_with_lock),* + + } + } + + pub fn lock(&self) { + if let Some(lock) = &self.lock { + lock.lock(); + } + #(#row_lock)* + + } + + pub fn unlock(&self) { + if let Some(lock) = &self.lock { + lock.unlock(); + } + #(#row_unlock)* + } + + pub async fn lock_await(&self) { + let mut futures = Vec::new(); + + if let Some(lock) = &self.lock { + futures.push(lock.as_ref()); + } + #(#row_lock_await)* + futures::future::join_all(futures).await; + } + } + } + } +} diff --git a/codegen/src/worktable/generator/mod.rs b/codegen/src/worktable/generator/mod.rs index a07f4d0e..bcdb9fd8 100644 --- a/codegen/src/worktable/generator/mod.rs +++ b/codegen/src/worktable/generator/mod.rs @@ -1,4 +1,5 @@ mod index; +mod locks; mod primary_key; mod queries; mod row; diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index bf79e786..2fd589ac 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -41,13 +41,10 @@ impl Generator { pub async fn delete(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> { if let Some(lock) = self.0.lock_map.get(&pk) { - println!("Lock await {:?}", lock ); - lock.lock_await(); // Waiting for all locks released + lock.lock_await().await; // Waiting for all locks released } let lock = std::sync::Arc::new(#lock_ident::with_lock()); //Creates new LockType with None - println!("Lock.lock {:?}", lock ); - self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap @@ -64,13 +61,8 @@ impl Generator { self.0.data.delete(link).map_err(WorkTableError::PagesError)?; lock.unlock(); // Releases locks - println!("Lock unlock {:?}", lock ); - self.0.lock_map.remove(&pk); // Removes locks - println!("Lock remove {:?}", self.0.lock_map ); - - core::result::Result::Ok(()) } } diff --git a/codegen/src/worktable/generator/queries/locks.rs b/codegen/src/worktable/generator/queries/locks.rs index 6d987d62..95eab459 100644 --- a/codegen/src/worktable/generator/queries/locks.rs +++ b/codegen/src/worktable/generator/queries/locks.rs @@ -24,7 +24,17 @@ impl Generator { Span::mixed_site(), ); - let locks_await = q + let lock_ident = Ident::new( + format!("lock_{snake_case_name}").as_str(), + Span::mixed_site(), + ); + + let unlock_ident = Ident::new( + format!("unlock_{snake_case_name}").as_str(), + Span::mixed_site(), + ); + + let rows_lock_await = q .updates .get(name) .expect("exists") @@ -41,8 +51,56 @@ impl Generator { }) .collect::>(); + let rows_lock = q + .updates + .get(name) + .expect("exists") + .columns + .iter() + .map(|col| { + let col = + Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); + quote! { + if self.#col.is_none() { + self.#col = Some(std::sync::Arc::new(Lock::new())); + } + } + }) + .collect::>(); + + let rows_unlock = q + .updates + .get(name) + .expect("exists") + .columns + .iter() + .map(|col| { + let col = + Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); + quote! { + if let Some(#col) = &self.#col { + #col.unlock(); + } + } + }) + .collect::>(); + quote! { + pub fn #lock_ident(&mut self) { + if self.lock.is_none() { + self.lock = Some(std::sync::Arc::new(Lock::new())); + } + #(#rows_lock)* + } + + pub fn #unlock_ident(&self) { + if let Some(lock) = &self.lock { + lock.unlock(); + } + #(#rows_unlock)* + } + pub async fn #lock_await_ident(&self) { let mut futures = Vec::new(); @@ -51,7 +109,7 @@ impl Generator { } - #(#locks_await)* + #(#rows_lock_await)* futures::future::join_all(futures).await; } } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index fd437e18..315e606b 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -54,10 +54,10 @@ impl Generator { let pk = row.get_primary_key(); if let Some(lock) = self.0.lock_map.get(&pk) { - lock.lock_await(); // Waiting for all locks released + lock.lock_await().await; // Waiting for all locks released } - let lock = std::sync::Arc::new(#lock_ident::with_lock()); //Creates new LockType with None + let lock = std::sync::Arc::new(#lock_ident::with_lock()); //Creates new LockType with Locks self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; @@ -110,7 +110,9 @@ impl Generator { let index_name = &index.name; if index.is_unique { - self.gen_unique_update(snake_case_name, name, index_name, idents) + let t = self.gen_unique_update(snake_case_name, name, index_name, idents); + println!("!unique {}", t); + t } else { let t = self.gen_non_unique_update(snake_case_name, name, index_name, idents); @@ -156,25 +158,21 @@ impl Generator { let query_ident = Ident::new(format!("{name}Query").as_str(), Span::mixed_site()); - let check_ident = Ident::new( - format!("check_{snake_case_name}_lock").as_str(), - Span::mixed_site(), - ); - let lock_ident = Ident::new( - format!("lock_{snake_case_name}").as_str(), + let lock_await_ident = Ident::new( + format!("lock_await_{snake_case_name}").as_str(), Span::mixed_site(), ); let unlock_ident = Ident::new( format!("unlock_{snake_case_name}").as_str(), Span::mixed_site(), ); - let verify_ident = Ident::new( - format!("verify_{snake_case_name}_lock").as_str(), + let lock_ident = Ident::new( + format!("lock_{snake_case_name}").as_str(), Span::mixed_site(), ); - let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let avt_type_ident = name_generator.get_available_type_ident(); + let lock_type_ident = name_generator.get_lock_type_ident(); let row_updates = idents .iter() @@ -227,10 +225,16 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #pk_ident) -> core::result::Result<(), WorkTableError> { - let op_id = self.0.lock_map.next_id(); - let lock = std::sync::Arc::new(Lock::new()); - self.0.lock_map.insert(op_id.into(), lock.clone()); + if let Some(lock) = self.0.lock_map.get(&by) { + lock.#lock_await_ident().await; // Waiting for all locks released + } + let mut lock = #lock_type_ident::new(); //Creates new LockType with None + lock.#lock_ident(); + + println!("Lock {:?}", lock); + + self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone())); #diff_container_ident @@ -244,33 +248,13 @@ impl Generator { #process_diff_ident - let id = self.0.data.with_ref(link, |archived| { - archived.#check_ident() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } - } - unsafe { self.0.data.with_mut_ref(link, |archived| { - while !archived.#verify_ident(op_id) { - unsafe { - archived.#lock_ident(op_id) - } - } - }).map_err(WorkTableError::PagesError)? }; - unsafe { self.0.data.with_mut_ref(link, |archived| { #(#row_updates)* }).map_err(WorkTableError::PagesError)? }; - unsafe { self.0.data.with_mut_ref(link, |archived| { - unsafe { - archived.#unlock_ident() - } - }).map_err(WorkTableError::PagesError)? }; - lock.unlock(); - self.0.lock_map.remove(&op_id.into()); + lock.#unlock_ident(); + println!("Unlock {:?}", lock); + self.0.lock_map.remove(&by); core::result::Result::Ok(()) } @@ -292,8 +276,8 @@ impl Generator { let query_ident = Ident::new(format!("{name}Query").as_str(), Span::mixed_site()); let by_ident = Ident::new(format!("{name}By").as_str(), Span::mixed_site()); - let check_ident = Ident::new( - format!("check_{snake_case_name}_lock").as_str(), + let lock_await_ident = Ident::new( + format!("lock_await_{snake_case_name}").as_str(), Span::mixed_site(), ); let lock_ident = Ident::new( @@ -304,10 +288,10 @@ impl Generator { format!("unlock_{snake_case_name}").as_str(), Span::mixed_site(), ); - let verify_ident = Ident::new( - format!("verify_{snake_case_name}_lock").as_str(), - Span::mixed_site(), - ); + + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_type_ident = name_generator.get_lock_type_ident(); + let row_updates = idents .iter() .map(|i| { @@ -319,46 +303,34 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { - let op_id = self.0.lock_map.next_id(); - let lock = std::sync::Arc::new(Lock::new()); - - self.0.lock_map.insert(op_id.into(), lock.clone()); - for (_, link) in self.0.indexes.#index.get(&by) { - let id = self.0.data.with_ref(*link, |archived| { - archived.#check_ident() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } - } - unsafe { self.0.data.with_mut_ref(*link, |archived| { - while !archived.#verify_ident(op_id) { - unsafe { - archived.#lock_ident(op_id) - } - } - }).map_err(WorkTableError::PagesError)? }; + if let Some(lock) = self.0.lock_map.get(&by) { + lock.#lock_await_ident().await; } - for (_, link) in self.0.indexes.#index.get(&by) { - let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; - let mut row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; - unsafe { self.0.data.with_mut_ref(*link, |archived| { - #(#row_updates)* - }).map_err(WorkTableError::PagesError)? }; - } + let mut lock = #lock_type_ident::new(); + lock.#lock_ident(); + + self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone())); + + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + + let mut row = unsafe { + rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]) + .unseal_unchecked() + }; for (_, link) in self.0.indexes.#index.get(&by) { - unsafe { self.0.data.with_mut_ref(*link, |archived| { - unsafe { - archived.#unlock_ident() - } - }).map_err(WorkTableError::PagesError)? }; + unsafe { + self.0.data.with_mut_ref(*link, |archived| { + #(#row_updates)* + }).map_err(WorkTableError::PagesError)?; + } } - lock.unlock(); - self.0.lock_map.remove(&op_id.into()); + + lock.#unlock_ident(); + self.0.lock_map.remove(&by); core::result::Result::Ok(()) } @@ -380,8 +352,8 @@ impl Generator { let query_ident = Ident::new(format!("{name}Query").as_str(), Span::mixed_site()); let by_ident = Ident::new(format!("{name}By").as_str(), Span::mixed_site()); - let check_ident = Ident::new( - format!("check_{snake_case_name}_lock").as_str(), + let lock_await_ident = Ident::new( + format!("lock_await_{snake_case_name}").as_str(), Span::mixed_site(), ); let lock_ident = Ident::new( @@ -392,10 +364,10 @@ impl Generator { format!("unlock_{snake_case_name}").as_str(), Span::mixed_site(), ); - let verify_ident = Ident::new( - format!("verify_{snake_case_name}_lock").as_str(), - Span::mixed_site(), - ); + + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_type_ident = name_generator.get_lock_type_ident(); + let row_updates = idents .iter() .map(|i| { @@ -407,41 +379,37 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { - let op_id = self.0.lock_map.next_id(); - let lock = std::sync::Arc::new(Lock::new()); - - self.0.lock_map.insert(op_id.into(), lock.clone()); - let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; - let mut row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; - let link = self.0.indexes.#index.get(&by).map(|kv| kv.get().value).ok_or(WorkTableError::NotFound)?; - let id = self.0.data.with_ref(link, |archived| { - archived.#check_ident() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } + if let Some(lock) = self.0.lock_map.get(&by) { + lock.#lock_await_ident().await; } - unsafe { self.0.data.with_mut_ref(link, |archived| { - while !archived.#verify_ident(op_id) { - unsafe { - archived.#lock_ident(op_id) - } - } - }).map_err(WorkTableError::PagesError)? }; - unsafe { self.0.data.with_mut_ref(link, |archived| { - #(#row_updates)* - }).map_err(WorkTableError::PagesError)? }; + let mut lock = #lock_type_ident::new(); + lock.#lock_ident(); - unsafe { self.0.data.with_mut_ref(link, |archived| { - unsafe { - archived.#unlock_ident() - } - }).map_err(WorkTableError::PagesError)? }; - lock.unlock(); - self.0.lock_map.remove(&op_id.into()); + self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone())); + + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + + let mut row = unsafe { + rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]) + .unseal_unchecked() + }; + + let link = self.0.indexes.#index + .get(&by) + .map(|kv| kv.get().value) + .ok_or(WorkTableError::NotFound)?; + + unsafe { + self.0.data.with_mut_ref(link, |archived| { + #(#row_updates)* + }).map_err(WorkTableError::PagesError)?; + } + + lock.#unlock_ident(); + self.0.lock_map.remove(&by); core::result::Result::Ok(()) } diff --git a/codegen/src/worktable/generator/wrapper.rs b/codegen/src/worktable/generator/wrapper.rs index 5e47a0fe..c50df988 100644 --- a/codegen/src/worktable/generator/wrapper.rs +++ b/codegen/src/worktable/generator/wrapper.rs @@ -1,6 +1,6 @@ use crate::name_generator::WorktableNameGenerator; use crate::worktable::generator::Generator; -use proc_macro2::{Ident, Span, TokenStream}; +use proc_macro2::TokenStream; use quote::quote; impl Generator { @@ -9,8 +9,6 @@ impl Generator { let impl_ = self.gen_wrapper_impl(); let storable_impl = self.get_wrapper_storable_impl(); - println!("!TYPE {}", type_); - quote! { #type_ #impl_ @@ -22,85 +20,6 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let row_ident = name_generator.get_row_type_ident(); let wrapper_ident = name_generator.get_wrapper_type_ident(); - let lock_ident = name_generator.get_lock_type_ident(); - - let row_locks = self - .columns - .columns_map - .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - #name: Option>, - } - }) - .collect::>(); - - let row_new = self - .columns - .columns_map - .keys() - .map(|i| { - let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - #col: None - } - }) - .collect::>(); - - let row_with_lock = self - .columns - .columns_map - .keys() - .map(|i| { - let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - #col: Some(std::sync::Arc::new(Lock::new())) - } - }) - .collect::>(); - - let lock_await = self - .columns - .columns_map - .keys() - .map(|col| { - let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); - quote! { - if let Some(lock) = &self.#col { - futures.push(lock.as_ref()); - } - } - }) - .collect::>(); - - let row_unlock = self - .columns - .columns_map - .keys() - .map(|col| { - let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); - quote! { - if let Some(#col) = &self.#col { - #col.unlock(); - } - } - }) - .collect::>(); - - let row_lock = self - .columns - .columns_map - .keys() - .map(|col| { - let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); - quote! { - if let Some(#col) = &self.#col { - #col.lock(); - } - } - }) - .collect::>(); quote! { #[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] @@ -109,54 +28,6 @@ impl Generator { inner: #row_ident, is_deleted: bool, } - - #[derive(Debug)] - pub struct #lock_ident { - lock: Option>, - #(#row_locks)* - } - - impl #lock_ident { - pub fn new() -> Self { - Self { - lock: None, - #(#row_new),* - } - } - - pub fn with_lock() -> Self { - Self { - lock: Some(std::sync::Arc::new(Lock::new())), - #(#row_with_lock),* - - } - } - - pub fn lock(&self) { - if let Some(lock) = &self.lock { - lock.lock(); - } - #(#row_lock)* - - } - - pub fn unlock(&self) { - if let Some(lock) = &self.lock { - lock.unlock(); - } - #(#row_unlock)* - } - - pub async fn lock_await(&self) { - let mut futures = Vec::new(); - - if let Some(lock) = &self.lock { - futures.push(lock.as_ref()); - } - #(#lock_await)* - futures::future::join_all(futures).await; - } - } } } diff --git a/codegen/src/worktable/mod.rs b/codegen/src/worktable/mod.rs index 137e6e1e..34efab46 100644 --- a/codegen/src/worktable/mod.rs +++ b/codegen/src/worktable/mod.rs @@ -50,6 +50,7 @@ pub fn expand(input: TokenStream) -> syn::Result { let pk_def = generator.gen_primary_key_def()?; let row_def = generator.gen_row_def(); let wrapper_def = generator.gen_wrapper_def(); + let locks_def = generator.gen_locks_def(); let index_def = generator.gen_index_def(); let table_def = generator.gen_table_def()?; let query_types_def = generator.gen_result_types_def()?; @@ -59,14 +60,15 @@ pub fn expand(input: TokenStream) -> syn::Result { let update_impls = generator.gen_query_update_impl()?; let delete_impls = generator.gen_query_delete_impl()?; - //println!("{}", table_def); - println!("{}", query_locks_impls); + println!("Q: {}", query_locks_impls); + //println!("LocksDef {}", locks_def); Ok(quote! { #pk_def #row_def #query_available_def #wrapper_def + #locks_def #index_def #table_def #query_types_def diff --git a/examples/src/main.rs b/examples/src/main.rs index 046708d7..e6c1753f 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -16,12 +16,12 @@ fn main() { }, indexes: { idx1: attr, - idx2: attr2, + idx2: attr2 unique, }, queries: { update: { - // ValById(val) by id, - // AllAttrById(attr, attr2) by id, + AttrById(attr) by id, + AllById(attr, attr2) by id, // UpdateTestByIdx(test) by attr, }, @@ -51,7 +51,7 @@ fn main() { let row3 = MyRow { val: 7777, attr: "Attribute1".to_string(), - attr2: 345, + attr2: 3452, test: 1, id: 1, }; @@ -78,8 +78,23 @@ fn main() { //println!("Select by idx {:?}", select_by_attr.unwrap().vals); // //// Update Value query - //let update = my_table.update_val_by_id(ValByIdQuery { val: 1337 }, pk.clone()); - //let _ = block_on(update); + let update = my_table.update_attr_by_id( + AttrByIdQuery { + attr: "1337".to_string(), + }, + pk2.clone(), + ); + let _ = block_on(update); + + let update2 = my_table.update_all_by_id( + AllByIdQuery { + attr: "TESTTEST".to_string(), + attr2: 13375, + }, + pk2, + ); + + let _ = block_on(update2); // //let select_all = my_table.select_all().execute(); //println!("Select after update val {:?}", select_all); diff --git a/tests/worktable/custom_pk.rs b/tests/worktable/custom_pk.rs index 79ab5669..ade62fe6 100644 --- a/tests/worktable/custom_pk.rs +++ b/tests/worktable/custom_pk.rs @@ -17,6 +17,7 @@ use worktable::worktable; Ord, Serialize, SizeMeasure, + Hash, )] #[rkyv(compare(PartialEq), derive(Debug))] struct CustomId(u64); From bf957c4c4425638f30d2979b8e47099b58a4abe8 Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Sun, 23 Feb 2025 18:19:33 +0300 Subject: [PATCH 06/11] Add non_unique and unique LockMap logic --- codegen/src/worktable/generator/locks.rs | 3 - .../src/worktable/generator/queries/update.rs | 79 ++++++++----------- codegen/src/worktable/generator/row.rs | 2 - codegen/src/worktable/mod.rs | 3 - examples/src/main.rs | 78 ++++++------------ src/lock/set.rs | 20 ++--- 6 files changed, 67 insertions(+), 118 deletions(-) diff --git a/codegen/src/worktable/generator/locks.rs b/codegen/src/worktable/generator/locks.rs index 2ad05631..7011f737 100644 --- a/codegen/src/worktable/generator/locks.rs +++ b/codegen/src/worktable/generator/locks.rs @@ -8,9 +8,6 @@ impl Generator { let type_ = self.gen_locks_type(); let impl_ = self.gen_locks_impl(); - println!("!TYPE {}", type_); - println!("!IMPL {}", impl_); - quote! { #type_ #impl_ diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 315e606b..1b483092 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -20,11 +20,9 @@ impl Generator { quote! {} }; let full_row_update = self.gen_full_row_update(); - - println!("{}", full_row_update); - let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let table_ident = name_generator.get_work_table_ident(); + Ok(quote! { impl #table_ident { #full_row_update @@ -110,25 +108,13 @@ impl Generator { let index_name = &index.name; if index.is_unique { - let t = self.gen_unique_update(snake_case_name, name, index_name, idents); - println!("!unique {}", t); - t + self.gen_unique_update(snake_case_name, name, index_name, idents) } else { - let t = - self.gen_non_unique_update(snake_case_name, name, index_name, idents); - println!("!non-unique {}", t); - t + self.gen_non_unique_update(snake_case_name, name, index_name, idents) } } else if self.columns.primary_keys.len() == 1 { if *self.columns.primary_keys.first().unwrap() == op.by { - let t = self.gen_pk_update( - snake_case_name, - name, - idents, - indexes_columns.as_ref(), - ); - println!("!gen_pk {}", t); - t + self.gen_pk_update(snake_case_name, name, idents, indexes_columns.as_ref()) } else { todo!() } @@ -232,8 +218,6 @@ impl Generator { let mut lock = #lock_type_ident::new(); //Creates new LockType with None lock.#lock_ident(); - println!("Lock {:?}", lock); - self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone())); #diff_container_ident @@ -253,7 +237,6 @@ impl Generator { }).map_err(WorkTableError::PagesError)? }; lock.#unlock_ident(); - println!("Unlock {:?}", lock); self.0.lock_map.remove(&by); core::result::Result::Ok(()) @@ -304,33 +287,41 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { - if let Some(lock) = self.0.lock_map.get(&by) { - lock.#lock_await_ident().await; + for (_, link) in self.0.indexes.#index.get(&by) { + let pk = self.0.data.select(*link)?.get_primary_key(); + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.#lock_await_ident().await; + } } - let mut lock = #lock_type_ident::new(); - lock.#lock_ident(); - - self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone())); + for (_, link) in self.0.indexes.#index.get(&by) { + let pk = self.0.data.select(*link)?.get_primary_key(); + let mut lock = #lock_type_ident::new(); + lock.#lock_ident(); + self.0.lock_map.insert(pk.clone(), std::sync::Arc::new(lock.clone())); + } - let mut bytes = rkyv::to_bytes::(&row) + for (_, link) in self.0.indexes.#index.get(&by) { + let mut bytes = rkyv::to_bytes::(&row) .map_err(|_| WorkTableError::SerializeError)?; - let mut row = unsafe { + let mut row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]) .unseal_unchecked() }; - - for (_, link) in self.0.indexes.#index.get(&by) { unsafe { self.0.data.with_mut_ref(*link, |archived| { #(#row_updates)* }).map_err(WorkTableError::PagesError)?; } } - - lock.#unlock_ident(); - self.0.lock_map.remove(&by); + for (_, link) in self.0.indexes.#index.get(&by) { + let pk = self.0.data.select(*link)?.get_primary_key(); + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.#unlock_ident(); + self.0.lock_map.remove(&pk); + } + } core::result::Result::Ok(()) } @@ -380,15 +371,6 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { - if let Some(lock) = self.0.lock_map.get(&by) { - lock.#lock_await_ident().await; - } - - let mut lock = #lock_type_ident::new(); - lock.#lock_ident(); - - self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone())); - let mut bytes = rkyv::to_bytes::(&row) .map_err(|_| WorkTableError::SerializeError)?; @@ -402,6 +384,15 @@ impl Generator { .map(|kv| kv.get().value) .ok_or(WorkTableError::NotFound)?; + let pk = self.0.data.select(link)?.get_primary_key(); + + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.#lock_await_ident().await; + } + let mut lock = #lock_type_ident::new(); + lock.#lock_ident(); + self.0.lock_map.insert(pk.clone(), std::sync::Arc::new(lock.clone())); + unsafe { self.0.data.with_mut_ref(link, |archived| { #(#row_updates)* @@ -409,7 +400,7 @@ impl Generator { } lock.#unlock_ident(); - self.0.lock_map.remove(&by); + self.0.lock_map.remove(&pk); core::result::Result::Ok(()) } diff --git a/codegen/src/worktable/generator/row.rs b/codegen/src/worktable/generator/row.rs index d07eba56..b2b98d2c 100644 --- a/codegen/src/worktable/generator/row.rs +++ b/codegen/src/worktable/generator/row.rs @@ -9,8 +9,6 @@ impl Generator { let def = self.gen_row_type(); let table_row_impl = self.gen_row_table_row_impl(); - println!("!def {}", def); - quote! { #def #table_row_impl diff --git a/codegen/src/worktable/mod.rs b/codegen/src/worktable/mod.rs index 34efab46..c38bc846 100644 --- a/codegen/src/worktable/mod.rs +++ b/codegen/src/worktable/mod.rs @@ -60,9 +60,6 @@ pub fn expand(input: TokenStream) -> syn::Result { let update_impls = generator.gen_query_update_impl()?; let delete_impls = generator.gen_query_delete_impl()?; - println!("Q: {}", query_locks_impls); - //println!("LocksDef {}", locks_def); - Ok(quote! { #pk_def #row_def diff --git a/examples/src/main.rs b/examples/src/main.rs index e6c1753f..45854cad 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -16,15 +16,18 @@ fn main() { }, indexes: { idx1: attr, - idx2: attr2 unique, + idx2: attr2, }, queries: { update: { - AttrById(attr) by id, - AllById(attr, attr2) by id, - // UpdateTestByIdx(test) by attr, + ValById(val) by id, + AllAttrById(attr, attr2) by id, + UpdateOptionalById(test) by id, }, - + delete: { + ByAttr() by attr, + ById() by id, + } } ); @@ -40,68 +43,31 @@ fn main() { id: 0, }; - let row2 = MyRow { - val: 444, - attr: "Attri".to_string(), - attr2: 3456, - test: 22, - id: 0, - }; - - let row3 = MyRow { - val: 7777, - attr: "Attribute1".to_string(), - attr2: 3452, - test: 1, - id: 1, - }; - // insert let pk: MyPrimaryKey = my_table.insert(row).expect("primary key"); - let pk2: MyPrimaryKey = my_table.insert(row3).expect("primary key"); - - let upd = my_table.update(row2); - let _ = block_on(upd); + // Select ALL records from WT + let select_all = my_table.select_all().execute(); + println!("Select All {:?}", select_all); - // - //// Select ALL records from WT + // Select All records with attribute TEST let select_all = my_table.select_all().execute(); println!("Select All {:?}", select_all); - // - //// Select All records with attribute TEST - //let select_all = my_table.select_all().execute(); - //println!("Select All {:?}", select_all); - // - //// Select by Idx - //let select_by_attr = my_table.select_by_attr("Attribute1".to_string()); - //println!("Select by idx {:?}", select_by_attr.unwrap().vals); - // - //// Update Value query - let update = my_table.update_attr_by_id( - AttrByIdQuery { - attr: "1337".to_string(), - }, - pk2.clone(), - ); + + // Select by Idx + let select_by_attr = my_table.select_by_attr("Attribute1".to_string()); + println!("Select by idx {:?}", select_by_attr.unwrap().vals); + + // Update Value query + let update = my_table.update_val_by_id(ValByIdQuery { val: 1337 }, pk.clone()); let _ = block_on(update); - let update2 = my_table.update_all_by_id( - AllByIdQuery { - attr: "TESTTEST".to_string(), - attr2: 13375, - }, - pk2, - ); + let select_all = my_table.select_all().execute(); + println!("Select after update val {:?}", select_all); - let _ = block_on(update2); - // - //let select_all = my_table.select_all().execute(); - //println!("Select after update val {:?}", select_all); - // let delete = my_table.delete(pk); let _ = block_on(delete); - // + let select_all = my_table.select_all().execute(); println!("Select after delete {:?}", select_all); } diff --git a/src/lock/set.rs b/src/lock/set.rs index e2af9e05..d8ae702a 100644 --- a/src/lock/set.rs +++ b/src/lock/set.rs @@ -3,39 +3,39 @@ use std::sync::Arc; use lockfree::map::Map; #[derive(Debug)] -pub struct LockMap +pub struct LockMap where - PrimaryKey: std::hash::Hash + std::cmp::Ord, + PkType: std::hash::Hash + std::cmp::Ord, { - set: Map>>, + set: Map>>, } -impl Default for LockMap +impl Default for LockMap where - PrimaryKey: std::hash::Hash + std::cmp::Ord, + PkType: std::hash::Hash + std::cmp::Ord, { fn default() -> Self { Self::new() } } -impl LockMap +impl LockMap where - PrimaryKey: std::hash::Hash + std::cmp::Ord, + PkType: std::hash::Hash + std::cmp::Ord, { pub fn new() -> Self { Self { set: Map::new() } } - pub fn insert(&self, id: PrimaryKey, lock: Arc) { + pub fn insert(&self, id: PkType, lock: Arc) { self.set.insert(id, Some(lock)); } - pub fn get(&self, id: &PrimaryKey) -> Option> { + pub fn get(&self, id: &PkType) -> Option> { self.set.get(id).map(|v| v.val().clone())? } - pub fn remove(&self, id: &PrimaryKey) { + pub fn remove(&self, id: &PkType) { self.set.remove(id); } } From bb1264feed768babc622e82e9b6f4dfee042e8cd Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Sun, 23 Feb 2025 18:22:03 +0300 Subject: [PATCH 07/11] Fix visibility --- src/lock/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 2b0f53e0..fdbe78c2 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -12,7 +12,7 @@ pub use set::LockMap; #[derive(Debug)] pub struct Lock { - pub locked: AtomicBool, + locked: AtomicBool, waker: AtomicWaker, } From d9dc23302350a10ca9e65b5fa2fa870c7d23869b Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Sun, 23 Feb 2025 18:24:28 +0300 Subject: [PATCH 08/11] Fix unused methods --- src/lock/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/lock/mod.rs b/src/lock/mod.rs index fdbe78c2..d405d117 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -33,10 +33,6 @@ impl Lock { self.locked.store(true, Ordering::Relaxed); self.waker.wake() } - - pub fn is_locked(&self) -> bool { - self.locked.load(Ordering::Acquire) - } } impl Future for &Lock { From 6c7aae9d6e4b49f4037fb0e01e71ad6df894eae0 Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Mon, 24 Feb 2025 13:06:20 +0300 Subject: [PATCH 09/11] Fix commit notes --- codegen/src/worktable/generator/locks.rs | 172 ++++++++---------- .../src/worktable/generator/queries/delete.rs | 5 +- .../src/worktable/generator/queries/locks.rs | 11 -- .../src/worktable/generator/queries/update.rs | 12 +- src/lock/set.rs | 23 ++- 5 files changed, 104 insertions(+), 119 deletions(-) diff --git a/codegen/src/worktable/generator/locks.rs b/codegen/src/worktable/generator/locks.rs index 7011f737..b695db92 100644 --- a/codegen/src/worktable/generator/locks.rs +++ b/codegen/src/worktable/generator/locks.rs @@ -5,34 +5,24 @@ use quote::quote; impl Generator { pub fn gen_locks_def(&self) -> TokenStream { - let type_ = self.gen_locks_type(); - let impl_ = self.gen_locks_impl(); + let type_def = self.gen_locks_type(); + let impl_def = self.gen_locks_impl(); quote! { - #type_ - #impl_ + #type_def + #impl_def } } - pub fn gen_locks_type(&self) -> TokenStream { + fn gen_locks_type(&self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let lock_ident = name_generator.get_lock_type_ident(); - - let row_locks = self - .columns - .columns_map - .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - #name: Option>, - } - }) - .collect::>(); + let row_locks = self.gen_row_locks(); quote! { #[derive(Debug, Clone)] pub struct #lock_ident { + id: u16, lock: Option>, #(#row_locks)* } @@ -43,32 +33,84 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let lock_ident = name_generator.get_lock_type_ident(); - let row_new = self - .columns + let row_new = self.gen_row_new(); + let row_with_lock = self.gen_row_with_lock(); + let row_lock_await = self.gen_row_lock_await(); + let row_unlock = self.gen_row_unlock(); + + quote! { + impl #lock_ident { + pub fn new(lock_id: u16) -> Self { + Self { + id: lock_id, + lock: None, + #(#row_new),* + } + } + + pub fn with_lock(lock_id: u16) -> Self { + Self { + id: lock_id, + lock: Some(std::sync::Arc::new(Lock::new())), + #(#row_with_lock),* + } + } + + pub fn unlock(&self) { + if let Some(lock) = &self.lock { + lock.unlock(); + } + #(#row_unlock)* + } + + pub async fn lock_await(&self) { + let mut futures = Vec::new(); + + if let Some(lock) = &self.lock { + futures.push(lock.as_ref()); + } + #(#row_lock_await)* + futures::future::join_all(futures).await; + } + } + } + } + + fn gen_row_locks(&self) -> Vec { + self.columns + .columns_map + .keys() + .map(|i| { + let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { #name: Option>, } + }) + .collect() + } + + fn gen_row_new(&self) -> Vec { + self.columns .columns_map .keys() .map(|i| { let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - #col: None - } + quote! { #col: None } }) - .collect::>(); + .collect() + } - let row_with_lock = self - .columns + fn gen_row_with_lock(&self) -> Vec { + self.columns .columns_map .keys() .map(|i| { let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - #col: Some(std::sync::Arc::new(Lock::new())) - } + quote! { #col: Some(std::sync::Arc::new(Lock::new())) } }) - .collect::>(); + .collect() + } - let row_lock_await = self - .columns + fn gen_row_lock_await(&self) -> Vec { + self.columns .columns_map .keys() .map(|col| { @@ -79,10 +121,11 @@ impl Generator { } } }) - .collect::>(); + .collect() + } - let row_unlock = self - .columns + fn gen_row_unlock(&self) -> Vec { + self.columns .columns_map .keys() .map(|col| { @@ -93,65 +136,6 @@ impl Generator { } } }) - .collect::>(); - - let row_lock = self - .columns - .columns_map - .keys() - .map(|col| { - let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); - quote! { - if let Some(#col) = &self.#col { - #col.lock(); - } - } - }) - .collect::>(); - - quote! { - - impl #lock_ident { - pub fn new() -> Self { - Self { - lock: None, - #(#row_new),* - } - } - - pub fn with_lock() -> Self { - Self { - lock: Some(std::sync::Arc::new(Lock::new())), - #(#row_with_lock),* - - } - } - - pub fn lock(&self) { - if let Some(lock) = &self.lock { - lock.lock(); - } - #(#row_lock)* - - } - - pub fn unlock(&self) { - if let Some(lock) = &self.lock { - lock.unlock(); - } - #(#row_unlock)* - } - - pub async fn lock_await(&self) { - let mut futures = Vec::new(); - - if let Some(lock) = &self.lock { - futures.push(lock.as_ref()); - } - #(#row_lock_await)* - futures::future::join_all(futures).await; - } - } - } + .collect() } } diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index 2fd589ac..9a060257 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -44,17 +44,16 @@ impl Generator { lock.lock_await().await; // Waiting for all locks released } - let lock = std::sync::Arc::new(#lock_ident::with_lock()); //Creates new LockType with None + let lock_id = self.0.lock_map.next_id(); + let lock = std::sync::Arc::new(#lock_ident::with_lock(lock_id.into())); //Creates new LockType with None self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap - let link = self.0 .pk_map .get(&pk) .map(|v| v.get().value) .ok_or(WorkTableError::NotFound)?; - let row = self.select(pk.clone()).unwrap(); self.0.indexes.delete_row(row, link)?; self.0.pk_map.remove(&pk); diff --git a/codegen/src/worktable/generator/queries/locks.rs b/codegen/src/worktable/generator/queries/locks.rs index 95eab459..a4b2d9dc 100644 --- a/codegen/src/worktable/generator/queries/locks.rs +++ b/codegen/src/worktable/generator/queries/locks.rs @@ -88,27 +88,16 @@ impl Generator { quote! { pub fn #lock_ident(&mut self) { - if self.lock.is_none() { - self.lock = Some(std::sync::Arc::new(Lock::new())); - } #(#rows_lock)* } pub fn #unlock_ident(&self) { - if let Some(lock) = &self.lock { - lock.unlock(); - } #(#rows_unlock)* } pub async fn #lock_await_ident(&self) { let mut futures = Vec::new(); - if let Some(lock) = &self.lock { - futures.push(lock.as_ref()); - } - - #(#rows_lock_await)* futures::future::join_all(futures).await; } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 4179f889..455a7f10 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -68,7 +68,8 @@ impl Generator { lock.lock_await().await; // Waiting for all locks released } - let lock = std::sync::Arc::new(#lock_ident::with_lock()); //Creates new LockType with Locks + let lock_id = self.0.lock_map.next_id(); + let lock = std::sync::Arc::new(#lock_ident::with_lock(lock_id.into())); //Creates new LockType with Locks self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; @@ -237,7 +238,8 @@ impl Generator { if let Some(lock) = self.0.lock_map.get(&by) { lock.#lock_await_ident().await; // Waiting for all locks released } - let mut lock = #lock_type_ident::new(); //Creates new LockType with None + let lock_id = self.0.lock_map.next_id(); + let mut lock = #lock_type_ident::new(lock_id.into()); //Creates new LockType with None lock.#lock_ident(); self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone())); @@ -315,7 +317,8 @@ impl Generator { for (_, link) in self.0.indexes.#index.get(&by) { let pk = self.0.data.select(*link)?.get_primary_key(); - let mut lock = #lock_type_ident::new(); + let lock_id = self.0.lock_map.next_id(); + let mut lock = #lock_type_ident::new(lock_id.into()); lock.#lock_ident(); self.0.lock_map.insert(pk.clone(), std::sync::Arc::new(lock.clone())); } @@ -406,7 +409,8 @@ impl Generator { if let Some(lock) = self.0.lock_map.get(&pk) { lock.#lock_await_ident().await; } - let mut lock = #lock_type_ident::new(); + let lock_id = self.0.lock_map.next_id(); + let mut lock = #lock_type_ident::new(lock_id.into()); lock.#lock_ident(); self.0.lock_map.insert(pk.clone(), std::sync::Arc::new(lock.clone())); diff --git a/src/lock/set.rs b/src/lock/set.rs index d8ae702a..f3a22a7e 100644 --- a/src/lock/set.rs +++ b/src/lock/set.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use lockfree::map::Map; @@ -8,6 +9,7 @@ where PkType: std::hash::Hash + std::cmp::Ord, { set: Map>>, + next_id: AtomicU16, } impl Default for LockMap @@ -24,18 +26,25 @@ where PkType: std::hash::Hash + std::cmp::Ord, { pub fn new() -> Self { - Self { set: Map::new() } + Self { + set: Map::new(), + next_id: AtomicU16::default(), + } } - pub fn insert(&self, id: PkType, lock: Arc) { - self.set.insert(id, Some(lock)); + pub fn insert(&self, key: PkType, lock: Arc) { + self.set.insert(key, Some(lock)); } - pub fn get(&self, id: &PkType) -> Option> { - self.set.get(id).map(|v| v.val().clone())? + pub fn get(&self, key: &PkType) -> Option> { + self.set.get(key).map(|v| v.val().clone())? } - pub fn remove(&self, id: &PkType) { - self.set.remove(id); + pub fn remove(&self, key: &PkType) { + self.set.remove(key); + } + + pub fn next_id(&self) -> u16 { + self.next_id.fetch_add(1, Ordering::Relaxed) } } From 01d49b2df4df1bfade6923fa0723ed9213632fbf Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Mon, 24 Feb 2025 20:36:36 +0300 Subject: [PATCH 10/11] correction --- codegen/src/worktable/generator/locks.rs | 37 ++++++++++++++---------- src/lock/set.rs | 6 ++-- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/codegen/src/worktable/generator/locks.rs b/codegen/src/worktable/generator/locks.rs index b695db92..ef5bbed5 100644 --- a/codegen/src/worktable/generator/locks.rs +++ b/codegen/src/worktable/generator/locks.rs @@ -33,20 +33,14 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let lock_ident = name_generator.get_lock_type_ident(); - let row_new = self.gen_row_new(); + let new_fn = self.gen_lock_new_fn(); let row_with_lock = self.gen_row_with_lock(); let row_lock_await = self.gen_row_lock_await(); let row_unlock = self.gen_row_unlock(); quote! { impl #lock_ident { - pub fn new(lock_id: u16) -> Self { - Self { - id: lock_id, - lock: None, - #(#row_new),* - } - } + #new_fn pub fn with_lock(lock_id: u16) -> Self { Self { @@ -76,24 +70,35 @@ impl Generator { } } - fn gen_row_locks(&self) -> Vec { - self.columns + fn gen_lock_new_fn(&self) -> TokenStream { + let fields = self + .columns .columns_map .keys() .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { #name: Option>, } + let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { #col: None } }) - .collect() + .collect(); + + quote! { + pub fn new(lock_id: u16) -> Self { + Self { + id: lock_id, + lock: None, + #(#fields),* + } + } + } } - fn gen_row_new(&self) -> Vec { + fn gen_row_locks(&self) -> Vec { self.columns .columns_map .keys() .map(|i| { - let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { #col: None } + let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { #name: Option>, } }) .collect() } diff --git a/src/lock/set.rs b/src/lock/set.rs index f3a22a7e..948eb676 100644 --- a/src/lock/set.rs +++ b/src/lock/set.rs @@ -6,7 +6,7 @@ use lockfree::map::Map; #[derive(Debug)] pub struct LockMap where - PkType: std::hash::Hash + std::cmp::Ord, + PkType: std::hash::Hash + Ord, { set: Map>>, next_id: AtomicU16, @@ -14,7 +14,7 @@ where impl Default for LockMap where - PkType: std::hash::Hash + std::cmp::Ord, + PkType: std::hash::Hash + Ord, { fn default() -> Self { Self::new() @@ -23,7 +23,7 @@ where impl LockMap where - PkType: std::hash::Hash + std::cmp::Ord, + PkType: std::hash::Hash + Ord, { pub fn new() -> Self { Self { From 807db51a9fe8082faad46510f1947451c44732ed Mon Sep 17 00:00:00 2001 From: Kira Sotnikov Date: Tue, 25 Feb 2025 15:13:59 +0300 Subject: [PATCH 11/11] Small fix for locks impl gen --- codegen/src/worktable/generator/locks.rs | 127 ++++++++++++----------- 1 file changed, 69 insertions(+), 58 deletions(-) diff --git a/codegen/src/worktable/generator/locks.rs b/codegen/src/worktable/generator/locks.rs index ef5bbed5..f34de0ba 100644 --- a/codegen/src/worktable/generator/locks.rs +++ b/codegen/src/worktable/generator/locks.rs @@ -5,26 +5,36 @@ use quote::quote; impl Generator { pub fn gen_locks_def(&self) -> TokenStream { - let type_def = self.gen_locks_type(); - let impl_def = self.gen_locks_impl(); + let type_ = self.gen_locks_type(); + let impl_ = self.gen_locks_impl(); quote! { - #type_def - #impl_def + #type_ + #impl_ + } } fn gen_locks_type(&self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let lock_ident = name_generator.get_lock_type_ident(); - let row_locks = self.gen_row_locks(); + + let rows: Vec<_> = self + .columns + .columns_map + .keys() + .map(|i| { + let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { #name: Option>, } + }) + .collect(); quote! { #[derive(Debug, Clone)] pub struct #lock_ident { id: u16, lock: Option>, - #(#row_locks)* + #(#rows)* } } } @@ -33,45 +43,23 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let lock_ident = name_generator.get_lock_type_ident(); - let new_fn = self.gen_lock_new_fn(); - let row_with_lock = self.gen_row_with_lock(); - let row_lock_await = self.gen_row_lock_await(); - let row_unlock = self.gen_row_unlock(); + let new_fn = self.gen_new_fn(); + let with_lock_fn = self.gen_with_lock_fn(); + let lock_await_fn = self.gen_lock_await_fn(); + let unlock_fn = self.gen_unlock_fn(); quote! { impl #lock_ident { #new_fn - - pub fn with_lock(lock_id: u16) -> Self { - Self { - id: lock_id, - lock: Some(std::sync::Arc::new(Lock::new())), - #(#row_with_lock),* - } - } - - pub fn unlock(&self) { - if let Some(lock) = &self.lock { - lock.unlock(); - } - #(#row_unlock)* - } - - pub async fn lock_await(&self) { - let mut futures = Vec::new(); - - if let Some(lock) = &self.lock { - futures.push(lock.as_ref()); - } - #(#row_lock_await)* - futures::future::join_all(futures).await; - } + #with_lock_fn + #lock_await_fn + #unlock_fn } } } - fn gen_lock_new_fn(&self) -> TokenStream { - let fields = self + fn gen_new_fn(&self) -> TokenStream { + let rows: Vec<_> = self .columns .columns_map .keys() @@ -86,36 +74,37 @@ impl Generator { Self { id: lock_id, lock: None, - #(#fields),* + #(#rows),* } } } } - fn gen_row_locks(&self) -> Vec { - self.columns - .columns_map - .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { #name: Option>, } - }) - .collect() - } - - fn gen_row_with_lock(&self) -> Vec { - self.columns + fn gen_with_lock_fn(&self) -> TokenStream { + let rows: Vec<_> = self + .columns .columns_map .keys() .map(|i| { let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); quote! { #col: Some(std::sync::Arc::new(Lock::new())) } }) - .collect() + .collect(); + + quote! { + pub fn with_lock(lock_id: u16) -> Self { + Self { + id: lock_id, + lock: Some(std::sync::Arc::new(Lock::new())), + #(#rows),* + } + } + } } - fn gen_row_lock_await(&self) -> Vec { - self.columns + fn gen_lock_await_fn(&self) -> TokenStream { + let rows: Vec<_> = self + .columns .columns_map .keys() .map(|col| { @@ -126,11 +115,23 @@ impl Generator { } } }) - .collect() + .collect(); + quote! { + pub async fn lock_await(&self) { + let mut futures = Vec::new(); + + if let Some(lock) = &self.lock { + futures.push(lock.as_ref()); + } + #(#rows)* + futures::future::join_all(futures).await; + } + } } - fn gen_row_unlock(&self) -> Vec { - self.columns + fn gen_unlock_fn(&self) -> TokenStream { + let rows: Vec<_> = self + .columns .columns_map .keys() .map(|col| { @@ -141,6 +142,16 @@ impl Generator { } } }) - .collect() + .collect(); + + quote! { + pub fn unlock(&self) { + if let Some(lock) = &self.lock { + lock.unlock(); + } + #(#rows)* + } + + } } }