diff --git a/codegen/src/name_generator.rs b/codegen/src/name_generator.rs index cdc3fe96..52042731 100644 --- a/codegen/src/name_generator.rs +++ b/codegen/src/name_generator.rs @@ -54,6 +54,10 @@ impl WorktableNameGenerator { Ident::new(format!("{}Wrapper", self.name).as_str(), Span::mixed_site()) } + pub fn get_lock_type_ident(&self) -> Ident { + Ident::new(format!("{}Lock", self.name).as_str(), Span::mixed_site()) + } + pub fn get_index_type_ident(&self) -> Ident { Ident::new(format!("{}Index", self.name).as_str(), Span::mixed_site()) } diff --git a/codegen/src/worktable/generator/locks.rs b/codegen/src/worktable/generator/locks.rs new file mode 100644 index 00000000..f34de0ba --- /dev/null +++ b/codegen/src/worktable/generator/locks.rs @@ -0,0 +1,157 @@ +use crate::name_generator::WorktableNameGenerator; +use crate::worktable::generator::Generator; +use proc_macro2::{Ident, Span, TokenStream}; +use quote::quote; + +impl Generator { + pub fn gen_locks_def(&self) -> TokenStream { + let type_ = self.gen_locks_type(); + let impl_ = self.gen_locks_impl(); + + quote! { + #type_ + #impl_ + + } + } + + fn gen_locks_type(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_ident = name_generator.get_lock_type_ident(); + + let rows: Vec<_> = self + .columns + .columns_map + .keys() + .map(|i| { + let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { #name: Option>, } + }) + .collect(); + + quote! { + #[derive(Debug, Clone)] + pub struct #lock_ident { + id: u16, + lock: Option>, + #(#rows)* + } + } + } + + fn gen_locks_impl(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_ident = name_generator.get_lock_type_ident(); + + let new_fn = self.gen_new_fn(); + let with_lock_fn = self.gen_with_lock_fn(); + let lock_await_fn = self.gen_lock_await_fn(); + let unlock_fn = self.gen_unlock_fn(); + + quote! { + impl #lock_ident { + #new_fn + #with_lock_fn + #lock_await_fn + #unlock_fn + } + } + } + + fn gen_new_fn(&self) -> TokenStream { + let rows: Vec<_> = self + .columns + .columns_map + .keys() + .map(|i| { + let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { #col: None } + }) + .collect(); + + quote! { + pub fn new(lock_id: u16) -> Self { + Self { + id: lock_id, + lock: None, + #(#rows),* + } + } + } + } + + fn gen_with_lock_fn(&self) -> TokenStream { + let rows: Vec<_> = self + .columns + .columns_map + .keys() + .map(|i| { + let col = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); + quote! { #col: Some(std::sync::Arc::new(Lock::new())) } + }) + .collect(); + + quote! { + pub fn with_lock(lock_id: u16) -> Self { + Self { + id: lock_id, + lock: Some(std::sync::Arc::new(Lock::new())), + #(#rows),* + } + } + } + } + + fn gen_lock_await_fn(&self) -> TokenStream { + let rows: Vec<_> = self + .columns + .columns_map + .keys() + .map(|col| { + let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); + quote! { + if let Some(lock) = &self.#col { + futures.push(lock.as_ref()); + } + } + }) + .collect(); + quote! { + pub async fn lock_await(&self) { + let mut futures = Vec::new(); + + if let Some(lock) = &self.lock { + futures.push(lock.as_ref()); + } + #(#rows)* + futures::future::join_all(futures).await; + } + } + } + + fn gen_unlock_fn(&self) -> TokenStream { + let rows: Vec<_> = self + .columns + .columns_map + .keys() + .map(|col| { + let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); + quote! { + if let Some(#col) = &self.#col { + #col.unlock(); + } + } + }) + .collect(); + + quote! { + pub fn unlock(&self) { + if let Some(lock) = &self.lock { + lock.unlock(); + } + #(#rows)* + } + + } + } +} diff --git a/codegen/src/worktable/generator/mod.rs b/codegen/src/worktable/generator/mod.rs index a07f4d0e..bcdb9fd8 100644 --- a/codegen/src/worktable/generator/mod.rs +++ b/codegen/src/worktable/generator/mod.rs @@ -1,4 +1,5 @@ mod index; +mod locks; mod primary_key; mod queries; mod row; diff --git a/codegen/src/worktable/generator/primary_key.rs b/codegen/src/worktable/generator/primary_key.rs index 348590c7..e951a62d 100644 --- a/codegen/src/worktable/generator/primary_key.rs +++ b/codegen/src/worktable/generator/primary_key.rs @@ -69,7 +69,8 @@ impl Generator { Into, PartialEq, PartialOrd, - Ord + Hash, + Ord, )] pub struct #ident(#(#types),*); } diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index f3abf84a..9a060257 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -34,27 +34,34 @@ impl Generator { fn gen_full_row_delete(&mut self) -> TokenStream { let pk_ident = &self.pk.as_ref().unwrap().ident; + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_ident = name_generator.get_lock_type_ident(); + quote! { pub async fn delete(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> { + + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.lock_await().await; // Waiting for all locks released + } + + let lock_id = self.0.lock_map.next_id(); + let lock = std::sync::Arc::new(#lock_ident::with_lock(lock_id.into())); //Creates new LockType with None + self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap + let link = self.0 .pk_map .get(&pk) .map(|v| v.get().value) .ok_or(WorkTableError::NotFound)?; - let id = self.0.data.with_ref(link, |archived| { - archived.is_locked() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } - } let row = self.select(pk.clone()).unwrap(); self.0.indexes.delete_row(row, link)?; self.0.pk_map.remove(&pk); self.0.data.delete(link).map_err(WorkTableError::PagesError)?; + lock.unlock(); // Releases locks + self.0.lock_map.remove(&pk); // Removes locks + core::result::Result::Ok(()) } } diff --git a/codegen/src/worktable/generator/queries/locks.rs b/codegen/src/worktable/generator/queries/locks.rs index fd0ac853..a4b2d9dc 100644 --- a/codegen/src/worktable/generator/queries/locks.rs +++ b/codegen/src/worktable/generator/queries/locks.rs @@ -8,11 +8,7 @@ impl Generator { pub fn gen_query_locks_impl(&mut self) -> syn::Result { if let Some(q) = &self.queries { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); - let wrapper_name = name_generator.get_wrapper_type_ident(); - let archived_wrapper = Ident::new( - format!("Archived{}", &wrapper_name).as_str(), - Span::mixed_site(), - ); + let lock_type_ident = name_generator.get_lock_type_ident(); let fns = q .updates @@ -23,32 +19,22 @@ impl Generator { .from_case(Case::Pascal) .to_case(Case::Snake); - let check_ident = Ident::new( - format!("check_{snake_case_name}_lock").as_str(), + let lock_await_ident = Ident::new( + format!("lock_await_{snake_case_name}").as_str(), Span::mixed_site(), ); - let checks = q - .updates - .get(name) - .expect("exists") - .columns - .iter() - .map(|col| { - let col = - Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); - quote! { - if self.#col != 0 { - return Some(self.#col.into()); - } - } - }) - .collect::>(); let lock_ident = Ident::new( format!("lock_{snake_case_name}").as_str(), Span::mixed_site(), ); - let locks = q + + let unlock_ident = Ident::new( + format!("unlock_{snake_case_name}").as_str(), + Span::mixed_site(), + ); + + let rows_lock_await = q .updates .get(name) .expect("exists") @@ -58,16 +44,14 @@ impl Generator { let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); quote! { - self.#col = id.into(); + if let Some(lock) = &self.#col { + futures.push(lock.as_ref()); + } } }) .collect::>(); - let unlock_ident = Ident::new( - format!("unlock_{snake_case_name}").as_str(), - Span::mixed_site(), - ); - let unlocks = q + let rows_lock = q .updates .get(name) .expect("exists") @@ -77,16 +61,14 @@ impl Generator { let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); quote! { - self.#col = 0u16.into(); + if self.#col.is_none() { + self.#col = Some(std::sync::Arc::new(Lock::new())); + } } }) .collect::>(); - let verify_ident = Ident::new( - format!("verify_{snake_case_name}_lock").as_str(), - Span::mixed_site(), - ); - let verify = q + let rows_unlock = q .updates .get(name) .expect("exists") @@ -96,40 +78,35 @@ impl Generator { let col = Ident::new(format!("{}_lock", col).as_str(), Span::mixed_site()); quote! { - if self.#col != id { - return false; + if let Some(#col) = &self.#col { + #col.unlock(); } } }) .collect::>(); quote! { - pub fn #check_ident(&self) -> Option { - if self.lock != 0 { - return Some(self.lock.into()); - } - #(#checks)* - None - } - pub unsafe fn #lock_ident(&mut self, id: u16) { - #(#locks)* + pub fn #lock_ident(&mut self) { + #(#rows_lock)* } - pub unsafe fn #unlock_ident(&mut self) { - #(#unlocks)* + pub fn #unlock_ident(&self) { + #(#rows_unlock)* } - pub fn #verify_ident(&self, id: u16) -> bool { - #(#verify)* - true + pub async fn #lock_await_ident(&self) { + let mut futures = Vec::new(); + + #(#rows_lock_await)* + futures::future::join_all(futures).await; } } }) .collect::>(); Ok(quote! { - impl #archived_wrapper { + impl #lock_type_ident { #(#fns)* } }) diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index a607fd2c..455a7f10 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -35,6 +35,7 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let row_ident = name_generator.get_row_type_ident(); let avt_type_ident = name_generator.get_available_type_ident(); + let lock_ident = name_generator.get_lock_type_ident(); let row_updates = self .columns @@ -63,9 +64,13 @@ impl Generator { quote! { pub async fn update(&self, row: #row_ident) -> core::result::Result<(), WorkTableError> { let pk = row.get_primary_key(); - let op_id = self.0.lock_map.next_id(); - let lock = std::sync::Arc::new(Lock::new()); - self.0.lock_map.insert(op_id.into(), lock.clone()); + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.lock_await().await; // Waiting for all locks released + } + + let lock_id = self.0.lock_map.next_id(); + let lock = std::sync::Arc::new(#lock_ident::with_lock(lock_id.into())); //Creates new LockType with Locks + self.0.lock_map.insert(pk.clone(), lock.clone()); // adds LockType to LockMap let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#row_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; @@ -78,27 +83,12 @@ impl Generator { #diff_container #diff_process - let id = self.0.data.with_ref(link, |archived| { - archived.is_locked() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } - } - unsafe { self.0.data.with_mut_ref(link, |archived| { - archived.lock = op_id.into(); - }).map_err(WorkTableError::PagesError)? }; unsafe { self.0.data.with_mut_ref(link, move |archived| { #(#row_updates)* }).map_err(WorkTableError::PagesError)? }; - unsafe { self.0.data.with_mut_ref(link, |archived| { - unsafe { - archived.lock = 0u16.into(); - } - }).map_err(WorkTableError::PagesError)? }; - lock.unlock(); - self.0.lock_map.remove(&op_id.into()); + + lock.unlock(); // Releases locks + self.0.lock_map.remove(&pk); // Removes locks core::result::Result::Ok(()) } } @@ -195,27 +185,25 @@ impl Generator { idx_idents: Option<&Vec>, ) -> TokenStream { let pk_ident = &self.pk.as_ref().unwrap().ident; + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_type_ident = name_generator.get_lock_type_ident(); + let method_ident = Ident::new( format!("update_{snake_case_name}").as_str(), Span::mixed_site(), ); - let query_ident = Ident::new(format!("{name}Query").as_str(), Span::mixed_site()); - let check_ident = Ident::new( - format!("check_{snake_case_name}_lock").as_str(), - Span::mixed_site(), - ); - let lock_ident = Ident::new( - format!("lock_{snake_case_name}").as_str(), + let lock_await_ident = Ident::new( + format!("lock_await_{snake_case_name}").as_str(), Span::mixed_site(), ); let unlock_ident = Ident::new( format!("unlock_{snake_case_name}").as_str(), Span::mixed_site(), ); - let verify_ident = Ident::new( - format!("verify_{snake_case_name}_lock").as_str(), + let lock_ident = Ident::new( + format!("lock_{snake_case_name}").as_str(), Span::mixed_site(), ); @@ -229,7 +217,6 @@ impl Generator { .collect::>(); let diff_process = if let Some(idx_idents) = idx_idents { - let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let avt_type_ident = name_generator.get_available_type_ident(); let diff_container = quote! { let row_old = self.select(by.clone()).unwrap(); @@ -248,10 +235,14 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #pk_ident) -> core::result::Result<(), WorkTableError> { - let op_id = self.0.lock_map.next_id(); - let lock = std::sync::Arc::new(Lock::new()); + if let Some(lock) = self.0.lock_map.get(&by) { + lock.#lock_await_ident().await; // Waiting for all locks released + } + let lock_id = self.0.lock_map.next_id(); + let mut lock = #lock_type_ident::new(lock_id.into()); //Creates new LockType with None + lock.#lock_ident(); - self.0.lock_map.insert(op_id.into(), lock.clone()); + self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone())); let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; @@ -263,33 +254,12 @@ impl Generator { #diff_process - let id = self.0.data.with_ref(link, |archived| { - archived.#check_ident() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } - } - unsafe { self.0.data.with_mut_ref(link, |archived| { - while !archived.#verify_ident(op_id) { - unsafe { - archived.#lock_ident(op_id) - } - } - }).map_err(WorkTableError::PagesError)? }; - unsafe { self.0.data.with_mut_ref(link, |archived| { #(#row_updates)* }).map_err(WorkTableError::PagesError)? }; - unsafe { self.0.data.with_mut_ref(link, |archived| { - unsafe { - archived.#unlock_ident() - } - }).map_err(WorkTableError::PagesError)? }; - lock.unlock(); - self.0.lock_map.remove(&op_id.into()); + lock.#unlock_ident(); + self.0.lock_map.remove(&by); core::result::Result::Ok(()) } @@ -303,6 +273,9 @@ impl Generator { index: &Ident, idents: &[Ident], ) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_type_ident = name_generator.get_lock_type_ident(); + let method_ident = Ident::new( format!("update_{snake_case_name}").as_str(), Span::mixed_site(), @@ -311,8 +284,8 @@ impl Generator { let query_ident = Ident::new(format!("{name}Query").as_str(), Span::mixed_site()); let by_ident = Ident::new(format!("{name}By").as_str(), Span::mixed_site()); - let check_ident = Ident::new( - format!("check_{snake_case_name}_lock").as_str(), + let lock_await_ident = Ident::new( + format!("lock_await_{snake_case_name}").as_str(), Span::mixed_site(), ); let lock_ident = Ident::new( @@ -323,10 +296,7 @@ impl Generator { format!("unlock_{snake_case_name}").as_str(), Span::mixed_site(), ); - let verify_ident = Ident::new( - format!("verify_{snake_case_name}_lock").as_str(), - Span::mixed_site(), - ); + let row_updates = idents .iter() .map(|i| { @@ -338,47 +308,42 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { - let op_id = self.0.lock_map.next_id(); - let lock = std::sync::Arc::new(Lock::new()); - - self.0.lock_map.insert(op_id.into(), lock.clone()); - for (_, link) in self.0.indexes.#index.get(&by) { - let id = self.0.data.with_ref(*link, |archived| { - archived.#check_ident() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } + let pk = self.0.data.select(*link)?.get_primary_key(); + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.#lock_await_ident().await; } - unsafe { self.0.data.with_mut_ref(*link, |archived| { - while !archived.#verify_ident(op_id) { - unsafe { - archived.#lock_ident(op_id) - } - } - }).map_err(WorkTableError::PagesError)? }; } for (_, link) in self.0.indexes.#index.get(&by) { - let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; - let mut row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; - unsafe { self.0.data.with_mut_ref(*link, |archived| { - #(#row_updates)* - }).map_err(WorkTableError::PagesError)? }; + let pk = self.0.data.select(*link)?.get_primary_key(); + let lock_id = self.0.lock_map.next_id(); + let mut lock = #lock_type_ident::new(lock_id.into()); + lock.#lock_ident(); + self.0.lock_map.insert(pk.clone(), std::sync::Arc::new(lock.clone())); } for (_, link) in self.0.indexes.#index.get(&by) { - unsafe { self.0.data.with_mut_ref(*link, |archived| { - unsafe { - archived.#unlock_ident() - } - }).map_err(WorkTableError::PagesError)? }; - } - lock.unlock(); - self.0.lock_map.remove(&op_id.into()); + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + let mut row = unsafe { + rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]) + .unseal_unchecked() + }; + unsafe { + self.0.data.with_mut_ref(*link, |archived| { + #(#row_updates)* + }).map_err(WorkTableError::PagesError)?; + } + } + for (_, link) in self.0.indexes.#index.get(&by) { + let pk = self.0.data.select(*link)?.get_primary_key(); + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.#unlock_ident(); + self.0.lock_map.remove(&pk); + } + } core::result::Result::Ok(()) } } @@ -391,6 +356,9 @@ impl Generator { index: &Ident, idents: &[Ident], ) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let lock_type_ident = name_generator.get_lock_type_ident(); + let method_ident = Ident::new( format!("update_{snake_case_name}").as_str(), Span::mixed_site(), @@ -399,8 +367,8 @@ impl Generator { let query_ident = Ident::new(format!("{name}Query").as_str(), Span::mixed_site()); let by_ident = Ident::new(format!("{name}By").as_str(), Span::mixed_site()); - let check_ident = Ident::new( - format!("check_{snake_case_name}_lock").as_str(), + let lock_await_ident = Ident::new( + format!("lock_await_{snake_case_name}").as_str(), Span::mixed_site(), ); let lock_ident = Ident::new( @@ -411,10 +379,7 @@ impl Generator { format!("unlock_{snake_case_name}").as_str(), Span::mixed_site(), ); - let verify_ident = Ident::new( - format!("verify_{snake_case_name}_lock").as_str(), - Span::mixed_site(), - ); + let row_updates = idents .iter() .map(|i| { @@ -426,41 +391,37 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { - let op_id = self.0.lock_map.next_id(); - let lock = std::sync::Arc::new(Lock::new()); + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + + let mut row = unsafe { + rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]) + .unseal_unchecked() + }; - self.0.lock_map.insert(op_id.into(), lock.clone()); + let link = self.0.indexes.#index + .get(&by) + .map(|kv| kv.get().value) + .ok_or(WorkTableError::NotFound)?; - let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; - let mut row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; - let link = self.0.indexes.#index.get(&by).map(|kv| kv.get().value).ok_or(WorkTableError::NotFound)?; - let id = self.0.data.with_ref(link, |archived| { - archived.#check_ident() - }).map_err(WorkTableError::PagesError)?; - if let Some(id) = id { - if let Some(lock) = self.0.lock_map.get(&(id.into())) { - lock.as_ref().await - } + let pk = self.0.data.select(link)?.get_primary_key(); + + if let Some(lock) = self.0.lock_map.get(&pk) { + lock.#lock_await_ident().await; } - unsafe { self.0.data.with_mut_ref(link, |archived| { - while !archived.#verify_ident(op_id) { - unsafe { - archived.#lock_ident(op_id) - } - } - }).map_err(WorkTableError::PagesError)? }; + let lock_id = self.0.lock_map.next_id(); + let mut lock = #lock_type_ident::new(lock_id.into()); + lock.#lock_ident(); + self.0.lock_map.insert(pk.clone(), std::sync::Arc::new(lock.clone())); - unsafe { self.0.data.with_mut_ref(link, |archived| { - #(#row_updates)* - }).map_err(WorkTableError::PagesError)? }; + unsafe { + self.0.data.with_mut_ref(link, |archived| { + #(#row_updates)* + }).map_err(WorkTableError::PagesError)?; + } - unsafe { self.0.data.with_mut_ref(link, |archived| { - unsafe { - archived.#unlock_ident() - } - }).map_err(WorkTableError::PagesError)? }; - lock.unlock(); - self.0.lock_map.remove(&op_id.into()); + lock.#unlock_ident(); + self.0.lock_map.remove(&pk); core::result::Result::Ok(()) } diff --git a/codegen/src/worktable/generator/table/mod.rs b/codegen/src/worktable/generator/table/mod.rs index e5dba507..54861711 100644 --- a/codegen/src/worktable/generator/table/mod.rs +++ b/codegen/src/worktable/generator/table/mod.rs @@ -57,6 +57,7 @@ impl Generator { let index_type = name_generator.get_index_type_ident(); let inner_const_name = name_generator.get_page_inner_size_const_ident(); let avt_type_ident = name_generator.get_available_type_ident(); + let lock_ident = name_generator.get_lock_type_ident(); let derive = if self.is_persist { quote! { @@ -84,6 +85,7 @@ impl Generator { #primary_key_type, #avt_type_ident, #index_type, + #lock_ident, <#primary_key_type as TablePrimaryKey>::Generator, #inner_const_name > @@ -98,7 +100,8 @@ impl Generator { #row_type, #primary_key_type, #avt_type_ident, - #index_type + #index_type, + #lock_ident, > #persist_type_part ); diff --git a/codegen/src/worktable/generator/wrapper.rs b/codegen/src/worktable/generator/wrapper.rs index 0d10670c..c50df988 100644 --- a/codegen/src/worktable/generator/wrapper.rs +++ b/codegen/src/worktable/generator/wrapper.rs @@ -1,19 +1,17 @@ use crate::name_generator::WorktableNameGenerator; use crate::worktable::generator::Generator; -use proc_macro2::{Ident, Span, TokenStream}; +use proc_macro2::TokenStream; use quote::quote; impl Generator { pub fn gen_wrapper_def(&self) -> TokenStream { let type_ = self.gen_wrapper_type(); let impl_ = self.gen_wrapper_impl(); - let archived_impl = self.get_wrapper_archived_impl(); let storable_impl = self.get_wrapper_storable_impl(); quote! { #type_ #impl_ - #archived_impl #storable_impl } } @@ -23,25 +21,12 @@ impl Generator { let row_ident = name_generator.get_row_type_ident(); let wrapper_ident = name_generator.get_wrapper_type_ident(); - let row_locks = self - .columns - .columns_map - .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - #name: u16, - } - }) - .collect::>(); quote! { #[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] #[repr(C)] pub struct #wrapper_ident { inner: #row_ident, is_deleted: bool, - lock: u16, - #(#row_locks)* } } } @@ -51,19 +36,8 @@ impl Generator { let wrapper_ident = name_generator.get_wrapper_type_ident(); let row_ident = name_generator.get_row_type_ident(); - let row_defaults = self - .columns - .columns_map - .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - #name: Default::default(), - } - }) - .collect::>(); - quote! { + impl RowWrapper<#row_ident> for #wrapper_ident { fn get_inner(self) -> #row_ident { self.inner @@ -73,43 +47,7 @@ impl Generator { Self { inner, is_deleted: Default::default(), - lock: Default::default(), - #(#row_defaults)* - } - } - } - } - } - - fn get_wrapper_archived_impl(&self) -> TokenStream { - let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); - let wrapper_ident = name_generator.get_wrapper_type_ident(); - let archived_wrapper_ident = Ident::new( - format!("Archived{}", &wrapper_ident).as_str(), - Span::mixed_site(), - ); - let checks = self - .columns - .columns_map - .keys() - .map(|i| { - let name = Ident::new(format!("{i}_lock").as_str(), Span::mixed_site()); - quote! { - if self.#name != 0 { - return Some(self.#name.into()); - } - } - }) - .collect::>(); - - quote! { - impl ArchivedRow for #archived_wrapper_ident { - fn is_locked(&self) -> Option { - if self.lock != 0 { - return Some(self.lock.into()); } - #(#checks)* - None } } } diff --git a/codegen/src/worktable/mod.rs b/codegen/src/worktable/mod.rs index e67c1f0c..c38bc846 100644 --- a/codegen/src/worktable/mod.rs +++ b/codegen/src/worktable/mod.rs @@ -50,6 +50,7 @@ pub fn expand(input: TokenStream) -> syn::Result { let pk_def = generator.gen_primary_key_def()?; let row_def = generator.gen_row_def(); let wrapper_def = generator.gen_wrapper_def(); + let locks_def = generator.gen_locks_def(); let index_def = generator.gen_index_def(); let table_def = generator.gen_table_def()?; let query_types_def = generator.gen_result_types_def()?; @@ -64,6 +65,7 @@ pub fn expand(input: TokenStream) -> syn::Result { #row_def #query_available_def #wrapper_def + #locks_def #index_def #table_def #query_types_def diff --git a/src/in_memory/mod.rs b/src/in_memory/mod.rs index ab18d8c8..126cdeb7 100644 --- a/src/in_memory/mod.rs +++ b/src/in_memory/mod.rs @@ -4,4 +4,4 @@ mod row; pub use data::{Data, ExecutionError as DataExecutionError, DATA_INNER_LENGTH}; pub use pages::{DataPages, ExecutionError as PagesExecutionError}; -pub use row::{ArchivedRow, RowWrapper, StorableRow}; +pub use row::{RowWrapper, StorableRow}; diff --git a/src/in_memory/row.rs b/src/in_memory/row.rs index ec6a1e57..7586dec0 100644 --- a/src/in_memory/row.rs +++ b/src/in_memory/row.rs @@ -17,10 +17,6 @@ pub trait RowWrapper { fn from_inner(inner: Inner) -> Self; } -pub trait ArchivedRow { - fn is_locked(&self) -> Option; -} - /// General `Row` wrapper that is used to append general data for every `Inner` /// `Row`. #[derive(Archive, Deserialize, Debug, Serialize)] @@ -46,12 +42,3 @@ impl RowWrapper for GeneralRow { } } } - -impl ArchivedRow for ArchivedGeneralRow -where - Inner: Archive, -{ - fn is_locked(&self) -> Option { - None - } -} diff --git a/src/lib.rs b/src/lib.rs index 725ada04..6696619d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ pub use worktable_codegen::worktable; pub mod prelude { pub use crate::database::DatabaseManager; - pub use crate::in_memory::{ArchivedRow, Data, DataPages, RowWrapper, StorableRow}; + pub use crate::in_memory::{Data, DataPages, RowWrapper, StorableRow}; pub use crate::lock::LockMap; pub use crate::primary_key::{PrimaryKeyGenerator, PrimaryKeyGeneratorState, TablePrimaryKey}; pub use crate::table::select::{ diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 51b26194..d405d117 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -7,21 +7,34 @@ use std::task::{Context, Poll}; use derive_more::From; use futures::task::AtomicWaker; -use rkyv::{Archive, Deserialize, Serialize}; pub use set::LockMap; -#[derive( - Archive, Clone, Copy, Deserialize, Debug, Eq, From, Hash, Ord, Serialize, PartialEq, PartialOrd, -)] -pub struct LockId(u16); - #[derive(Debug)] pub struct Lock { locked: AtomicBool, waker: AtomicWaker, } +impl Lock { + pub fn new() -> Self { + Self { + locked: AtomicBool::from(true), + waker: AtomicWaker::new(), + } + } + + pub fn unlock(&self) { + self.locked.store(false, Ordering::Relaxed); + self.waker.wake() + } + + pub fn lock(&self) { + self.locked.store(true, Ordering::Relaxed); + self.waker.wake() + } +} + impl Future for &Lock { type Output = (); @@ -40,17 +53,3 @@ impl Default for Lock { Self::new() } } - -impl Lock { - pub fn new() -> Self { - Self { - locked: AtomicBool::from(true), - waker: AtomicWaker::new(), - } - } - - pub fn unlock(&self) { - self.locked.store(false, Ordering::Relaxed); - self.waker.wake() - } -} diff --git a/src/lock/set.rs b/src/lock/set.rs index 0022a43a..948eb676 100644 --- a/src/lock/set.rs +++ b/src/lock/set.rs @@ -3,22 +3,28 @@ use std::sync::Arc; use lockfree::map::Map; -use crate::lock::{Lock, LockId}; - #[derive(Debug)] -pub struct LockMap { - set: Map>, - +pub struct LockMap +where + PkType: std::hash::Hash + Ord, +{ + set: Map>>, next_id: AtomicU16, } -impl Default for LockMap { +impl Default for LockMap +where + PkType: std::hash::Hash + Ord, +{ fn default() -> Self { Self::new() } } -impl LockMap { +impl LockMap +where + PkType: std::hash::Hash + Ord, +{ pub fn new() -> Self { Self { set: Map::new(), @@ -26,16 +32,16 @@ impl LockMap { } } - pub fn insert(&self, id: LockId, lock: Arc) { - self.set.insert(id, lock); + pub fn insert(&self, key: PkType, lock: Arc) { + self.set.insert(key, Some(lock)); } - pub fn get(&self, id: &LockId) -> Option> { - self.set.get(id).map(|v| v.val().clone()) + pub fn get(&self, key: &PkType) -> Option> { + self.set.get(key).map(|v| v.val().clone())? } - pub fn remove(&self, id: &LockId) { - self.set.remove(id); + pub fn remove(&self, key: &PkType) { + self.set.remove(key); } pub fn next_id(&self) -> u16 { diff --git a/src/table/mod.rs b/src/table/mod.rs index 3075310a..a65f141d 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -25,10 +25,11 @@ pub struct WorkTable< PrimaryKey, AvailableTypes = (), SecondaryIndexes = (), + LockType = (), PkGen = ::Generator, const DATA_LENGTH: usize = INNER_PAGE_SIZE, > where - PrimaryKey: Clone + Ord + Send + 'static, + PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, Row: StorableRow, { pub data: DataPages, @@ -39,7 +40,7 @@ pub struct WorkTable< pub pk_gen: PkGen, - pub lock_map: LockMap, + pub lock_map: LockMap, pub table_name: &'static str, @@ -49,10 +50,18 @@ pub struct WorkTable< } // Manual implementations to avoid unneeded trait bounds. -impl Default - for WorkTable +impl< + Row, + PrimaryKey, + AvailableTypes, + SecondaryIndexes, + LockType, + PkGen, + const DATA_LENGTH: usize, + > Default + for WorkTable where - PrimaryKey: Clone + Ord + Send + TablePrimaryKey, + PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, SecondaryIndexes: Default, PkGen: Default, Row: StorableRow, @@ -72,11 +81,18 @@ where } } -impl - WorkTable +impl< + Row, + PrimaryKey, + AvailableTypes, + SecondaryIndexes, + LockType, + PkGen, + const DATA_LENGTH: usize, + > WorkTable where Row: TableRow, - PrimaryKey: Clone + Ord + Send + TablePrimaryKey, + PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, Row: StorableRow, ::WrappedRow: RowWrapper, { @@ -94,6 +110,7 @@ where )] pub fn select(&self, pk: PrimaryKey) -> Option where + LockType: 'static, Row: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, @@ -123,6 +140,7 @@ where PrimaryKey: Clone, AvailableTypes: 'static, SecondaryIndexes: TableSecondaryIndex, + LockType: 'static, { let pk = row.get_primary_key().clone(); let link = self diff --git a/tests/worktable/custom_pk.rs b/tests/worktable/custom_pk.rs index 79ab5669..ade62fe6 100644 --- a/tests/worktable/custom_pk.rs +++ b/tests/worktable/custom_pk.rs @@ -17,6 +17,7 @@ use worktable::worktable; Ord, Serialize, SizeMeasure, + Hash, )] #[rkyv(compare(PartialEq), derive(Debug))] struct CustomId(u64);