From 8f8c720d49069f57b03164318e18984efae7a3a6 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Sun, 9 Mar 2025 17:36:12 +0300 Subject: [PATCH 1/5] WIP --- src/lib.rs | 1 - src/persistence/space/data.rs | 24 +++++++++++++----------- src/persistence/space/mod.rs | 2 +- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ea2b8262..5bd18db8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,7 +45,6 @@ pub mod prelude { pub use derive_more::{From, Into}; pub use lockfree::set::Set as LockFreeSet; - pub use scc::{ebr::Guard, tree_index::TreeIndex}; pub use worktable_codegen::{PersistIndex, PersistTable}; pub const WT_INDEX_EXTENSION: &str = ".wt.idx"; diff --git a/src/persistence/space/data.rs b/src/persistence/space/data.rs index a94480e0..1d6e6a35 100644 --- a/src/persistence/space/data.rs +++ b/src/persistence/space/data.rs @@ -1,7 +1,9 @@ -use std::fs::File; -use std::io::{Seek, SeekFrom, Write}; +use std::io::SeekFrom; use std::path::Path; +use crate::persistence::space::open_or_create_file; +use crate::persistence::SpaceDataOps; +use crate::prelude::WT_DATA_EXTENSION; use convert_case::{Case, Casing}; use data_bucket::{ parse_page, persist_page, update_at, DataPage, GeneralHeader, GeneralPage, Link, PageType, @@ -14,10 +16,8 @@ use rkyv::ser::sharing::Share; use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; - -use crate::persistence::space::open_or_create_file; -use crate::persistence::SpaceDataOps; -use crate::prelude::WT_DATA_EXTENSION; +use tokio::fs::File; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; #[derive(Debug)] pub struct SpaceData { @@ -28,13 +28,15 @@ pub struct SpaceData { } impl SpaceData { - fn update_data_length(&mut self) -> eyre::Result<()> { + async fn update_data_length(&mut self) -> eyre::Result<()> { let offset = (u32::default().aligned_size() * 6) as u32; - self.data_file.seek(SeekFrom::Start( - (self.last_page_id * (DATA_LENGTH + GENERAL_HEADER_SIZE as u32) + offset) as u64, - ))?; + self.data_file + .seek(SeekFrom::Start( + (self.last_page_id * (DATA_LENGTH + GENERAL_HEADER_SIZE as u32) + offset) as u64, + )) + .await?; let bytes = rkyv::to_bytes::(&self.current_data_length)?; - self.data_file.write_all(bytes.as_ref())?; + self.data_file.write_all(bytes.as_ref()).await?; Ok(()) } } diff --git a/src/persistence/space/mod.rs b/src/persistence/space/mod.rs index 54f5460b..ac54e655 100644 --- a/src/persistence/space/mod.rs +++ b/src/persistence/space/mod.rs @@ -1,12 +1,12 @@ mod data; mod index; -use std::fs::{File, OpenOptions}; use std::path::Path; use data_bucket::{GeneralPage, Link, SpaceInfoPage}; use indexset::cdc::change::ChangeEvent; use indexset::core::pair::Pair; +use tokio::fs::{File, OpenOptions}; pub use data::SpaceData; pub use index::{map_index_pages_to_toc_and_general, IndexTableOfContents, SpaceIndex}; From 6f0365be9b0c2e48a1ece0c6a607557c3c319cad Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Wed, 12 Mar 2025 19:29:05 +0300 Subject: [PATCH 2/5] WIP --- Cargo.toml | 5 +- codegen/src/persist_index/space.rs | 8 +- .../persist_table/generator/space_file/mod.rs | 3 +- src/persistence/engine.rs | 74 ++++---- src/persistence/mod.rs | 3 +- src/persistence/space/data.rs | 59 ++++--- src/persistence/space/index/mod.rs | 166 +++++++++++------- .../space/index/table_of_contents.rs | 16 +- src/persistence/space/mod.rs | 50 ++++-- src/persistence/task.rs | 2 +- tests/mod.rs | 16 +- tests/persistence/index_page/read.rs | 104 +++++++---- tests/persistence/read.rs | 51 +++--- .../space_index/indexset_compatibility.rs | 27 +-- tests/persistence/space_index/write.rs | 44 ++--- 15 files changed, 379 insertions(+), 249 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ef13f9f5..51c6c1b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,12 +23,11 @@ tracing = "0.1.40" rkyv = { version = "0.8.9", features = ["uuid-1"] } lockfree = { version = "0.5.1" } worktable_codegen = { path = "codegen", version = "0.5.0" } -scc = "2.1.16" futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4"] } -data_bucket = "0.2.0" +# data_bucket = "0.2.0" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" } -# data_bucket = { path = "../DataBucket", version = "0.1.0" } +data_bucket = { path = "../DataBucket", version = "0.2.0" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } indexset = { version = "0.11.2", features = ["concurrent", "cdc", "multimap"] } diff --git a/codegen/src/persist_index/space.rs b/codegen/src/persist_index/space.rs index 1d636d2e..44b9b767 100644 --- a/codegen/src/persist_index/space.rs +++ b/codegen/src/persist_index/space.rs @@ -88,13 +88,13 @@ impl Generator { .map(|i| { let literal_name = Literal::string(i.to_string().as_str()); quote! { - #i: SpaceIndex::secondary_from_table_files_path(path, #literal_name)?, + #i: SpaceIndex::secondary_from_table_files_path(path, #literal_name).await?, } }) .collect(); quote! { - fn from_table_files_path>(path: S) -> eyre::Result { + async fn from_table_files_path>(path: S) -> eyre::Result { let path = path.as_ref(); Ok(Self { #(#fields)* @@ -113,14 +113,14 @@ impl Generator { .map(|i| { quote! { for event in events.#i { - self.#i.process_change_event(event)?; + self.#i.process_change_event(event).await?; } } }) .collect(); quote! { - fn process_change_events(&mut self, events: #events_ident) -> eyre::Result<()> { + async fn process_change_events(&mut self, events: #events_ident) -> eyre::Result<()> { #(#process)* core::result::Result::Ok(()) } diff --git a/codegen/src/persist_table/generator/space_file/mod.rs b/codegen/src/persist_table/generator/space_file/mod.rs index e7576ee8..bd717095 100644 --- a/codegen/src/persist_table/generator/space_file/mod.rs +++ b/codegen/src/persist_table/generator/space_file/mod.rs @@ -146,7 +146,7 @@ impl Generator { let dir_name = name_generator.get_dir_name(); quote! { - pub fn into_worktable(self, config: PersistenceConfig) -> #wt_ident { + pub async fn into_worktable(self, config: PersistenceConfig) -> #wt_ident { let mut page_id = 1; let data = self.data.into_iter().map(|p| { let mut data = Data::from_data_page(p); @@ -179,6 +179,7 @@ impl Generator { let path = format!("{}/{}", config.tables_path.as_str(), #dir_name); let engine: #engine_ident = PersistenceEngine::from_table_files_path(path) + .await .expect("should not panic as SpaceFile is ok"); #wt_ident( table, diff --git a/src/persistence/engine.rs b/src/persistence/engine.rs index 8e775cd4..60f73550 100644 --- a/src/persistence/engine.rs +++ b/src/persistence/engine.rs @@ -1,4 +1,5 @@ use std::fs; +use std::future::Future; use std::marker::PhantomData; use std::path::Path; @@ -50,16 +51,18 @@ where SpacePrimaryIndex: SpaceIndexOps, SpaceSecondaryIndexes: SpaceSecondaryIndexOps, { - pub fn from_table_files_path + Clone>(path: S) -> eyre::Result { + pub async fn from_table_files_path + Clone + Send>( + path: S, + ) -> eyre::Result { let table_path = Path::new(path.as_ref()); if !table_path.exists() { fs::create_dir_all(table_path)?; } Ok(Self { - data: SpaceData::from_table_files_path(path.clone())?, - primary_index: SpacePrimaryIndex::primary_from_table_files_path(path.clone())?, - secondary_indexes: SpaceSecondaryIndexes::from_table_files_path(path)?, + data: SpaceData::from_table_files_path(path.clone()).await?, + primary_index: SpacePrimaryIndex::primary_from_table_files_path(path.clone()).await?, + secondary_indexes: SpaceSecondaryIndexes::from_table_files_path(path).await?, phantom_data: PhantomData, }) } @@ -82,39 +85,50 @@ impl< PrimaryKeyGenState, > where - PrimaryKey: Ord + TablePrimaryKey, + PrimaryKey: Ord + TablePrimaryKey + Send, ::Generator: PrimaryKeyGeneratorState, - SpaceData: SpaceDataOps, - SpacePrimaryIndex: SpaceIndexOps, - SpaceSecondaryIndexes: SpaceSecondaryIndexOps, + SpaceData: SpaceDataOps + Send, + SpacePrimaryIndex: SpaceIndexOps + Send, + SpaceSecondaryIndexes: SpaceSecondaryIndexOps + Send, + SecondaryIndexEvents: Send, + PrimaryKeyGenState: Send, { fn apply_operation( &mut self, op: Operation, - ) -> eyre::Result<()> { - match op { - Operation::Insert(insert) => { - self.data.save_data(insert.link, insert.bytes.as_ref())?; - for event in insert.primary_key_events { - self.primary_index.process_change_event(event)?; + ) -> impl Future> + Send { + async { + match op { + Operation::Insert(insert) => { + self.data + .save_data(insert.link, insert.bytes.as_ref()) + .await?; + for event in insert.primary_key_events { + self.primary_index.process_change_event(event).await?; + } + let info = self.data.get_mut_info(); + info.inner.pk_gen_state = insert.pk_gen_state; + self.data.save_info().await?; + self.secondary_indexes + .process_change_events(insert.secondary_keys_events) + .await } - let info = self.data.get_mut_info(); - info.inner.pk_gen_state = insert.pk_gen_state; - self.data.save_info()?; - self.secondary_indexes - .process_change_events(insert.secondary_keys_events) - } - Operation::Update(update) => { - self.data.save_data(update.link, update.bytes.as_ref())?; - self.secondary_indexes - .process_change_events(update.secondary_keys_events) - } - Operation::Delete(delete) => { - for event in delete.primary_key_events { - self.primary_index.process_change_event(event)?; + Operation::Update(update) => { + self.data + .save_data(update.link, update.bytes.as_ref()) + .await?; + self.secondary_indexes + .process_change_events(update.secondary_keys_events) + .await + } + Operation::Delete(delete) => { + for event in delete.primary_key_events { + self.primary_index.process_change_event(event).await?; + } + self.secondary_indexes + .process_change_events(delete.secondary_keys_events) + .await } - self.secondary_indexes - .process_change_events(delete.secondary_keys_events) } } } diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index 2df59608..cbd9478a 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -11,11 +11,12 @@ pub use space::{ map_index_pages_to_toc_and_general, IndexTableOfContents, SpaceData, SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceSecondaryIndexOps, }; +use std::future::Future; pub use task::PersistenceTask; pub trait PersistenceEngineOps { fn apply_operation( &mut self, op: Operation, - ) -> eyre::Result<()>; + ) -> impl Future> + Send; } diff --git a/src/persistence/space/data.rs b/src/persistence/space/data.rs index 1d6e6a35..041fc8b1 100644 --- a/src/persistence/space/data.rs +++ b/src/persistence/space/data.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::io::SeekFrom; use std::path::Path; @@ -47,12 +48,14 @@ where PkGenState: Default + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, - > + Archive, + > + Archive + + Send + + Sync, ::Archived: Deserialize>, SpaceInfoPage: Persistable, { - fn from_table_files_path>(table_path: S) -> eyre::Result { + async fn from_table_files_path + Send>(table_path: S) -> eyre::Result { let path = format!("{}/{}", table_path.as_ref(), WT_DATA_EXTENSION); let mut data_file = if !Path::new(&path).exists() { let name = table_path @@ -63,14 +66,14 @@ where .to_string() .from_case(Case::Snake) .to_case(Case::Pascal); - let mut data_file = open_or_create_file(path)?; - Self::bootstrap(&mut data_file, name)?; + let mut data_file = open_or_create_file(path).await?; + Self::bootstrap(&mut data_file, name).await?; data_file } else { - open_or_create_file(path)? + open_or_create_file(path).await? }; - let info = parse_page::<_, DATA_LENGTH>(&mut data_file, 0)?; - let file_length = data_file.metadata()?.len(); + let info = parse_page::<_, DATA_LENGTH>(&mut data_file, 0).await?; + let file_length = data_file.metadata().await?.len(); let page_id = file_length / (DATA_LENGTH as u64 + GENERAL_HEADER_SIZE as u64); Ok(Self { @@ -81,7 +84,7 @@ where }) } - fn bootstrap(file: &mut File, table_name: String) -> eyre::Result<()> { + async fn bootstrap(file: &mut File, table_name: String) -> eyre::Result<()> { let info = SpaceInfoPage { id: 0.into(), page_count: 0, @@ -96,32 +99,38 @@ where header: GeneralHeader::new(0.into(), PageType::SpaceInfo, 0.into()), inner: info, }; - persist_page(&mut page, file) + persist_page(&mut page, file).await } - fn save_data(&mut self, link: Link, bytes: &[u8]) -> eyre::Result<()> { - if link.page_id > self.last_page_id.into() { - let mut page = GeneralPage { - header: GeneralHeader::new(link.page_id, PageType::SpaceInfo, 0.into()), - inner: DataPage { - length: 0, - data: [0; 1], - }, - }; - persist_page(&mut page, &mut self.data_file)?; - self.current_data_length = 0; - self.last_page_id += 1; + fn save_data( + &mut self, + link: Link, + bytes: &[u8], + ) -> impl Future> + Send { + async move { + if link.page_id > self.last_page_id.into() { + let mut page = GeneralPage { + header: GeneralHeader::new(link.page_id, PageType::SpaceInfo, 0.into()), + inner: DataPage { + length: 0, + data: [0; 1], + }, + }; + persist_page(&mut page, &mut self.data_file).await?; + self.current_data_length = 0; + self.last_page_id += 1; + } + self.current_data_length += link.length; + self.update_data_length().await?; + update_at::<{ DATA_LENGTH }>(&mut self.data_file, link, bytes).await } - self.current_data_length += link.length; - self.update_data_length()?; - update_at::<{ DATA_LENGTH }>(&mut self.data_file, link, bytes) } fn get_mut_info(&mut self) -> &mut GeneralPage> { &mut self.info } - fn save_info(&mut self) -> eyre::Result<()> { + fn save_info(&mut self) -> impl Future> + Send { persist_page(&mut self.info, &mut self.data_file) } } diff --git a/src/persistence/space/index/mod.rs b/src/persistence/space/index/mod.rs index 6520bf16..4e788006 100644 --- a/src/persistence/space/index/mod.rs +++ b/src/persistence/space/index/mod.rs @@ -2,7 +2,7 @@ mod table_of_contents; mod util; use std::fmt::Debug; -use std::fs::File; +use std::future::Future; use std::path::Path; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -24,6 +24,7 @@ use rkyv::ser::sharing::Share; use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{rancor, Archive, Deserialize, Serialize}; +use tokio::fs::File; use crate::persistence::space::open_or_create_file; use crate::persistence::SpaceIndexOps; @@ -51,10 +52,12 @@ where + Default + Debug + SizeMeasurable - + for<'a> Serialize, Share>, rancor::Error>>, + + for<'a> Serialize, Share>, rancor::Error>> + + Send + + Sync, ::Archived: Deserialize> + Ord + Eq, { - pub fn new>(index_file_path: S, space_id: SpaceId) -> eyre::Result { + pub async fn new>(index_file_path: S, space_id: SpaceId) -> eyre::Result { let mut index_file = if !Path::new(index_file_path.as_ref()).exists() { let name = index_file_path .as_ref() @@ -67,19 +70,20 @@ where .to_string() .from_case(Case::Snake) .to_case(Case::Pascal); - let mut index_file = open_or_create_file(index_file_path.as_ref())?; - Self::bootstrap(&mut index_file, name)?; + let mut index_file = open_or_create_file(index_file_path.as_ref()).await?; + Self::bootstrap(&mut index_file, name).await?; index_file } else { - open_or_create_file(index_file_path)? + open_or_create_file(index_file_path).await? }; - let info = parse_page::<_, DATA_LENGTH>(&mut index_file, 0)?; + let info = parse_page::<_, DATA_LENGTH>(&mut index_file, 0).await?; - let file_length = index_file.metadata()?.len(); + let file_length = index_file.metadata().await?.len(); let page_id = file_length / (DATA_LENGTH as u64 + GENERAL_HEADER_SIZE as u64) + 1; let next_page_id = Arc::new(AtomicU32::new(page_id as u32)); let table_of_contents = - IndexTableOfContents::parse_from_file(&mut index_file, space_id, next_page_id.clone())?; + IndexTableOfContents::parse_from_file(&mut index_file, space_id, next_page_id.clone()) + .await?; Ok(Self { space_id, table_of_contents, @@ -89,7 +93,11 @@ where }) } - fn add_new_index_page(&mut self, node_id: Pair, page_id: PageId) -> eyre::Result<()> + async fn add_new_index_page( + &mut self, + node_id: Pair, + page_id: PageId, + ) -> eyre::Result<()> where T: Archive + Clone @@ -109,10 +117,10 @@ where key: node_id.key, link: node_id.value, }; - self.add_index_page(page, page_id) + self.add_index_page(page, page_id).await } - fn add_index_page(&mut self, node: IndexPage, page_id: PageId) -> eyre::Result<()> + async fn add_index_page(&mut self, node: IndexPage, page_id: PageId) -> eyre::Result<()> where T: Archive + Clone @@ -128,11 +136,11 @@ where inner: node, header, }; - persist_page(&mut general_page, &mut self.index_file)?; + persist_page(&mut general_page, &mut self.index_file).await?; Ok(()) } - fn insert_on_index_page( + async fn insert_on_index_page( &mut self, page_id: PageId, node_id: T, @@ -154,7 +162,8 @@ where let mut new_node_id = None; let size = get_index_page_size_from_data_length::(DATA_LENGTH as usize); - let mut utility = IndexPage::::parse_index_page_utility(&mut self.index_file, page_id)?; + let mut utility = + IndexPage::::parse_index_page_utility(&mut self.index_file, page_id).await?; utility.slots.insert(index, utility.current_index); utility.slots.remove(size); utility.current_length += 1; @@ -168,19 +177,20 @@ where size, index_value, utility.current_index, - )?; + ) + .await?; if node_id < value.key { utility.node_id = value.key.clone(); new_node_id = Some(value.key); } - IndexPage::::persist_index_page_utility(&mut self.index_file, page_id, utility)?; + IndexPage::::persist_index_page_utility(&mut self.index_file, page_id, utility).await?; Ok(new_node_id) } - fn remove_from_index_page( + async fn remove_from_index_page( &mut self, page_id: PageId, node_id: T, @@ -202,7 +212,8 @@ where let mut new_node_id = None; let size = get_index_page_size_from_data_length::(DATA_LENGTH as usize); - let mut utility = IndexPage::::parse_index_page_utility(&mut self.index_file, page_id)?; + let mut utility = + IndexPage::::parse_index_page_utility(&mut self.index_file, page_id).await?; utility.current_index = *utility .slots .get(index) @@ -210,7 +221,8 @@ where utility.slots.remove(index); utility.slots.push(0); utility.current_length -= 1; - IndexPage::::remove_value(&mut self.index_file, page_id, size, utility.current_index)?; + IndexPage::::remove_value(&mut self.index_file, page_id, size, utility.current_index) + .await?; if node_id == value.key { let index = *utility @@ -222,17 +234,18 @@ where page_id, size, index as usize, - )? + ) + .await? .key; new_node_id = Some(utility.node_id.clone()) } - IndexPage::::persist_index_page_utility(&mut self.index_file, page_id, utility)?; + IndexPage::::persist_index_page_utility(&mut self.index_file, page_id, utility).await?; Ok(new_node_id) } - fn process_insert_at( + async fn process_insert_at( &mut self, node_id: T, value: Pair, @@ -252,15 +265,17 @@ where .table_of_contents .get(&node_id) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; - if let Some(new_node_id) = - self.insert_on_index_page(page_id, node_id.clone(), index, value)? + if let Some(new_node_id) = self + .insert_on_index_page(page_id, node_id.clone(), index, value) + .await? { self.table_of_contents.update_key(&node_id, new_node_id); - self.table_of_contents.persist(&mut self.index_file)?; + self.table_of_contents.persist(&mut self.index_file).await?; } Ok(()) } - fn process_remove_at( + + async fn process_remove_at( &mut self, node_id: T, value: Pair, @@ -280,15 +295,16 @@ where .table_of_contents .get(&node_id) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; - if let Some(new_node_id) = - self.remove_from_index_page(page_id, node_id.clone(), index, value)? + if let Some(new_node_id) = self + .remove_from_index_page(page_id, node_id.clone(), index, value) + .await? { self.table_of_contents.update_key(&node_id, new_node_id); - self.table_of_contents.persist(&mut self.index_file)?; + self.table_of_contents.persist(&mut self.index_file).await?; } Ok(()) } - fn process_create_node(&mut self, node_id: Pair) -> eyre::Result<()> + async fn process_create_node(&mut self, node_id: Pair) -> eyre::Result<()> where T: Archive + Clone @@ -305,13 +321,13 @@ where self.next_page_id.fetch_add(1, Ordering::Relaxed).into() }; self.table_of_contents.insert(node_id.key.clone(), page_id); - self.table_of_contents.persist(&mut self.index_file)?; - self.add_new_index_page(node_id, page_id)?; + self.table_of_contents.persist(&mut self.index_file).await?; + self.add_new_index_page(node_id, page_id).await?; Ok(()) } - fn process_remove_node(&mut self, node_id: T) -> eyre::Result<()> + async fn process_remove_node(&mut self, node_id: T) -> eyre::Result<()> where T: Archive + Clone @@ -323,11 +339,11 @@ where >, { self.table_of_contents.remove(&node_id); - self.table_of_contents.persist(&mut self.index_file)?; + self.table_of_contents.persist(&mut self.index_file).await?; Ok(()) } - fn process_split_node(&mut self, node_id: T, split_index: usize) -> eyre::Result<()> + async fn process_split_node(&mut self, node_id: T, split_index: usize) -> eyre::Result<()> where T: Archive + Clone @@ -346,7 +362,7 @@ where .get(&node_id) .ok_or(eyre!("Node with {:?} id is not found", node_id))?; let mut page = - parse_page::, DATA_LENGTH>(&mut self.index_file, page_id.into())?; + parse_page::, DATA_LENGTH>(&mut self.index_file, page_id.into()).await?; let splitted_page = page.inner.split(split_index); let new_page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() { id @@ -358,15 +374,15 @@ where .update_key(&node_id, page.inner.node_id.clone()); self.table_of_contents .insert(splitted_page.node_id.clone(), new_page_id); - self.table_of_contents.persist(&mut self.index_file)?; + self.table_of_contents.persist(&mut self.index_file).await?; - self.add_index_page(splitted_page, new_page_id)?; - persist_page(&mut page, &mut self.index_file)?; + self.add_index_page(splitted_page, new_page_id).await?; + persist_page(&mut page, &mut self.index_file).await?; Ok(()) } - pub fn parse_indexset(&mut self) -> eyre::Result> + pub async fn parse_indexset(&mut self) -> eyre::Result> where T: Archive + Clone @@ -378,14 +394,15 @@ where + Send + for<'a> Serialize< Strategy, Share>, rancor::Error>, - >, + > + 'static, ::Archived: Deserialize>, { let size = get_index_page_size_from_data_length::(DATA_LENGTH as usize); let indexset = BTreeMap::with_maximum_node_size(size); for (_, page_id) in self.table_of_contents.iter() { let page = - parse_page::, DATA_LENGTH>(&mut self.index_file, (*page_id).into())?; + parse_page::, DATA_LENGTH>(&mut self.index_file, (*page_id).into()) + .await?; let node = page.inner.get_node(); indexset.attach_node(node) } @@ -403,15 +420,19 @@ where + Default + Debug + SizeMeasurable - + for<'a> Serialize, Share>, rancor::Error>>, + + for<'a> Serialize, Share>, rancor::Error>> + + Send + + Sync, ::Archived: Deserialize> + Ord + Eq, { - fn primary_from_table_files_path>(table_path: S) -> eyre::Result { + async fn primary_from_table_files_path + Send>( + table_path: S, + ) -> eyre::Result { let path = format!("{}/primary{}", table_path.as_ref(), WT_INDEX_EXTENSION); - Self::new(path, 0.into()) + Self::new(path, 0.into()).await } - fn secondary_from_table_files_path, S2: AsRef>( + async fn secondary_from_table_files_path + Send, S2: AsRef + Send>( table_path: S1, name: S2, ) -> eyre::Result @@ -424,10 +445,10 @@ where name.as_ref(), WT_INDEX_EXTENSION ); - Self::new(path, 0.into()) + Self::new(path, 0.into()).await } - fn bootstrap(file: &mut File, table_name: String) -> eyre::Result<()> { + async fn bootstrap(file: &mut File, table_name: String) -> eyre::Result<()> { let info = SpaceInfoPage { id: 0.into(), page_count: 0, @@ -442,27 +463,36 @@ where header: GeneralHeader::new(0.into(), PageType::SpaceInfo, 0.into()), inner: info, }; - persist_page(&mut page, file) + persist_page(&mut page, file).await } - fn process_change_event(&mut self, event: ChangeEvent>) -> eyre::Result<()> { - match event { - ChangeEvent::InsertAt { - max_value: node_id, - value, - index, - } => self.process_insert_at(node_id.key, value, index), - ChangeEvent::RemoveAt { - max_value: node_id, - value, - index, - } => self.process_remove_at(node_id.key, value, index), - ChangeEvent::CreateNode { max_value: node_id } => self.process_create_node(node_id), - ChangeEvent::RemoveNode { max_value: node_id } => self.process_remove_node(node_id.key), - ChangeEvent::SplitNode { - max_value: node_id, - split_index, - } => self.process_split_node(node_id.key, split_index), + fn process_change_event( + &mut self, + event: ChangeEvent>, + ) -> impl Future> + Send { + async { + match event { + ChangeEvent::InsertAt { + max_value: node_id, + value, + index, + } => self.process_insert_at(node_id.key, value, index).await, + ChangeEvent::RemoveAt { + max_value: node_id, + value, + index, + } => self.process_remove_at(node_id.key, value, index).await, + ChangeEvent::CreateNode { max_value: node_id } => { + self.process_create_node(node_id).await + } + ChangeEvent::RemoveNode { max_value: node_id } => { + self.process_remove_node(node_id.key).await + } + ChangeEvent::SplitNode { + max_value: node_id, + split_index, + } => self.process_split_node(node_id.key, split_index).await, + } } } } diff --git a/src/persistence/space/index/table_of_contents.rs b/src/persistence/space/index/table_of_contents.rs index 227de3cc..47c50415 100644 --- a/src/persistence/space/index/table_of_contents.rs +++ b/src/persistence/space/index/table_of_contents.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -14,6 +13,7 @@ use rkyv::ser::sharing::Share; use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{rancor, Archive, Deserialize, Serialize}; +use tokio::fs::File; #[derive(Debug)] pub struct IndexTableOfContents { @@ -130,7 +130,7 @@ where page.inner.pop_empty_page() } - pub fn persist(&mut self, file: &mut File) -> eyre::Result<()> + pub async fn persist(&mut self, file: &mut File) -> eyre::Result<()> where T: Archive + Ord @@ -139,17 +139,18 @@ where + SizeMeasurable + for<'a> Serialize< Strategy, Share>, rancor::Error>, - >, + > + Send + + Sync, ::Archived: Deserialize> + Ord + Eq, { for page in &mut self.pages { - persist_page(page, file)?; + persist_page(page, file).await?; } Ok(()) } - pub fn parse_from_file( + pub async fn parse_from_file( file: &mut File, space_id: SpaceId, next_page_id: Arc, @@ -165,7 +166,7 @@ where >, ::Archived: Deserialize> + Ord + Eq, { - let first_page = parse_page::, DATA_LENGTH>(file, 1); + let first_page = parse_page::, DATA_LENGTH>(file, 1).await; if let Ok(page) = first_page { if page.header.next_id.is_empty() { Ok(Self { @@ -179,7 +180,8 @@ where let mut ind = false; while !ind { - let page = parse_page::, DATA_LENGTH>(file, index)?; + let page = + parse_page::, DATA_LENGTH>(file, index).await?; ind = page.header.next_id.is_empty(); table_of_contents_pages.push(page); index += 1; diff --git a/src/persistence/space/mod.rs b/src/persistence/space/mod.rs index ac54e655..87e0c42d 100644 --- a/src/persistence/space/mod.rs +++ b/src/persistence/space/mod.rs @@ -1,6 +1,7 @@ mod data; mod index; +use std::future::Future; use std::path::Path; use data_bucket::{GeneralPage, Link, SpaceInfoPage}; @@ -12,44 +13,67 @@ pub use data::SpaceData; pub use index::{map_index_pages_to_toc_and_general, IndexTableOfContents, SpaceIndex}; pub trait SpaceDataOps { - fn from_table_files_path>(path: S) -> eyre::Result + fn from_table_files_path + Send>( + path: S, + ) -> impl Future> + Send where Self: Sized; - fn bootstrap(file: &mut File, table_name: String) -> eyre::Result<()>; - fn save_data(&mut self, link: Link, bytes: &[u8]) -> eyre::Result<()>; + fn bootstrap( + file: &mut File, + table_name: String, + ) -> impl Future> + Send; + fn save_data( + &mut self, + link: Link, + bytes: &[u8], + ) -> impl Future> + Send; fn get_mut_info(&mut self) -> &mut GeneralPage>; - fn save_info(&mut self) -> eyre::Result<()>; + fn save_info(&mut self) -> impl Future> + Send; } pub trait SpaceIndexOps where T: Ord, { - fn primary_from_table_files_path>(path: S) -> eyre::Result + fn primary_from_table_files_path + Send>( + path: S, + ) -> impl Future> + Send where Self: Sized; - fn secondary_from_table_files_path, S2: AsRef>( + fn secondary_from_table_files_path + Send, S2: AsRef + Send>( path: S1, name: S2, - ) -> eyre::Result + ) -> impl Future> + Send where Self: Sized; - fn bootstrap(file: &mut File, table_name: String) -> eyre::Result<()>; - fn process_change_event(&mut self, event: ChangeEvent>) -> eyre::Result<()>; + fn bootstrap( + file: &mut File, + table_name: String, + ) -> impl Future> + Send; + fn process_change_event( + &mut self, + event: ChangeEvent>, + ) -> impl Future> + Send; } pub trait SpaceSecondaryIndexOps { - fn from_table_files_path>(path: S) -> eyre::Result + fn from_table_files_path + Send>( + path: S, + ) -> impl Future> + Send where Self: Sized; - fn process_change_events(&mut self, events: SecondaryIndexEvents) -> eyre::Result<()>; + fn process_change_events( + &mut self, + events: SecondaryIndexEvents, + ) -> impl Future> + Send; } -pub fn open_or_create_file>(path: S) -> eyre::Result { +pub async fn open_or_create_file>(path: S) -> eyre::Result { let path = Path::new(path.as_ref()); Ok(OpenOptions::new() .write(true) .read(true) .create(!path.exists()) - .open(path)?) + .open(path) + .await?) } diff --git a/src/persistence/task.rs b/src/persistence/task.rs index de9ca3e7..9329f480 100644 --- a/src/persistence/task.rs +++ b/src/persistence/task.rs @@ -81,7 +81,7 @@ impl engine_progress_notify.notify_waiters(); engine_queue.pop().await }; - let res = engine.apply_operation(next_op); + let res = engine.apply_operation(next_op).await; if let Err(err) = res { tracing::warn!("{}", err); } diff --git a/tests/mod.rs b/tests/mod.rs index 9718656a..769d89a4 100644 --- a/tests/mod.rs +++ b/tests/mod.rs @@ -1,12 +1,12 @@ -use std::fs; -use std::fs::File; -use std::io::{BufReader, Read}; +use std::io::Read; use std::path::Path; +use tokio::fs::File; +use tokio::io::BufReader; mod persistence; mod worktable; -pub fn check_if_files_are_same(got: String, expected: String) -> bool { +pub async fn check_if_files_are_same(got: String, expected: String) -> bool { let got = File::open(got).unwrap(); let expected = File::open(expected).unwrap(); @@ -44,14 +44,14 @@ pub fn check_if_dirs_are_same(got: String, expected: String) -> bool { true } -pub fn remove_file_if_exists(path: String) { +pub async fn remove_file_if_exists(path: String) { if Path::new(path.as_str()).exists() { - fs::remove_file(path.as_str()).unwrap(); + tokio::fs::remove_file(path.as_str()).await.unwrap(); } } -pub fn remove_dir_if_exists(path: String) { +pub async fn remove_dir_if_exists(path: String) { if Path::new(path.as_str()).exists() { - fs::remove_dir_all(path).unwrap() + tokio::fs::remove_dir_all(path).await.unwrap() } } diff --git a/tests/persistence/index_page/read.rs b/tests/persistence/index_page/read.rs index 79fdba64..3b39c55e 100644 --- a/tests/persistence/index_page/read.rs +++ b/tests/persistence/index_page/read.rs @@ -1,29 +1,35 @@ use data_bucket::{parse_page, IndexPage, INNER_PAGE_SIZE}; -use std::fs::OpenOptions; +use tokio::fs::OpenOptions; -#[test] -fn test_index_page_read_in_space() { +#[tokio::test] +async fn test_index_page_read_in_space() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/test_persist/primary.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 99); assert_eq!(page.inner.current_index, 99); assert_eq!(page.inner.current_length, 99); } -#[test] -fn test_index_page_read_after_create_node_in_space_index() { +#[tokio::test] +async fn test_index_page_read_after_create_node_in_space_index() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_create_node.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 5); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); @@ -32,15 +38,18 @@ fn test_index_page_read_after_create_node_in_space_index() { assert_eq!(page.inner.index_values.first().unwrap().link.length, 24); } -#[test] -fn test_index_page_read_after_insert_at_in_space_index() { +#[tokio::test] +async fn test_index_page_read_after_insert_at_in_space_index() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_insert_at.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 5); assert_eq!(page.inner.current_index, 2); assert_eq!(page.inner.current_length, 2); @@ -53,15 +62,18 @@ fn test_index_page_read_after_insert_at_in_space_index() { assert_eq!(page.inner.index_values.first().unwrap().link.length, 24); } -#[test] -fn test_index_page_read_after_insert_at_with_node_id_update_in_space_index() { +#[tokio::test] +async fn test_index_page_read_after_insert_at_with_node_id_update_in_space_index() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_insert_at_with_node_id_update.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 7); assert_eq!(page.inner.current_index, 2); assert_eq!(page.inner.current_length, 2); @@ -74,15 +86,18 @@ fn test_index_page_read_after_insert_at_with_node_id_update_in_space_index() { assert_eq!(page.inner.index_values.get(1).unwrap().link.offset, 24); } -#[test] -fn test_index_page_read_after_remove_at_node_id_in_space_index() { +#[tokio::test] +async fn test_index_page_read_after_remove_at_node_id_in_space_index() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_remove_at_node_id.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 3); assert_eq!(page.inner.current_index, 0); assert_eq!(page.inner.current_length, 1); @@ -92,15 +107,18 @@ fn test_index_page_read_after_remove_at_node_id_in_space_index() { assert_eq!(page.inner.index_values.get(1).unwrap().link.offset, 24); } -#[test] -fn test_index_page_read_after_insert_at_removed_place_in_space_index() { +#[tokio::test] +async fn test_index_page_read_after_insert_at_removed_place_in_space_index() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_insert_at_removed_place.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 7); assert_eq!(page.inner.current_index, 3); assert_eq!(page.inner.current_length, 3); @@ -118,15 +136,18 @@ fn test_index_page_read_after_insert_at_removed_place_in_space_index() { assert_eq!(page.inner.index_values.get(2).unwrap().link.offset, 72); } -#[test] -fn test_index_pages_read_after_creation_of_second_node_in_space_index() { +#[tokio::test] +async fn test_index_pages_read_after_creation_of_second_node_in_space_index() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_create_second_node.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 5); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); @@ -134,7 +155,9 @@ fn test_index_pages_read_after_creation_of_second_node_in_space_index() { assert_eq!(page.inner.index_values.first().unwrap().key, 5); assert_eq!(page.inner.index_values.first().unwrap().link.length, 24); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3) + .await + .unwrap(); assert_eq!(page.inner.node_id, 15); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); @@ -143,15 +166,18 @@ fn test_index_pages_read_after_creation_of_second_node_in_space_index() { assert_eq!(page.inner.index_values.first().unwrap().link.length, 24); } -#[test] -fn test_index_pages_read_after_creation_of_node_after_remove_node_in_space_index() { +#[tokio::test] +async fn test_index_pages_read_after_creation_of_node_after_remove_node_in_space_index() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_create_node_after_remove.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 10); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); @@ -159,7 +185,9 @@ fn test_index_pages_read_after_creation_of_node_after_remove_node_in_space_index assert_eq!(page.inner.index_values.first().unwrap().key, 10); assert_eq!(page.inner.index_values.first().unwrap().link.length, 24); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3) + .await + .unwrap(); assert_eq!(page.inner.node_id, 15); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 1); @@ -168,35 +196,43 @@ fn test_index_pages_read_after_creation_of_node_after_remove_node_in_space_index assert_eq!(page.inner.index_values.first().unwrap().link.length, 24); } -#[test] -fn test_index_pages_read_full_page() { +#[tokio::test] +async fn test_index_pages_read_full_page() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_insert_at_big_amount.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 1000); assert_eq!(page.inner.current_index, 907); assert_eq!(page.inner.current_length, 907); assert_eq!(page.inner.size, page.inner.current_index); } -#[test] -fn test_index_pages_read_after_node_split() { +#[tokio::test] +async fn test_index_pages_read_after_node_split() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_split_node.wt.idx") + .await .unwrap(); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(page.inner.node_id, 457); assert_eq!(page.inner.current_index, 1); assert_eq!(page.inner.current_length, 453); - let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3).unwrap(); + let page = parse_page::, { INNER_PAGE_SIZE as u32 }>(&mut file, 3) + .await + .unwrap(); assert_eq!(page.inner.node_id, 1000); assert_eq!(page.inner.current_index, 454); assert_eq!(page.inner.current_length, 454); diff --git a/tests/persistence/read.rs b/tests/persistence/read.rs index 9c4e0d7f..b0c76feb 100644 --- a/tests/persistence/read.rs +++ b/tests/persistence/read.rs @@ -1,5 +1,4 @@ -use std::fs::File; - +use tokio::fs::File; use worktable::prelude::*; // TODO: Fix naming. @@ -9,11 +8,14 @@ use crate::persistence::{ }; use crate::remove_dir_if_exists; -#[test] -fn test_info_parse() { - let mut file = File::open("tests/data/expected/test_persist/.wt.data").unwrap(); - let info = - parse_page::, { TEST_PERSIST_INNER_SIZE as u32 }>(&mut file, 0).unwrap(); +#[tokio::test] +async fn test_info_parse() { + let mut file = File::open("tests/data/expected/test_persist/.wt.data") + .await + .unwrap(); + let info = parse_page::, { TEST_PERSIST_INNER_SIZE as u32 }>(&mut file, 0) + .await + .unwrap(); assert_eq!(info.header.space_id, 0.into()); assert_eq!(info.header.page_id, 0.into()); @@ -29,11 +31,14 @@ fn test_info_parse() { assert_eq!(info.inner.empty_links_list, vec![]); } -#[test] -fn test_primary_index_parse() { - let mut file = File::open("tests/data/expected/test_persist/primary.wt.idx").unwrap(); - let index = - parse_page::, { TEST_PERSIST_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); +#[tokio::test] +async fn test_primary_index_parse() { + let mut file = File::open("tests/data/expected/test_persist/primary.wt.idx") + .await + .unwrap(); + let index = parse_page::, { TEST_PERSIST_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(index.header.space_id, 0.into()); assert_eq!(index.header.page_id, 2.into()); @@ -63,11 +68,14 @@ fn test_primary_index_parse() { } } -#[test] -fn test_another_idx_index_parse() { - let mut file = File::open("tests/data/expected/test_persist/another_idx.wt.idx").unwrap(); - let index = - parse_page::, { TEST_PERSIST_PAGE_SIZE as u32 }>(&mut file, 2).unwrap(); +#[tokio::test] +async fn test_another_idx_index_parse() { + let mut file = File::open("tests/data/expected/test_persist/another_idx.wt.idx") + .await + .unwrap(); + let index = parse_page::, { TEST_PERSIST_PAGE_SIZE as u32 }>(&mut file, 2) + .await + .unwrap(); assert_eq!(index.header.space_id, 0.into()); assert_eq!(index.header.page_id, 2.into()); @@ -97,11 +105,14 @@ fn test_another_idx_index_parse() { } } -#[test] -fn test_data_parse() { - let mut file = File::open("tests/data/expected/test_persist/.wt.data").unwrap(); +#[tokio::test] +async fn test_data_parse() { + let mut file = File::open("tests/data/expected/test_persist/.wt.data") + .await + .unwrap(); let data = parse_data_page::<{ TEST_PERSIST_PAGE_SIZE }, { TEST_PERSIST_INNER_SIZE }>(&mut file, 1) + .await .unwrap(); assert_eq!(data.header.space_id, 0.into()); diff --git a/tests/persistence/space_index/indexset_compatibility.rs b/tests/persistence/space_index/indexset_compatibility.rs index 385ead84..2adfb280 100644 --- a/tests/persistence/space_index/indexset_compatibility.rs +++ b/tests/persistence/space_index/indexset_compatibility.rs @@ -6,14 +6,15 @@ use worktable::prelude::{SpaceIndex, SpaceIndexOps}; use crate::{check_if_files_are_same, remove_file_if_exists}; -#[test] -fn test_indexset_node_creation() { +#[tokio::test] +async fn test_indexset_node_creation() { remove_file_if_exists("tests/data/space_index/indexset/process_create_node.wt.idx".to_string()); let mut space_index = SpaceIndex::::new( "tests/data/space_index/indexset/process_create_node.wt.idx", 0.into(), ) + .await .unwrap(); let indexset = BTreeMap::::new(); let (_, cdc) = indexset.insert_cdc( @@ -25,7 +26,7 @@ fn test_indexset_node_creation() { }, ); for event in cdc { - space_index.process_change_event(event).unwrap(); + space_index.process_change_event(event).await.unwrap(); } assert!(check_if_files_are_same( @@ -34,8 +35,8 @@ fn test_indexset_node_creation() { )) } -#[test] -fn test_space_index_process_insert_at() { +#[tokio::test] +async fn test_space_index_process_insert_at() { remove_file_if_exists("tests/data/space_index/indexset/process_insert_at.wt.idx".to_string()); copy( "tests/data/expected/space_index/process_create_node.wt.idx", @@ -47,8 +48,9 @@ fn test_space_index_process_insert_at() { "tests/data/space_index/indexset/process_insert_at.wt.idx", 0.into(), ) + .await .unwrap(); - let indexset = space_index.parse_indexset().unwrap(); + let indexset = space_index.parse_indexset().await.unwrap(); let (_, cdc) = indexset.insert_cdc( 3, Link { @@ -58,7 +60,7 @@ fn test_space_index_process_insert_at() { }, ); for event in cdc { - space_index.process_change_event(event).unwrap(); + space_index.process_change_event(event).await.unwrap(); } assert!(check_if_files_are_same( @@ -67,8 +69,8 @@ fn test_space_index_process_insert_at() { )) } -#[test] -fn test_space_index_process_insert_at_big_amount() { +#[tokio::test] +async fn test_space_index_process_insert_at_big_amount() { remove_file_if_exists( "tests/data/space_index/indexset/process_insert_at_big_amount.wt.idx".to_string(), ); @@ -82,8 +84,9 @@ fn test_space_index_process_insert_at_big_amount() { "tests/data/space_index/indexset/process_insert_at_big_amount.wt.idx", 0.into(), ) + .await .unwrap(); - let indexset = space_index.parse_indexset().unwrap(); + let indexset = space_index.parse_indexset().await.unwrap(); let (_, cdc) = indexset.insert_cdc( 1000, @@ -94,7 +97,7 @@ fn test_space_index_process_insert_at_big_amount() { }, ); for event in cdc { - space_index.process_change_event(event).unwrap(); + space_index.process_change_event(event).await.unwrap(); } for i in (6..911).rev() { @@ -107,7 +110,7 @@ fn test_space_index_process_insert_at_big_amount() { }, ); for event in cdc { - space_index.process_change_event(event).unwrap(); + space_index.process_change_event(event).await.unwrap(); } } diff --git a/tests/persistence/space_index/write.rs b/tests/persistence/space_index/write.rs index 0eb3327f..5e1f04b6 100644 --- a/tests/persistence/space_index/write.rs +++ b/tests/persistence/space_index/write.rs @@ -10,8 +10,8 @@ use crate::{check_if_files_are_same, remove_file_if_exists}; mod run_first { use super::*; - #[test] - fn test_space_index_process_create_node() { + #[tokio::test] + async fn test_space_index_process_create_node() { remove_file_if_exists("tests/data/space_index/process_create_node.wt.idx".to_string()); let mut space_index = SpaceIndex::::new( @@ -39,8 +39,8 @@ mod run_first { )) } - #[test] - fn test_space_index_process_create_second_node() { + #[tokio::test] + async fn test_space_index_process_create_second_node() { remove_file_if_exists( "tests/data/space_index/process_create_second_node.wt.idx".to_string(), ); @@ -75,8 +75,8 @@ mod run_first { )) } - #[test] - fn test_space_index_process_insert_at() { + #[tokio::test] + async fn test_space_index_process_insert_at() { remove_file_if_exists("tests/data/space_index/process_insert_at.wt.idx".to_string()); copy( "tests/data/expected/space_index/process_create_node.wt.idx", @@ -118,8 +118,8 @@ mod run_first { )) } - #[test] - fn test_space_index_process_insert_at_big_amount() { + #[tokio::test] + async fn test_space_index_process_insert_at_big_amount() { remove_file_if_exists( "tests/data/space_index/process_insert_at_big_amount.wt.idx".to_string(), ); @@ -187,8 +187,8 @@ mod run_first { )) } - #[test] - fn test_space_index_process_remove_node() { + #[tokio::test] + async fn test_space_index_process_remove_node() { remove_file_if_exists("tests/data/space_index/process_remove_node.wt.idx".to_string()); copy( "tests/data/expected/space_index/process_create_second_node.wt.idx", @@ -222,8 +222,8 @@ mod run_first { } } -#[test] -fn test_space_index_process_insert_at_with_node_id_update() { +#[tokio::test] +async fn test_space_index_process_insert_at_with_node_id_update() { remove_file_if_exists( "tests/data/space_index/process_insert_at_with_node_id_update.wt.idx".to_string(), ); @@ -267,8 +267,8 @@ fn test_space_index_process_insert_at_with_node_id_update() { )) } -#[test] -fn test_space_index_process_remove_at() { +#[tokio::test] +async fn test_space_index_process_remove_at() { remove_file_if_exists("tests/data/space_index/process_remove_at.wt.idx".to_string()); copy( "tests/data/expected/space_index/process_insert_at.wt.idx", @@ -310,8 +310,8 @@ fn test_space_index_process_remove_at() { )) } -#[test] -fn test_space_index_process_remove_at_node_id() { +#[tokio::test] +async fn test_space_index_process_remove_at_node_id() { remove_file_if_exists("tests/data/space_index/process_remove_at_node_id.wt.idx".to_string()); copy( "tests/data/expected/space_index/process_insert_at.wt.idx", @@ -353,8 +353,8 @@ fn test_space_index_process_remove_at_node_id() { )) } -#[test] -fn test_space_index_process_insert_at_removed_place() { +#[tokio::test] +async fn test_space_index_process_insert_at_removed_place() { remove_file_if_exists( "tests/data/space_index/process_insert_at_removed_place.wt.idx".to_string(), ); @@ -440,8 +440,8 @@ fn test_space_index_process_insert_at_removed_place() { )) } -#[test] -fn test_space_index_process_create_node_after_remove() { +#[tokio::test] +async fn test_space_index_process_create_node_after_remove() { remove_file_if_exists( "tests/data/space_index/process_create_node_after_remove.wt.idx".to_string(), ); @@ -476,8 +476,8 @@ fn test_space_index_process_create_node_after_remove() { )) } -#[test] -fn test_space_index_process_split_node() { +#[tokio::test] +async fn test_space_index_process_split_node() { remove_file_if_exists("tests/data/space_index/process_split_node.wt.idx".to_string()); copy( "tests/data/expected/space_index/process_insert_at_big_amount.wt.idx", From 8ef1f929e25c10dc3f457104479b2ffcf96c0aaf Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 14 Mar 2025 14:28:14 +0300 Subject: [PATCH 3/5] Test fixes --- codegen/src/persist_index/generator.rs | 24 ++++---- .../persist_table/generator/space_file/mod.rs | 42 ++++++------- .../generator/space_file/worktable_impls.rs | 12 ++-- .../src/worktable/generator/table/impls.rs | 4 +- tests/mod.rs | 12 ++-- tests/persistence/mod.rs | 14 +++-- tests/persistence/read.rs | 54 +++-------------- .../space_index/indexset_compatibility.rs | 9 ++- tests/persistence/space_index/write.rs | 55 +++++++++++++---- tests/persistence/sync/mod.rs | 60 +++++++++++-------- tests/persistence/toc/read.rs | 56 +++++++++++------ tests/persistence/toc/write.rs | 14 +++-- tests/persistence/write.rs | 16 ++--- 13 files changed, 200 insertions(+), 172 deletions(-) diff --git a/codegen/src/persist_index/generator.rs b/codegen/src/persist_index/generator.rs index 8177a361..f27307e1 100644 --- a/codegen/src/persist_index/generator.rs +++ b/codegen/src/persist_index/generator.rs @@ -143,15 +143,15 @@ impl Generator { let index_name_literal = Literal::string(i.to_string().as_str()); quote! { { - let mut file = std::fs::File::create(format!("{}/{}{}", path, #index_name_literal, #index_extension))?; + let mut file = tokio::fs::File::create(format!("{}/{}{}", path, #index_name_literal, #index_extension)).await?; let mut info = #ident::space_info_default(); - info.inner.page_count = self.#i.1.len() as u32 + self.#i.0.len() as u32;; - persist_page(&mut info, &mut file)?; + info.inner.page_count = self.#i.1.len() as u32 + self.#i.0.len() as u32; + persist_page(&mut info, &mut file).await?; for mut page in &mut self.#i.0 { - persist_page(&mut page, &mut file)?; + persist_page(&mut page, &mut file).await?; } for mut page in &mut self.#i.1 { - persist_page(&mut page, &mut file)?; + persist_page(&mut page, &mut file).await?; } } } @@ -159,7 +159,7 @@ impl Generator { .collect::>(); quote! { - pub fn persist(&mut self, path: &str) -> eyre::Result<()> + pub async fn persist(&mut self, path: &str) -> eyre::Result<()> { #(#persist_logic)* Ok(()) @@ -193,14 +193,14 @@ impl Generator { .map(|(l, i)| quote! { let #i = { let mut #i = vec![]; - let mut file = std::fs::File::open(format!("{}/{}{}", path, #l, #index_extension))?; - let info = parse_page::, { #page_const_name as u32 }>(&mut file, 0)?; - let file_length = file.metadata()?.len(); + let mut file = tokio::fs::File::open(format!("{}/{}{}", path, #l, #index_extension)).await?; + let info = parse_page::, { #page_const_name as u32 }>(&mut file, 0).await?; + let file_length = file.metadata().await?.len(); let page_id = file_length / (#page_const_name as u64 + GENERAL_HEADER_SIZE as u64) + 1; let next_page_id = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(page_id as u32)); - let toc = IndexTableOfContents::<_, { #page_const_name as u32 }>::parse_from_file(&mut file, 0.into(), next_page_id.clone())?; + let toc = IndexTableOfContents::<_, { #page_const_name as u32 }>::parse_from_file(&mut file, 0.into(), next_page_id.clone()).await?; for page_id in toc.iter().map(|(_, page_id)| page_id) { - let index = parse_page::, { #page_const_name as u32 }>(&mut file, (*page_id).into())?; + let index = parse_page::, { #page_const_name as u32 }>(&mut file, (*page_id).into()).await?; #i.push(index); } (toc.pages, #i) @@ -220,7 +220,7 @@ impl Generator { .collect::>(); quote! { - pub fn parse_from_file(path: &str) -> eyre::Result { + pub async fn parse_from_file(path: &str) -> eyre::Result { #(#field_names_literals)* Ok(Self { diff --git a/codegen/src/persist_table/generator/space_file/mod.rs b/codegen/src/persist_table/generator/space_file/mod.rs index bd717095..926e5b8c 100644 --- a/codegen/src/persist_table/generator/space_file/mod.rs +++ b/codegen/src/persist_table/generator/space_file/mod.rs @@ -88,29 +88,29 @@ impl Generator { quote! { impl #space_ident { - pub fn persist(&mut self) -> eyre::Result<()> { + pub async fn persist(&mut self) -> eyre::Result<()> { let prefix = &self.path; - std::fs::create_dir_all(prefix)?; + tokio::fs::create_dir_all(prefix).await?; { - let mut primary_index_file = std::fs::File::create(format!("{}/primary{}", &self.path, #index_extension))?; + let mut primary_index_file = tokio::fs::File::create(format!("{}/primary{}", &self.path, #index_extension)).await?; let mut info = self.get_primary_index_info()?; - persist_page(&mut info, &mut primary_index_file)?; + persist_page(&mut info, &mut primary_index_file).await?; for mut toc_page in &mut self.primary_index.0 { - persist_page(&mut toc_page, &mut primary_index_file)?; + persist_page(&mut toc_page, &mut primary_index_file).await?; } for mut primary_index_page in &mut self.primary_index.1 { - persist_page(&mut primary_index_page, &mut primary_index_file)?; + persist_page(&mut primary_index_page, &mut primary_index_file).await?; } } - self.indexes.persist(&prefix)?; + self.indexes.persist(&prefix).await?; { - let mut data_file = std::fs::File::create(format!("{}/{}", &self.path, #data_extension))?; - persist_page(&mut self.data_info, &mut data_file)?; + let mut data_file = tokio::fs::File::create(format!("{}/{}", &self.path, #data_extension)).await?; + persist_page(&mut self.data_info, &mut data_file).await?; for mut data_page in &mut self.data { - persist_page(&mut data_page, &mut data_file)?; + persist_page(&mut data_page, &mut data_file).await?; } } @@ -200,31 +200,31 @@ impl Generator { let data_extension = Literal::string(WT_DATA_EXTENSION); quote! { - pub fn parse_file(path: &str) -> eyre::Result { + pub async fn parse_file(path: &str) -> eyre::Result { let mut primary_index = { let mut primary_index = vec![]; - let mut primary_file = std::fs::File::open(format!("{}/primary{}", path, #index_extension))?; - let info = parse_page::, { #page_const_name as u32 }>(&mut primary_file, 0)?; - let file_length = primary_file.metadata()?.len(); + let mut primary_file = tokio::fs::File::open(format!("{}/primary{}", path, #index_extension)).await?; + let info = parse_page::, { #page_const_name as u32 }>(&mut primary_file, 0).await?; + let file_length = primary_file.metadata().await?.len(); let count = file_length / (#page_const_name as u64 + GENERAL_HEADER_SIZE as u64); let next_page_id = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(count as u32)); - let toc = IndexTableOfContents::<_, { #page_const_name as u32 }>::parse_from_file(&mut primary_file, 0.into(), next_page_id.clone())?; + let toc = IndexTableOfContents::<_, { #page_const_name as u32 }>::parse_from_file(&mut primary_file, 0.into(), next_page_id.clone()).await?; for page_id in toc.iter().map(|(_, page_id)| page_id) { - let index = parse_page::, { #page_const_name as u32 }>(&mut primary_file, (*page_id).into())?; + let index = parse_page::, { #page_const_name as u32 }>(&mut primary_file, (*page_id).into()).await?; primary_index.push(index); } (toc.pages, primary_index) }; - let indexes = #persisted_index_name::parse_from_file(path)?; + let indexes = #persisted_index_name::parse_from_file(path).await?; let (data, data_info) = { let mut data = vec![]; - let mut data_file = std::fs::File::open(format!("{}/{}", path, #data_extension))?; - let info = parse_page::::Generator as PrimaryKeyGeneratorState>::State>, { #page_const_name as u32 }>(&mut data_file, 0)?; - let file_length = data_file.metadata()?.len(); + let mut data_file = tokio::fs::File::open(format!("{}/{}", path, #data_extension)).await?; + let info = parse_page::::Generator as PrimaryKeyGeneratorState>::State>, { #page_const_name as u32 }>(&mut data_file, 0).await?; + let file_length = data_file.metadata().await?.len(); let count = file_length / (#inner_const_name as u64 + GENERAL_HEADER_SIZE as u64); for page_id in 1..=count { - let index = parse_data_page::<{ #page_const_name }, { #inner_const_name }>(&mut data_file, page_id as u32)?; + let index = parse_data_page::<{ #page_const_name }, { #inner_const_name }>(&mut data_file, page_id as u32).await?; data.push(index); } (data, info) diff --git a/codegen/src/persist_table/generator/space_file/worktable_impls.rs b/codegen/src/persist_table/generator/space_file/worktable_impls.rs index abdb7679..b9874ba6 100644 --- a/codegen/src/persist_table/generator/space_file/worktable_impls.rs +++ b/codegen/src/persist_table/generator/space_file/worktable_impls.rs @@ -36,9 +36,9 @@ impl Generator { fn gen_worktable_persist_fn(&self) -> TokenStream { quote! { - pub fn persist(&self) -> eyre::Result<()> { + pub async fn persist(&self) -> eyre::Result<()> { let mut space = self.into_space(); - space.persist()?; + space.persist().await?; Ok(()) } } @@ -51,13 +51,13 @@ impl Generator { let dir_name = name_generator.get_dir_name(); quote! { - pub fn load_from_file(config: PersistenceConfig) -> eyre::Result { + pub async fn load_from_file(config: PersistenceConfig) -> eyre::Result { let filename = format!("{}/{}", config.tables_path.as_str(), #dir_name); if !std::path::Path::new(filename.as_str()).exists() { - return #wt_ident::new(config); + return #wt_ident::new(config).await; }; - let space = #space_ident::parse_file(&filename)?; - let table = space.into_worktable(config); + let space = #space_ident::parse_file(&filename).await?; + let table = space.into_worktable(config).await; Ok(table) } } diff --git a/codegen/src/worktable/generator/table/impls.rs b/codegen/src/worktable/generator/table/impls.rs index 9649f5b7..33178593 100644 --- a/codegen/src/worktable/generator/table/impls.rs +++ b/codegen/src/worktable/generator/table/impls.rs @@ -44,11 +44,11 @@ impl Generator { if self.is_persist { quote! { - pub fn new(config: PersistenceConfig) -> eyre::Result { + pub async fn new(config: PersistenceConfig) -> eyre::Result { let mut inner = WorkTable::default(); inner.table_name = #table_name; let table_files_path = format!("{}/{}", config.tables_path, #dir_name); - let engine: #engine = PersistenceEngine::from_table_files_path(table_files_path)?; + let engine: #engine = PersistenceEngine::from_table_files_path(table_files_path).await?; core::result::Result::Ok(Self( inner, config, diff --git a/tests/mod.rs b/tests/mod.rs index 769d89a4..f305944f 100644 --- a/tests/mod.rs +++ b/tests/mod.rs @@ -1,14 +1,12 @@ -use std::io::Read; +use std::io::{BufReader, Read}; use std::path::Path; -use tokio::fs::File; -use tokio::io::BufReader; mod persistence; mod worktable; -pub async fn check_if_files_are_same(got: String, expected: String) -> bool { - let got = File::open(got).unwrap(); - let expected = File::open(expected).unwrap(); +pub fn check_if_files_are_same(got: String, expected: String) -> bool { + let got = std::fs::File::open(got).unwrap(); + let expected = std::fs::File::open(expected).unwrap(); // Check if file sizes are different if got.metadata().unwrap().len() != expected.metadata().unwrap().len() { @@ -30,7 +28,7 @@ pub async fn check_if_files_are_same(got: String, expected: String) -> bool { } pub fn check_if_dirs_are_same(got: String, expected: String) -> bool { - let paths = fs::read_dir(&expected).unwrap(); + let paths = std::fs::read_dir(&expected).unwrap(); for file in paths { let file_name = file.unwrap().file_name(); if !check_if_files_are_same( diff --git a/tests/persistence/mod.rs b/tests/persistence/mod.rs index 1c6400a2..0ac8270f 100644 --- a/tests/persistence/mod.rs +++ b/tests/persistence/mod.rs @@ -47,13 +47,13 @@ worktable!( pub const TEST_ROW_COUNT: usize = 100; -pub fn get_empty_test_wt() -> TestPersistWorkTable { +pub async fn get_empty_test_wt() -> TestPersistWorkTable { let config = PersistenceConfig::new("tests/data", "tests/data").unwrap(); - TestPersistWorkTable::new(config).unwrap() + TestPersistWorkTable::new(config).await.unwrap() } -pub fn get_test_wt() -> TestPersistWorkTable { - let table = get_empty_test_wt(); +pub async fn get_test_wt() -> TestPersistWorkTable { + let table = get_empty_test_wt().await; for i in 1..100 { let row = TestPersistRow { another: i, id: i }; @@ -63,9 +63,11 @@ pub fn get_test_wt() -> TestPersistWorkTable { table } -pub fn get_test_wt_without_secondary_indexes() -> TestWithoutSecondaryIndexesWorkTable { +pub async fn get_test_wt_without_secondary_indexes() -> TestWithoutSecondaryIndexesWorkTable { let config = PersistenceConfig::new("tests/data", "tests/data").unwrap(); - let table = TestWithoutSecondaryIndexesWorkTable::new(config).unwrap(); + let table = TestWithoutSecondaryIndexesWorkTable::new(config) + .await + .unwrap(); for i in 1..TEST_ROW_COUNT { let row = TestWithoutSecondaryIndexesRow { diff --git a/tests/persistence/read.rs b/tests/persistence/read.rs index b0c76feb..c8fcb4d6 100644 --- a/tests/persistence/read.rs +++ b/tests/persistence/read.rs @@ -3,8 +3,8 @@ use worktable::prelude::*; // TODO: Fix naming. use crate::persistence::{ - get_empty_test_wt, get_test_wt, TestPersistRow, TestPersistWorkTable, TEST_PERSIST_INNER_SIZE, - TEST_PERSIST_PAGE_SIZE, TEST_ROW_COUNT, + get_empty_test_wt, get_test_wt, TestPersistWorkTable, TEST_PERSIST_INNER_SIZE, + TEST_PERSIST_PAGE_SIZE, }; use crate::remove_dir_if_exists; @@ -126,8 +126,8 @@ async fn test_data_parse() { #[tokio::test] async fn test_space_parse() { let config = PersistenceConfig::new("tests/data/expected", "tests/data/expected").unwrap(); - let table = TestPersistWorkTable::load_from_file(config).unwrap(); - let expected = get_test_wt(); + let table = TestPersistWorkTable::load_from_file(config).await.unwrap(); + let expected = get_test_wt().await; assert_eq!( table.select_all().execute().unwrap(), @@ -137,51 +137,11 @@ async fn test_space_parse() { #[tokio::test] async fn test_space_parse_no_file() { - remove_dir_if_exists("tests/non-existent".to_string()); + remove_dir_if_exists("tests/non-existent".to_string()).await; let config = PersistenceConfig::new("tests/non-existent", "tests/non-existent").unwrap(); - let table = TestPersistWorkTable::load_from_file(config).unwrap(); - let expected = get_empty_test_wt(); - assert_eq!( - table.select_all().execute().unwrap(), - expected.select_all().execute().unwrap() - ); -} - -#[tokio::test] -async fn test_space_insert_after_read() { - let config = PersistenceConfig::new("tests/data/expected", "tests/data/expected").unwrap(); - let table = TestPersistWorkTable::load_from_file(config).unwrap(); - - let row = TestPersistRow { - another: TEST_ROW_COUNT as u64, - id: TEST_ROW_COUNT as u64, - }; - table.insert(row.clone()).unwrap(); - let expected = get_test_wt(); - expected.insert(row).unwrap(); - - assert_eq!( - table.select_all().execute().unwrap(), - expected.select_all().execute().unwrap() - ); -} - -#[tokio::test] -async fn test_space_delete_after_read() { - let config = PersistenceConfig::new("tests/data/expected", "tests/data/expected").unwrap(); - let table = TestPersistWorkTable::load_from_file(config).unwrap(); - - table - .delete((TEST_ROW_COUNT as u64 - 1).into()) - .await - .unwrap(); - let expected = get_test_wt(); - expected - .delete((TEST_ROW_COUNT as u64 - 1).into()) - .await - .unwrap(); - + let table = TestPersistWorkTable::load_from_file(config).await.unwrap(); + let expected = get_empty_test_wt().await; assert_eq!( table.select_all().execute().unwrap(), expected.select_all().execute().unwrap() diff --git a/tests/persistence/space_index/indexset_compatibility.rs b/tests/persistence/space_index/indexset_compatibility.rs index 2adfb280..f4bca9dc 100644 --- a/tests/persistence/space_index/indexset_compatibility.rs +++ b/tests/persistence/space_index/indexset_compatibility.rs @@ -8,7 +8,8 @@ use crate::{check_if_files_are_same, remove_file_if_exists}; #[tokio::test] async fn test_indexset_node_creation() { - remove_file_if_exists("tests/data/space_index/indexset/process_create_node.wt.idx".to_string()); + remove_file_if_exists("tests/data/space_index/indexset/process_create_node.wt.idx".to_string()) + .await; let mut space_index = SpaceIndex::::new( "tests/data/space_index/indexset/process_create_node.wt.idx", @@ -37,7 +38,8 @@ async fn test_indexset_node_creation() { #[tokio::test] async fn test_space_index_process_insert_at() { - remove_file_if_exists("tests/data/space_index/indexset/process_insert_at.wt.idx".to_string()); + remove_file_if_exists("tests/data/space_index/indexset/process_insert_at.wt.idx".to_string()) + .await; copy( "tests/data/expected/space_index/process_create_node.wt.idx", "tests/data/space_index/indexset/process_insert_at.wt.idx", @@ -73,7 +75,8 @@ async fn test_space_index_process_insert_at() { async fn test_space_index_process_insert_at_big_amount() { remove_file_if_exists( "tests/data/space_index/indexset/process_insert_at_big_amount.wt.idx".to_string(), - ); + ) + .await; copy( "tests/data/expected/space_index/process_create_node.wt.idx", "tests/data/space_index/indexset/process_insert_at_big_amount.wt.idx", diff --git a/tests/persistence/space_index/write.rs b/tests/persistence/space_index/write.rs index 5e1f04b6..b8f14287 100644 --- a/tests/persistence/space_index/write.rs +++ b/tests/persistence/space_index/write.rs @@ -12,12 +12,14 @@ mod run_first { #[tokio::test] async fn test_space_index_process_create_node() { - remove_file_if_exists("tests/data/space_index/process_create_node.wt.idx".to_string()); + remove_file_if_exists("tests/data/space_index/process_create_node.wt.idx".to_string()) + .await; let mut space_index = SpaceIndex::::new( "tests/data/space_index/process_create_node.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -31,6 +33,7 @@ mod run_first { }, }, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -43,7 +46,8 @@ mod run_first { async fn test_space_index_process_create_second_node() { remove_file_if_exists( "tests/data/space_index/process_create_second_node.wt.idx".to_string(), - ); + ) + .await; copy( "tests/data/expected/space_index/process_create_node.wt.idx", "tests/data/space_index/process_create_second_node.wt.idx", @@ -54,6 +58,7 @@ mod run_first { "tests/data/space_index/process_create_second_node.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -67,6 +72,7 @@ mod run_first { }, }, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -77,7 +83,7 @@ mod run_first { #[tokio::test] async fn test_space_index_process_insert_at() { - remove_file_if_exists("tests/data/space_index/process_insert_at.wt.idx".to_string()); + remove_file_if_exists("tests/data/space_index/process_insert_at.wt.idx".to_string()).await; copy( "tests/data/expected/space_index/process_create_node.wt.idx", "tests/data/space_index/process_insert_at.wt.idx", @@ -88,6 +94,7 @@ mod run_first { "tests/data/space_index/process_insert_at.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -110,6 +117,7 @@ mod run_first { }, index: 0, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -122,7 +130,8 @@ mod run_first { async fn test_space_index_process_insert_at_big_amount() { remove_file_if_exists( "tests/data/space_index/process_insert_at_big_amount.wt.idx".to_string(), - ); + ) + .await; copy( "tests/data/expected/space_index/process_create_node.wt.idx", "tests/data/space_index/process_insert_at_big_amount.wt.idx", @@ -133,6 +142,7 @@ mod run_first { "tests/data/space_index/process_insert_at_big_amount.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -155,6 +165,7 @@ mod run_first { }, index: 1, }) + .await .unwrap(); for i in (6..911).rev() { @@ -178,6 +189,7 @@ mod run_first { }, index: 1, }) + .await .unwrap(); } @@ -189,7 +201,8 @@ mod run_first { #[tokio::test] async fn test_space_index_process_remove_node() { - remove_file_if_exists("tests/data/space_index/process_remove_node.wt.idx".to_string()); + remove_file_if_exists("tests/data/space_index/process_remove_node.wt.idx".to_string()) + .await; copy( "tests/data/expected/space_index/process_create_second_node.wt.idx", "tests/data/space_index/process_remove_node.wt.idx", @@ -200,6 +213,7 @@ mod run_first { "tests/data/space_index/process_remove_node.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -213,6 +227,7 @@ mod run_first { }, }, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -226,7 +241,8 @@ mod run_first { async fn test_space_index_process_insert_at_with_node_id_update() { remove_file_if_exists( "tests/data/space_index/process_insert_at_with_node_id_update.wt.idx".to_string(), - ); + ) + .await; copy( "tests/data/expected/space_index/process_create_node.wt.idx", "tests/data/space_index/process_insert_at_with_node_id_update.wt.idx", @@ -237,6 +253,7 @@ async fn test_space_index_process_insert_at_with_node_id_update() { "tests/data/space_index/process_insert_at_with_node_id_update.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -259,6 +276,7 @@ async fn test_space_index_process_insert_at_with_node_id_update() { }, index: 1, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -269,7 +287,7 @@ async fn test_space_index_process_insert_at_with_node_id_update() { #[tokio::test] async fn test_space_index_process_remove_at() { - remove_file_if_exists("tests/data/space_index/process_remove_at.wt.idx".to_string()); + remove_file_if_exists("tests/data/space_index/process_remove_at.wt.idx".to_string()).await; copy( "tests/data/expected/space_index/process_insert_at.wt.idx", "tests/data/space_index/process_remove_at.wt.idx", @@ -280,6 +298,7 @@ async fn test_space_index_process_remove_at() { "tests/data/space_index/process_remove_at.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -302,6 +321,7 @@ async fn test_space_index_process_remove_at() { }, index: 0, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -312,7 +332,8 @@ async fn test_space_index_process_remove_at() { #[tokio::test] async fn test_space_index_process_remove_at_node_id() { - remove_file_if_exists("tests/data/space_index/process_remove_at_node_id.wt.idx".to_string()); + remove_file_if_exists("tests/data/space_index/process_remove_at_node_id.wt.idx".to_string()) + .await; copy( "tests/data/expected/space_index/process_insert_at.wt.idx", "tests/data/space_index/process_remove_at_node_id.wt.idx", @@ -323,6 +344,7 @@ async fn test_space_index_process_remove_at_node_id() { "tests/data/space_index/process_remove_at_node_id.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -345,6 +367,7 @@ async fn test_space_index_process_remove_at_node_id() { }, index: 1, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -357,7 +380,8 @@ async fn test_space_index_process_remove_at_node_id() { async fn test_space_index_process_insert_at_removed_place() { remove_file_if_exists( "tests/data/space_index/process_insert_at_removed_place.wt.idx".to_string(), - ); + ) + .await; copy( "tests/data/expected/space_index/process_insert_at.wt.idx", "tests/data/space_index/process_insert_at_removed_place.wt.idx", @@ -368,6 +392,7 @@ async fn test_space_index_process_insert_at_removed_place() { "tests/data/space_index/process_insert_at_removed_place.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -390,6 +415,7 @@ async fn test_space_index_process_insert_at_removed_place() { }, index: 2, }) + .await .unwrap(); space_index .process_change_event(ChangeEvent::RemoveAt { @@ -411,6 +437,7 @@ async fn test_space_index_process_insert_at_removed_place() { }, index: 1, }) + .await .unwrap(); space_index .process_change_event(ChangeEvent::InsertAt { @@ -432,6 +459,7 @@ async fn test_space_index_process_insert_at_removed_place() { }, index: 1, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -444,7 +472,8 @@ async fn test_space_index_process_insert_at_removed_place() { async fn test_space_index_process_create_node_after_remove() { remove_file_if_exists( "tests/data/space_index/process_create_node_after_remove.wt.idx".to_string(), - ); + ) + .await; copy( "tests/data/expected/space_index/process_remove_node.wt.idx", "tests/data/space_index/process_create_node_after_remove.wt.idx", @@ -455,6 +484,7 @@ async fn test_space_index_process_create_node_after_remove() { "tests/data/space_index/process_create_node_after_remove.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -468,6 +498,7 @@ async fn test_space_index_process_create_node_after_remove() { }, }, }) + .await .unwrap(); assert!(check_if_files_are_same( @@ -478,7 +509,7 @@ async fn test_space_index_process_create_node_after_remove() { #[tokio::test] async fn test_space_index_process_split_node() { - remove_file_if_exists("tests/data/space_index/process_split_node.wt.idx".to_string()); + remove_file_if_exists("tests/data/space_index/process_split_node.wt.idx".to_string()).await; copy( "tests/data/expected/space_index/process_insert_at_big_amount.wt.idx", "tests/data/space_index/process_split_node.wt.idx", @@ -489,6 +520,7 @@ async fn test_space_index_process_split_node() { "tests/data/space_index/process_split_node.wt.idx", 0.into(), ) + .await .unwrap(); space_index @@ -503,6 +535,7 @@ async fn test_space_index_process_split_node() { }, split_index: 453, }) + .await .unwrap(); assert!(check_if_files_are_same( diff --git a/tests/persistence/sync/mod.rs b/tests/persistence/sync/mod.rs index 7cef2bb1..016f548a 100644 --- a/tests/persistence/sync/mod.rs +++ b/tests/persistence/sync/mod.rs @@ -4,8 +4,6 @@ use worktable::prelude::{PersistenceConfig, PrimaryKeyGeneratorState}; #[test] fn test_space_insert_sync() { - remove_dir_if_exists("tests/data/sync/insert".to_string()); - let config = PersistenceConfig::new("tests/data/sync/insert", "tests/data/sync/insert").unwrap(); @@ -17,8 +15,12 @@ fn test_space_insert_sync() { .unwrap(); runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/insert".to_string()).await; + let pk = { - let table = TestPersistWorkTable::load_from_file(config.clone()).unwrap(); + let table = TestPersistWorkTable::load_from_file(config.clone()) + .await + .unwrap(); let row = TestPersistRow { another: 42, id: table.get_next_pk().0, @@ -28,7 +30,7 @@ fn test_space_insert_sync() { row.id }; { - let table = TestPersistWorkTable::load_from_file(config).unwrap(); + let table = TestPersistWorkTable::load_from_file(config).await.unwrap(); assert!(table.select(pk.into()).is_some()); assert_eq!(table.0.pk_gen.get_state(), pk + 1) } @@ -37,8 +39,6 @@ fn test_space_insert_sync() { #[test] fn test_space_insert_many_sync() { - remove_dir_if_exists("tests/data/sync/insert_many".to_string()); - let config = PersistenceConfig::new("tests/data/sync/insert_many", "tests/data/sync/insert_many") .unwrap(); @@ -51,9 +51,13 @@ fn test_space_insert_many_sync() { .unwrap(); runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/insert_many".to_string()).await; + let mut pks = vec![]; { - let table = TestPersistWorkTable::load_from_file(config.clone()).unwrap(); + let table = TestPersistWorkTable::load_from_file(config.clone()) + .await + .unwrap(); for i in 0..20 { let pk = { let row = TestPersistRow { @@ -69,7 +73,7 @@ fn test_space_insert_many_sync() { } { - let table = TestPersistWorkTable::load_from_file(config).unwrap(); + let table = TestPersistWorkTable::load_from_file(config).await.unwrap(); let last = *pks.last().unwrap(); for pk in pks { assert!(table.select(pk.into()).is_some()); @@ -81,8 +85,6 @@ fn test_space_insert_many_sync() { #[test] fn test_space_update_full_sync() { - remove_dir_if_exists("tests/data/sync/update_full".to_string()); - let config = PersistenceConfig::new("tests/data/sync/update_full", "tests/data/sync/update_full") .unwrap(); @@ -95,8 +97,12 @@ fn test_space_update_full_sync() { .unwrap(); runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/update_full".to_string()).await; + let pk = { - let table = TestPersistWorkTable::load_from_file(config.clone()).unwrap(); + let table = TestPersistWorkTable::load_from_file(config.clone()) + .await + .unwrap(); let row = TestPersistRow { another: 42, id: table.get_next_pk().0, @@ -113,7 +119,7 @@ fn test_space_update_full_sync() { row.id }; { - let table = TestPersistWorkTable::load_from_file(config).unwrap(); + let table = TestPersistWorkTable::load_from_file(config).await.unwrap(); assert!(table.select(pk.into()).is_some()); assert_eq!(table.select(pk.into()).unwrap().another, 13); assert_eq!(table.0.pk_gen.get_state(), pk + 1) @@ -123,8 +129,6 @@ fn test_space_update_full_sync() { #[test] fn test_space_update_query_sync() { - remove_dir_if_exists("tests/data/sync/update_query".to_string()); - let config = PersistenceConfig::new( "tests/data/sync/update_query", "tests/data/sync/update_query", @@ -139,8 +143,12 @@ fn test_space_update_query_sync() { .unwrap(); runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/update_query".to_string()).await; + let pk = { - let table = TestPersistWorkTable::load_from_file(config.clone()).unwrap(); + let table = TestPersistWorkTable::load_from_file(config.clone()) + .await + .unwrap(); let row = TestPersistRow { another: 42, id: table.get_next_pk().0, @@ -154,7 +162,7 @@ fn test_space_update_query_sync() { row.id }; { - let table = TestPersistWorkTable::load_from_file(config).unwrap(); + let table = TestPersistWorkTable::load_from_file(config).await.unwrap(); assert!(table.select(pk.into()).is_some()); assert_eq!(table.select(pk.into()).unwrap().another, 13); assert_eq!(table.0.pk_gen.get_state(), pk + 1) @@ -164,8 +172,6 @@ fn test_space_update_query_sync() { #[test] fn test_space_delete_sync() { - remove_dir_if_exists("tests/data/sync/delete".to_string()); - let config = PersistenceConfig::new("tests/data/sync/delete", "tests/data/sync/delete").unwrap(); @@ -177,8 +183,12 @@ fn test_space_delete_sync() { .unwrap(); runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/delete".to_string()).await; + let pk = { - let table = TestPersistWorkTable::load_from_file(config.clone()).unwrap(); + let table = TestPersistWorkTable::load_from_file(config.clone()) + .await + .unwrap(); let row = TestPersistRow { another: 42, id: table.get_next_pk().0, @@ -189,7 +199,7 @@ fn test_space_delete_sync() { row.id }; { - let table = TestPersistWorkTable::load_from_file(config).unwrap(); + let table = TestPersistWorkTable::load_from_file(config).await.unwrap(); assert!(table.select(pk.into()).is_none()); assert_eq!(table.0.pk_gen.get_state(), pk + 1) } @@ -198,8 +208,6 @@ fn test_space_delete_sync() { #[test] fn test_space_delete_query_sync() { - remove_dir_if_exists("tests/data/sync/delete_query".to_string()); - let config = PersistenceConfig::new( "tests/data/sync/delete_query", "tests/data/sync/delete_query", @@ -214,8 +222,12 @@ fn test_space_delete_query_sync() { .unwrap(); runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/delete_query".to_string()).await; + let pk = { - let table = TestPersistWorkTable::load_from_file(config.clone()).unwrap(); + let table = TestPersistWorkTable::load_from_file(config.clone()) + .await + .unwrap(); let row = TestPersistRow { another: 42, id: table.get_next_pk().0, @@ -226,7 +238,7 @@ fn test_space_delete_query_sync() { row.id }; { - let table = TestPersistWorkTable::load_from_file(config).unwrap(); + let table = TestPersistWorkTable::load_from_file(config).await.unwrap(); assert!(table.select(pk.into()).is_none()); assert_eq!(table.0.pk_gen.get_state(), pk + 1) } diff --git a/tests/persistence/toc/read.rs b/tests/persistence/toc/read.rs index c37f9a26..c95c321d 100644 --- a/tests/persistence/toc/read.rs +++ b/tests/persistence/toc/read.rs @@ -1,15 +1,16 @@ use data_bucket::INNER_PAGE_SIZE; -use std::fs::OpenOptions; use std::sync::atomic::AtomicU32; use std::sync::Arc; +use tokio::fs::OpenOptions; use worktable::prelude::IndexTableOfContents; -#[test] -fn test_index_table_of_contents_read() { +#[tokio::test] +async fn test_index_table_of_contents_read() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/persist_index_table_of_contents.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(1)); let toc = IndexTableOfContents::::parse_from_file( @@ -17,17 +18,19 @@ fn test_index_table_of_contents_read() { 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&13), Some(1.into())) } -#[test] -fn test_index_table_of_contents_read_from_space() { +#[tokio::test] +async fn test_index_table_of_contents_read_from_space() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/test_persist/primary.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(1)); let toc = IndexTableOfContents::::parse_from_file( @@ -35,17 +38,19 @@ fn test_index_table_of_contents_read_from_space() { 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&99), Some(2.into())) } -#[test] -fn test_index_table_of_contents_read_from_space_index() { +#[tokio::test] +async fn test_index_table_of_contents_read_from_space_index() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_create_node.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(2)); let toc = IndexTableOfContents::::parse_from_file( @@ -53,17 +58,19 @@ fn test_index_table_of_contents_read_from_space_index() { 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&5), Some(2.into())) } -#[test] -fn test_index_table_of_contents_read_from_space_index_after_insert() { +#[tokio::test] +async fn test_index_table_of_contents_read_from_space_index_after_insert() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_insert_at.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(2)); let toc = IndexTableOfContents::::parse_from_file( @@ -71,17 +78,19 @@ fn test_index_table_of_contents_read_from_space_index_after_insert() { 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&5), Some(2.into())) } -#[test] -fn test_index_table_of_contents_read_from_space_index_with_updated_node_id() { +#[tokio::test] +async fn test_index_table_of_contents_read_from_space_index_with_updated_node_id() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_insert_at_with_node_id_update.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(2)); let toc = IndexTableOfContents::::parse_from_file( @@ -89,17 +98,19 @@ fn test_index_table_of_contents_read_from_space_index_with_updated_node_id() { 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&7), Some(2.into())) } -#[test] -fn test_index_table_of_contents_read_from_space_index_with_remove_at_node_id() { +#[tokio::test] +async fn test_index_table_of_contents_read_from_space_index_with_remove_at_node_id() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_remove_at_node_id.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(2)); let toc = IndexTableOfContents::::parse_from_file( @@ -107,17 +118,19 @@ fn test_index_table_of_contents_read_from_space_index_with_remove_at_node_id() { 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&3), Some(2.into())); } -#[test] -fn test_index_table_of_contents_read_from_space_index_with_remove_node() { +#[tokio::test] +async fn test_index_table_of_contents_read_from_space_index_with_remove_node() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_remove_node.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(2)); let toc = IndexTableOfContents::::parse_from_file( @@ -125,18 +138,20 @@ fn test_index_table_of_contents_read_from_space_index_with_remove_node() { 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&5), None); assert_eq!(toc.get(&15), Some(3.into())); } -#[test] -fn test_index_table_of_contents_read_from_space_index_with_create_node_after_remove_node() { +#[tokio::test] +async fn test_index_table_of_contents_read_from_space_index_with_create_node_after_remove_node() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_create_node_after_remove.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(2)); let toc = IndexTableOfContents::::parse_from_file( @@ -144,18 +159,20 @@ fn test_index_table_of_contents_read_from_space_index_with_create_node_after_rem 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&10), Some(2.into())); assert_eq!(toc.get(&15), Some(3.into())); } -#[test] -fn test_index_table_of_contents_read_from_space_index_after_split_node() { +#[tokio::test] +async fn test_index_table_of_contents_read_from_space_index_after_split_node() { let mut file = OpenOptions::new() .write(true) .read(true) .open("tests/data/expected/space_index/process_split_node.wt.idx") + .await .unwrap(); let next_id_gen = Arc::new(AtomicU32::new(2)); let toc = IndexTableOfContents::::parse_from_file( @@ -163,6 +180,7 @@ fn test_index_table_of_contents_read_from_space_index_after_split_node() { 0.into(), next_id_gen, ) + .await .unwrap(); assert_eq!(toc.get(&1000), Some(3.into())); diff --git a/tests/persistence/toc/write.rs b/tests/persistence/toc/write.rs index 0816324e..476bd96b 100644 --- a/tests/persistence/toc/write.rs +++ b/tests/persistence/toc/write.rs @@ -1,22 +1,24 @@ use data_bucket::INNER_PAGE_SIZE; -use std::fs::File; use std::sync::atomic::AtomicU32; use std::sync::Arc; +use tokio::fs::File; use worktable::prelude::IndexTableOfContents; use crate::{check_if_files_are_same, remove_file_if_exists}; -#[test] -fn test_persist_index_table_of_contents() { - remove_file_if_exists("tests/data/persist_index_table_of_contents.wt.idx".to_string()); +#[tokio::test] +async fn test_persist_index_table_of_contents() { + remove_file_if_exists("tests/data/persist_index_table_of_contents.wt.idx".to_string()).await; let mut toc = IndexTableOfContents::::new( 0.into(), Arc::new(AtomicU32::new(1)), ); toc.insert(13, 1.into()); - let mut file = File::create("tests/data/persist_index_table_of_contents.wt.idx").unwrap(); - toc.persist(&mut file).unwrap(); + let mut file = File::create("tests/data/persist_index_table_of_contents.wt.idx") + .await + .unwrap(); + toc.persist(&mut file).await.unwrap(); assert!(check_if_files_are_same( "tests/data/persist_index_table_of_contents.wt.idx".to_string(), diff --git a/tests/persistence/write.rs b/tests/persistence/write.rs index 6d2d027e..0d91271e 100644 --- a/tests/persistence/write.rs +++ b/tests/persistence/write.rs @@ -4,8 +4,6 @@ use crate::persistence::{get_test_wt, get_test_wt_without_secondary_indexes}; #[test] fn test_persist() { - remove_dir_if_exists("tests/data/test_persist".to_string()); - let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) .enable_io() @@ -14,9 +12,11 @@ fn test_persist() { .unwrap(); runtime.block_on(async { - let table = get_test_wt(); + remove_dir_if_exists("tests/data/test_persist".to_string()).await; + + let table = get_test_wt().await; table.wait_for_ops().await; - table.persist().unwrap(); + table.persist().await.unwrap(); assert!(check_if_dirs_are_same( "tests/data/test_persist".to_string(), @@ -27,8 +27,6 @@ fn test_persist() { #[test] fn test_persist_without_secondary_indexes() { - remove_dir_if_exists("tests/data/test_without_secondary_indexes".to_string()); - let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) .enable_io() @@ -37,9 +35,11 @@ fn test_persist_without_secondary_indexes() { .unwrap(); runtime.block_on(async { - let table = get_test_wt_without_secondary_indexes(); + remove_dir_if_exists("tests/data/test_without_secondary_indexes".to_string()).await; + + let table = get_test_wt_without_secondary_indexes().await; table.wait_for_ops().await; - table.persist().unwrap(); + table.persist().await.unwrap(); assert!(check_if_dirs_are_same( "tests/data/test_without_secondary_indexes".to_string(), From 6ea53cbee6cee52c7ae4132ecf057ff42bb4c6c1 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 14 Mar 2025 14:34:16 +0300 Subject: [PATCH 4/5] clippy --- src/in_memory/data.rs | 1 + src/in_memory/pages.rs | 1 + src/persistence/engine.rs | 63 ++++++++++++++---------------- src/persistence/space/data.rs | 36 +++++++---------- src/persistence/space/index/mod.rs | 47 +++++++++++----------- 5 files changed, 69 insertions(+), 79 deletions(-) diff --git a/src/in_memory/data.rs b/src/in_memory/data.rs index 4f325a00..d751375c 100644 --- a/src/in_memory/data.rs +++ b/src/in_memory/data.rs @@ -126,6 +126,7 @@ impl Data { Ok(link) } + #[allow(clippy::missing_safety_doc)] #[cfg_attr( feature = "perf_measurements", performance_measurement(prefix_name = "DataRow") diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 7e08e63b..70949bd0 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -260,6 +260,7 @@ where Ok(res) } + #[allow(clippy::missing_safety_doc)] #[cfg_attr( feature = "perf_measurements", performance_measurement(prefix_name = "DataPages") diff --git a/src/persistence/engine.rs b/src/persistence/engine.rs index 60f73550..780125ba 100644 --- a/src/persistence/engine.rs +++ b/src/persistence/engine.rs @@ -1,5 +1,4 @@ use std::fs; -use std::future::Future; use std::marker::PhantomData; use std::path::Path; @@ -93,42 +92,40 @@ where SecondaryIndexEvents: Send, PrimaryKeyGenState: Send, { - fn apply_operation( + async fn apply_operation( &mut self, op: Operation, - ) -> impl Future> + Send { - async { - match op { - Operation::Insert(insert) => { - self.data - .save_data(insert.link, insert.bytes.as_ref()) - .await?; - for event in insert.primary_key_events { - self.primary_index.process_change_event(event).await?; - } - let info = self.data.get_mut_info(); - info.inner.pk_gen_state = insert.pk_gen_state; - self.data.save_info().await?; - self.secondary_indexes - .process_change_events(insert.secondary_keys_events) - .await + ) -> eyre::Result<()> { + match op { + Operation::Insert(insert) => { + self.data + .save_data(insert.link, insert.bytes.as_ref()) + .await?; + for event in insert.primary_key_events { + self.primary_index.process_change_event(event).await?; } - Operation::Update(update) => { - self.data - .save_data(update.link, update.bytes.as_ref()) - .await?; - self.secondary_indexes - .process_change_events(update.secondary_keys_events) - .await - } - Operation::Delete(delete) => { - for event in delete.primary_key_events { - self.primary_index.process_change_event(event).await?; - } - self.secondary_indexes - .process_change_events(delete.secondary_keys_events) - .await + let info = self.data.get_mut_info(); + info.inner.pk_gen_state = insert.pk_gen_state; + self.data.save_info().await?; + self.secondary_indexes + .process_change_events(insert.secondary_keys_events) + .await + } + Operation::Update(update) => { + self.data + .save_data(update.link, update.bytes.as_ref()) + .await?; + self.secondary_indexes + .process_change_events(update.secondary_keys_events) + .await + } + Operation::Delete(delete) => { + for event in delete.primary_key_events { + self.primary_index.process_change_event(event).await?; } + self.secondary_indexes + .process_change_events(delete.secondary_keys_events) + .await } } } diff --git a/src/persistence/space/data.rs b/src/persistence/space/data.rs index 041fc8b1..9f4a3a90 100644 --- a/src/persistence/space/data.rs +++ b/src/persistence/space/data.rs @@ -102,28 +102,22 @@ where persist_page(&mut page, file).await } - fn save_data( - &mut self, - link: Link, - bytes: &[u8], - ) -> impl Future> + Send { - async move { - if link.page_id > self.last_page_id.into() { - let mut page = GeneralPage { - header: GeneralHeader::new(link.page_id, PageType::SpaceInfo, 0.into()), - inner: DataPage { - length: 0, - data: [0; 1], - }, - }; - persist_page(&mut page, &mut self.data_file).await?; - self.current_data_length = 0; - self.last_page_id += 1; - } - self.current_data_length += link.length; - self.update_data_length().await?; - update_at::<{ DATA_LENGTH }>(&mut self.data_file, link, bytes).await + async fn save_data(&mut self, link: Link, bytes: &[u8]) -> eyre::Result<()> { + if link.page_id > self.last_page_id.into() { + let mut page = GeneralPage { + header: GeneralHeader::new(link.page_id, PageType::SpaceInfo, 0.into()), + inner: DataPage { + length: 0, + data: [0; 1], + }, + }; + persist_page(&mut page, &mut self.data_file).await?; + self.current_data_length = 0; + self.last_page_id += 1; } + self.current_data_length += link.length; + self.update_data_length().await?; + update_at::<{ DATA_LENGTH }>(&mut self.data_file, link, bytes).await } fn get_mut_info(&mut self) -> &mut GeneralPage> { diff --git a/src/persistence/space/index/mod.rs b/src/persistence/space/index/mod.rs index 4e788006..e0b74c34 100644 --- a/src/persistence/space/index/mod.rs +++ b/src/persistence/space/index/mod.rs @@ -2,7 +2,6 @@ mod table_of_contents; mod util; use std::fmt::Debug; -use std::future::Future; use std::path::Path; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -466,33 +465,31 @@ where persist_page(&mut page, file).await } - fn process_change_event( + async fn process_change_event( &mut self, event: ChangeEvent>, - ) -> impl Future> + Send { - async { - match event { - ChangeEvent::InsertAt { - max_value: node_id, - value, - index, - } => self.process_insert_at(node_id.key, value, index).await, - ChangeEvent::RemoveAt { - max_value: node_id, - value, - index, - } => self.process_remove_at(node_id.key, value, index).await, - ChangeEvent::CreateNode { max_value: node_id } => { - self.process_create_node(node_id).await - } - ChangeEvent::RemoveNode { max_value: node_id } => { - self.process_remove_node(node_id.key).await - } - ChangeEvent::SplitNode { - max_value: node_id, - split_index, - } => self.process_split_node(node_id.key, split_index).await, + ) -> eyre::Result<()> { + match event { + ChangeEvent::InsertAt { + max_value: node_id, + value, + index, + } => self.process_insert_at(node_id.key, value, index).await, + ChangeEvent::RemoveAt { + max_value: node_id, + value, + index, + } => self.process_remove_at(node_id.key, value, index).await, + ChangeEvent::CreateNode { max_value: node_id } => { + self.process_create_node(node_id).await } + ChangeEvent::RemoveNode { max_value: node_id } => { + self.process_remove_node(node_id.key).await + } + ChangeEvent::SplitNode { + max_value: node_id, + split_index, + } => self.process_split_node(node_id.key, split_index).await, } } } From 92331b6611683c03e3693a7d654262ac47691c16 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 14 Mar 2025 14:42:18 +0300 Subject: [PATCH 5/5] data bucket ver --- Cargo.toml | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 51c6c1b8..2a39db5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,12 +26,9 @@ worktable_codegen = { path = "codegen", version = "0.5.0" } futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4"] } # data_bucket = "0.2.0" -# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" } -data_bucket = { path = "../DataBucket", version = "0.2.0" } +data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" } +# data_bucket = { path = "../DataBucket", version = "0.2.0" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } indexset = { version = "0.11.2", features = ["concurrent", "cdc", "multimap"] } convert_case = "0.6.0" - -[dev-dependencies] -async-std = "1.13.0"