Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@ tracing = "0.1.40"
rkyv = { version = "0.8.9", features = ["uuid-1"] }
lockfree = { version = "0.5.1" }
worktable_codegen = { path = "codegen", version = "0.5.0" }
scc = "2.1.16"
futures = "0.3.30"
uuid = { version = "1.10.0", features = ["v4"] }
data_bucket = "0.2.0"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" }
# data_bucket = { path = "../DataBucket", version = "0.1.0" }
# data_bucket = "0.2.0"
data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" }
# data_bucket = { path = "../DataBucket", version = "0.2.0" }
performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true }
performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true }
indexset = { version = "0.11.2", features = ["concurrent", "cdc", "multimap"] }
convert_case = "0.6.0"

[dev-dependencies]
async-std = "1.13.0"
24 changes: 12 additions & 12 deletions codegen/src/persist_index/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,23 @@ impl Generator {
let index_name_literal = Literal::string(i.to_string().as_str());
quote! {
{
let mut file = std::fs::File::create(format!("{}/{}{}", path, #index_name_literal, #index_extension))?;
let mut file = tokio::fs::File::create(format!("{}/{}{}", path, #index_name_literal, #index_extension)).await?;
let mut info = #ident::space_info_default();
info.inner.page_count = self.#i.1.len() as u32 + self.#i.0.len() as u32;;
persist_page(&mut info, &mut file)?;
info.inner.page_count = self.#i.1.len() as u32 + self.#i.0.len() as u32;
persist_page(&mut info, &mut file).await?;
for mut page in &mut self.#i.0 {
persist_page(&mut page, &mut file)?;
persist_page(&mut page, &mut file).await?;
}
for mut page in &mut self.#i.1 {
persist_page(&mut page, &mut file)?;
persist_page(&mut page, &mut file).await?;
}
}
}
})
.collect::<Vec<_>>();

quote! {
pub fn persist(&mut self, path: &str) -> eyre::Result<()>
pub async fn persist(&mut self, path: &str) -> eyre::Result<()>
{
#(#persist_logic)*
Ok(())
Expand Down Expand Up @@ -193,14 +193,14 @@ impl Generator {
.map(|(l, i)| quote! {
let #i = {
let mut #i = vec![];
let mut file = std::fs::File::open(format!("{}/{}{}", path, #l, #index_extension))?;
let info = parse_page::<SpaceInfoPage<()>, { #page_const_name as u32 }>(&mut file, 0)?;
let file_length = file.metadata()?.len();
let mut file = tokio::fs::File::open(format!("{}/{}{}", path, #l, #index_extension)).await?;
let info = parse_page::<SpaceInfoPage<()>, { #page_const_name as u32 }>(&mut file, 0).await?;
let file_length = file.metadata().await?.len();
let page_id = file_length / (#page_const_name as u64 + GENERAL_HEADER_SIZE as u64) + 1;
let next_page_id = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(page_id as u32));
let toc = IndexTableOfContents::<_, { #page_const_name as u32 }>::parse_from_file(&mut file, 0.into(), next_page_id.clone())?;
let toc = IndexTableOfContents::<_, { #page_const_name as u32 }>::parse_from_file(&mut file, 0.into(), next_page_id.clone()).await?;
for page_id in toc.iter().map(|(_, page_id)| page_id) {
let index = parse_page::<IndexPage<_>, { #page_const_name as u32 }>(&mut file, (*page_id).into())?;
let index = parse_page::<IndexPage<_>, { #page_const_name as u32 }>(&mut file, (*page_id).into()).await?;
#i.push(index);
}
(toc.pages, #i)
Expand All @@ -220,7 +220,7 @@ impl Generator {
.collect::<Vec<_>>();

quote! {
pub fn parse_from_file(path: &str) -> eyre::Result<Self> {
pub async fn parse_from_file(path: &str) -> eyre::Result<Self> {
#(#field_names_literals)*

Ok(Self {
Expand Down
8 changes: 4 additions & 4 deletions codegen/src/persist_index/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ impl Generator {
.map(|i| {
let literal_name = Literal::string(i.to_string().as_str());
quote! {
#i: SpaceIndex::secondary_from_table_files_path(path, #literal_name)?,
#i: SpaceIndex::secondary_from_table_files_path(path, #literal_name).await?,
}
})
.collect();

quote! {
fn from_table_files_path<S: AsRef<str>>(path: S) -> eyre::Result<Self> {
async fn from_table_files_path<S: AsRef<str>>(path: S) -> eyre::Result<Self> {
let path = path.as_ref();
Ok(Self {
#(#fields)*
Expand All @@ -113,14 +113,14 @@ impl Generator {
.map(|i| {
quote! {
for event in events.#i {
self.#i.process_change_event(event)?;
self.#i.process_change_event(event).await?;
}
}
})
.collect();

quote! {
fn process_change_events(&mut self, events: #events_ident) -> eyre::Result<()> {
async fn process_change_events(&mut self, events: #events_ident) -> eyre::Result<()> {
#(#process)*
core::result::Result::Ok(())
}
Expand Down
45 changes: 23 additions & 22 deletions codegen/src/persist_table/generator/space_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,29 +88,29 @@ impl Generator {

quote! {
impl #space_ident {
pub fn persist(&mut self) -> eyre::Result<()> {
pub async fn persist(&mut self) -> eyre::Result<()> {
let prefix = &self.path;
std::fs::create_dir_all(prefix)?;
tokio::fs::create_dir_all(prefix).await?;

{
let mut primary_index_file = std::fs::File::create(format!("{}/primary{}", &self.path, #index_extension))?;
let mut primary_index_file = tokio::fs::File::create(format!("{}/primary{}", &self.path, #index_extension)).await?;
let mut info = self.get_primary_index_info()?;
persist_page(&mut info, &mut primary_index_file)?;
persist_page(&mut info, &mut primary_index_file).await?;
for mut toc_page in &mut self.primary_index.0 {
persist_page(&mut toc_page, &mut primary_index_file)?;
persist_page(&mut toc_page, &mut primary_index_file).await?;
}
for mut primary_index_page in &mut self.primary_index.1 {
persist_page(&mut primary_index_page, &mut primary_index_file)?;
persist_page(&mut primary_index_page, &mut primary_index_file).await?;
}
}

self.indexes.persist(&prefix)?;
self.indexes.persist(&prefix).await?;

{
let mut data_file = std::fs::File::create(format!("{}/{}", &self.path, #data_extension))?;
persist_page(&mut self.data_info, &mut data_file)?;
let mut data_file = tokio::fs::File::create(format!("{}/{}", &self.path, #data_extension)).await?;
persist_page(&mut self.data_info, &mut data_file).await?;
for mut data_page in &mut self.data {
persist_page(&mut data_page, &mut data_file)?;
persist_page(&mut data_page, &mut data_file).await?;
}
}

Expand Down Expand Up @@ -146,7 +146,7 @@ impl Generator {
let dir_name = name_generator.get_dir_name();

quote! {
pub fn into_worktable(self, config: PersistenceConfig) -> #wt_ident {
pub async fn into_worktable(self, config: PersistenceConfig) -> #wt_ident {
let mut page_id = 1;
let data = self.data.into_iter().map(|p| {
let mut data = Data::from_data_page(p);
Expand Down Expand Up @@ -179,6 +179,7 @@ impl Generator {

let path = format!("{}/{}", config.tables_path.as_str(), #dir_name);
let engine: #engine_ident = PersistenceEngine::from_table_files_path(path)
.await
.expect("should not panic as SpaceFile is ok");
#wt_ident(
table,
Expand All @@ -199,31 +200,31 @@ impl Generator {
let data_extension = Literal::string(WT_DATA_EXTENSION);

quote! {
pub fn parse_file(path: &str) -> eyre::Result<Self> {
pub async fn parse_file(path: &str) -> eyre::Result<Self> {
let mut primary_index = {
let mut primary_index = vec![];
let mut primary_file = std::fs::File::open(format!("{}/primary{}", path, #index_extension))?;
let info = parse_page::<SpaceInfoPage<()>, { #page_const_name as u32 }>(&mut primary_file, 0)?;
let file_length = primary_file.metadata()?.len();
let mut primary_file = tokio::fs::File::open(format!("{}/primary{}", path, #index_extension)).await?;
let info = parse_page::<SpaceInfoPage<()>, { #page_const_name as u32 }>(&mut primary_file, 0).await?;
let file_length = primary_file.metadata().await?.len();
let count = file_length / (#page_const_name as u64 + GENERAL_HEADER_SIZE as u64);
let next_page_id = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(count as u32));
let toc = IndexTableOfContents::<_, { #page_const_name as u32 }>::parse_from_file(&mut primary_file, 0.into(), next_page_id.clone())?;
let toc = IndexTableOfContents::<_, { #page_const_name as u32 }>::parse_from_file(&mut primary_file, 0.into(), next_page_id.clone()).await?;
for page_id in toc.iter().map(|(_, page_id)| page_id) {
let index = parse_page::<IndexPage<#pk_type>, { #page_const_name as u32 }>(&mut primary_file, (*page_id).into())?;
let index = parse_page::<IndexPage<#pk_type>, { #page_const_name as u32 }>(&mut primary_file, (*page_id).into()).await?;
primary_index.push(index);
}
(toc.pages, primary_index)
};

let indexes = #persisted_index_name::parse_from_file(path)?;
let indexes = #persisted_index_name::parse_from_file(path).await?;
let (data, data_info) = {
let mut data = vec![];
let mut data_file = std::fs::File::open(format!("{}/{}", path, #data_extension))?;
let info = parse_page::<SpaceInfoPage<<<#pk_type as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State>, { #page_const_name as u32 }>(&mut data_file, 0)?;
let file_length = data_file.metadata()?.len();
let mut data_file = tokio::fs::File::open(format!("{}/{}", path, #data_extension)).await?;
let info = parse_page::<SpaceInfoPage<<<#pk_type as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State>, { #page_const_name as u32 }>(&mut data_file, 0).await?;
let file_length = data_file.metadata().await?.len();
let count = file_length / (#inner_const_name as u64 + GENERAL_HEADER_SIZE as u64);
for page_id in 1..=count {
let index = parse_data_page::<{ #page_const_name }, { #inner_const_name }>(&mut data_file, page_id as u32)?;
let index = parse_data_page::<{ #page_const_name }, { #inner_const_name }>(&mut data_file, page_id as u32).await?;
data.push(index);
}
(data, info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ impl Generator {

fn gen_worktable_persist_fn(&self) -> TokenStream {
quote! {
pub fn persist(&self) -> eyre::Result<()> {
pub async fn persist(&self) -> eyre::Result<()> {
let mut space = self.into_space();
space.persist()?;
space.persist().await?;
Ok(())
}
}
Expand All @@ -51,13 +51,13 @@ impl Generator {
let dir_name = name_generator.get_dir_name();

quote! {
pub fn load_from_file(config: PersistenceConfig) -> eyre::Result<Self> {
pub async fn load_from_file(config: PersistenceConfig) -> eyre::Result<Self> {
let filename = format!("{}/{}", config.tables_path.as_str(), #dir_name);
if !std::path::Path::new(filename.as_str()).exists() {
return #wt_ident::new(config);
return #wt_ident::new(config).await;
};
let space = #space_ident::parse_file(&filename)?;
let table = space.into_worktable(config);
let space = #space_ident::parse_file(&filename).await?;
let table = space.into_worktable(config).await;
Ok(table)
}
}
Expand Down
4 changes: 2 additions & 2 deletions codegen/src/worktable/generator/table/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ impl Generator {

if self.is_persist {
quote! {
pub fn new(config: PersistenceConfig) -> eyre::Result<Self> {
pub async fn new(config: PersistenceConfig) -> eyre::Result<Self> {
let mut inner = WorkTable::default();
inner.table_name = #table_name;
let table_files_path = format!("{}/{}", config.tables_path, #dir_name);
let engine: #engine = PersistenceEngine::from_table_files_path(table_files_path)?;
let engine: #engine = PersistenceEngine::from_table_files_path(table_files_path).await?;
core::result::Result::Ok(Self(
inner,
config,
Expand Down
1 change: 1 addition & 0 deletions src/in_memory/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl<Row, const DATA_LENGTH: usize> Data<Row, DATA_LENGTH> {
Ok(link)
}

#[allow(clippy::missing_safety_doc)]
#[cfg_attr(
feature = "perf_measurements",
performance_measurement(prefix_name = "DataRow")
Expand Down
1 change: 1 addition & 0 deletions src/in_memory/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ where
Ok(res)
}

#[allow(clippy::missing_safety_doc)]
#[cfg_attr(
feature = "perf_measurements",
performance_measurement(prefix_name = "DataPages")
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub mod prelude {

pub use derive_more::{From, Into};
pub use lockfree::set::Set as LockFreeSet;
pub use scc::{ebr::Guard, tree_index::TreeIndex};
pub use worktable_codegen::{PersistIndex, PersistTable};

pub const WT_INDEX_EXTENSION: &str = ".wt.idx";
Expand Down
39 changes: 25 additions & 14 deletions src/persistence/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,18 @@ where
SpacePrimaryIndex: SpaceIndexOps<PrimaryKey>,
SpaceSecondaryIndexes: SpaceSecondaryIndexOps<SecondaryIndexEvents>,
{
pub fn from_table_files_path<S: AsRef<str> + Clone>(path: S) -> eyre::Result<Self> {
pub async fn from_table_files_path<S: AsRef<str> + Clone + Send>(
path: S,
) -> eyre::Result<Self> {
let table_path = Path::new(path.as_ref());
if !table_path.exists() {
fs::create_dir_all(table_path)?;
}

Ok(Self {
data: SpaceData::from_table_files_path(path.clone())?,
primary_index: SpacePrimaryIndex::primary_from_table_files_path(path.clone())?,
secondary_indexes: SpaceSecondaryIndexes::from_table_files_path(path)?,
data: SpaceData::from_table_files_path(path.clone()).await?,
primary_index: SpacePrimaryIndex::primary_from_table_files_path(path.clone()).await?,
secondary_indexes: SpaceSecondaryIndexes::from_table_files_path(path).await?,
phantom_data: PhantomData,
})
}
Expand All @@ -82,39 +84,48 @@ impl<
PrimaryKeyGenState,
>
where
PrimaryKey: Ord + TablePrimaryKey,
PrimaryKey: Ord + TablePrimaryKey + Send,
<PrimaryKey as TablePrimaryKey>::Generator: PrimaryKeyGeneratorState,
SpaceData: SpaceDataOps<PrimaryKeyGenState>,
SpacePrimaryIndex: SpaceIndexOps<PrimaryKey>,
SpaceSecondaryIndexes: SpaceSecondaryIndexOps<SecondaryIndexEvents>,
SpaceData: SpaceDataOps<PrimaryKeyGenState> + Send,
SpacePrimaryIndex: SpaceIndexOps<PrimaryKey> + Send,
SpaceSecondaryIndexes: SpaceSecondaryIndexOps<SecondaryIndexEvents> + Send,
SecondaryIndexEvents: Send,
PrimaryKeyGenState: Send,
{
fn apply_operation(
async fn apply_operation(
&mut self,
op: Operation<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents>,
) -> eyre::Result<()> {
match op {
Operation::Insert(insert) => {
self.data.save_data(insert.link, insert.bytes.as_ref())?;
self.data
.save_data(insert.link, insert.bytes.as_ref())
.await?;
for event in insert.primary_key_events {
self.primary_index.process_change_event(event)?;
self.primary_index.process_change_event(event).await?;
}
let info = self.data.get_mut_info();
info.inner.pk_gen_state = insert.pk_gen_state;
self.data.save_info()?;
self.data.save_info().await?;
self.secondary_indexes
.process_change_events(insert.secondary_keys_events)
.await
}
Operation::Update(update) => {
self.data.save_data(update.link, update.bytes.as_ref())?;
self.data
.save_data(update.link, update.bytes.as_ref())
.await?;
self.secondary_indexes
.process_change_events(update.secondary_keys_events)
.await
}
Operation::Delete(delete) => {
for event in delete.primary_key_events {
self.primary_index.process_change_event(event)?;
self.primary_index.process_change_event(event).await?;
}
self.secondary_indexes
.process_change_events(delete.secondary_keys_events)
.await
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ pub use space::{
map_index_pages_to_toc_and_general, IndexTableOfContents, SpaceData, SpaceDataOps, SpaceIndex,
SpaceIndexOps, SpaceSecondaryIndexOps,
};
use std::future::Future;
pub use task::PersistenceTask;

pub trait PersistenceEngineOps<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents> {
fn apply_operation(
&mut self,
op: Operation<PrimaryKeyGenState, PrimaryKey, SecondaryIndexEvents>,
) -> eyre::Result<()>;
) -> impl Future<Output = eyre::Result<()>> + Send;
}
Loading
Loading