Skip to content

Commit

Permalink
Refactor mutate_index_metadata to use MutationOccured<IndexMetadata>
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Apr 15, 2024
1 parent 41af001 commit d5bde06
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ impl FileBackedIndex {
}

/// Deletes the source. Returns whether a mutation occurred.
pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult<bool> {
pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult<()> {
self.metadata.delete_source(source_id)
}

Expand Down
5 changes: 2 additions & 3 deletions quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,8 @@ impl MetastoreService for FileBackedMetastore {
let index_uid = request.index_uid();

self.mutate(index_uid, |index| {
index
.delete_source(&request.source_id)
.map(MutationOccurred::from)
index.delete_source(&request.source_id)?;
Ok(MutationOccurred::Yes(()))
})
.await?;
Ok(EmptyResponse {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,16 @@ impl IndexMetadata {
Ok(mutation_occurred)
}

/// Deletes a source from the index. Returns whether the index was modified (true).
pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult<bool> {
/// Deletes a source from the index.
pub(crate) fn delete_source(&mut self, source_id: &str) -> MetastoreResult<()> {
self.sources.remove(source_id).ok_or_else(|| {
MetastoreError::NotFound(EntityKind::Source {
index_id: self.index_id().to_string(),
source_id: source_id.to_string(),
})
})?;
self.checkpoint.remove_source(source_id);
Ok(true)
Ok(())
}
}

Expand Down
70 changes: 38 additions & 32 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use super::utils::{append_query_filters, establish_connection};
use crate::checkpoint::{
IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta,
};
use crate::file_backed::MutationOccurred;
use crate::metastore::postgres::utils::split_maturity_timestamp;
use crate::metastore::{PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE};
use crate::{
Expand Down Expand Up @@ -288,26 +289,29 @@ macro_rules! run_with_tx {
}};
}

async fn mutate_index_metadata<E, M: FnOnce(&mut IndexMetadata) -> Result<bool, E>>(
async fn mutate_index_metadata<E, M>(
tx: &mut Transaction<'_, Postgres>,
index_uid: IndexUid,
mutate_fn: M,
) -> MetastoreResult<bool>
) -> MetastoreResult<IndexMetadata>
where
MetastoreError: From<E>,
M: FnOnce(IndexMetadata) -> Result<MutationOccurred<IndexMetadata>, E>,
{
let index_id = &index_uid.index_id;
let mut index_metadata = index_metadata(tx, index_id).await?;
let index_metadata = index_metadata(tx, index_id).await?;
if index_metadata.index_uid != index_uid {
return Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_id.to_string(),
}));
}
let mutation_occurred = mutate_fn(&mut index_metadata)?;
if !mutation_occurred {
return Ok(mutation_occurred);
}
let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| {

let mutated_index_metadata = match mutate_fn(index_metadata)? {
MutationOccurred::Yes(index_metadata) => index_metadata,
MutationOccurred::No(index_metadata) => return Ok(index_metadata),
};

let index_metadata_json = serde_json::to_string(&mutated_index_metadata).map_err(|error| {
MetastoreError::JsonSerializeError {
struct_name: "IndexMetadata".to_string(),
message: error.to_string(),
Expand All @@ -329,7 +333,7 @@ where
index_id: index_id.to_string(),
}));
}
Ok(mutation_occurred)
Ok(mutated_index_metadata)
}

#[async_trait]
Expand Down Expand Up @@ -406,32 +410,25 @@ impl MetastoreService for PostgresqlMetastore {
let retention_policy_opt = request.deserialize_retention_policy()?;
let search_settings = request.deserialize_search_settings()?;
let index_uid: IndexUid = request.index_uid().clone();
let mut mutated_metadata_opt = None;
let mutated_metadata_ref = &mut mutated_metadata_opt;
run_with_tx!(self.connection_pool, tx, {
let updated_metadata = run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata::<MetastoreError, _>(
tx,
index_uid,
|index_metadata: &mut IndexMetadata| {
let mutated = if index_metadata.index_config.search_settings != search_settings
|mut index_metadata: IndexMetadata| {
if index_metadata.index_config.search_settings != search_settings
|| index_metadata.index_config.retention_policy_opt != retention_policy_opt
{
index_metadata.index_config.search_settings = search_settings;
index_metadata.index_config.retention_policy_opt = retention_policy_opt;
true
Ok(MutationOccurred::Yes(index_metadata))
} else {
false
};
*mutated_metadata_ref = Some(index_metadata.clone());
Ok(mutated)
Ok(MutationOccurred::No(index_metadata))
}
},
)
.await?;
Ok(())
.await
})?;
let mutated_metadata =
mutated_metadata_opt.expect("Mutated IndexMetadata should be set by transaction");
IndexMetadataResponse::try_from_index_metadata(&mutated_metadata)
IndexMetadataResponse::try_from_index_metadata(&updated_metadata)
}

#[instrument(skip_all, fields(index_id=%request.index_uid()))]
Expand Down Expand Up @@ -975,9 +972,9 @@ impl MetastoreService for PostgresqlMetastore {
mutate_index_metadata::<MetastoreError, _>(
tx,
index_uid,
|index_metadata: &mut IndexMetadata| {
|mut index_metadata: IndexMetadata| {
index_metadata.add_source(source_config)?;
Ok(true)
Ok(MutationOccurred::Yes(index_metadata))
},
)
.await?;
Expand All @@ -993,8 +990,12 @@ impl MetastoreService for PostgresqlMetastore {
) -> MetastoreResult<EmptyResponse> {
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata(tx, index_uid, |index_metadata| {
index_metadata.toggle_source(&request.source_id, request.enable)
mutate_index_metadata(tx, index_uid, |mut index_metadata| {
if index_metadata.toggle_source(&request.source_id, request.enable)? {
Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata))
} else {
Ok::<_, MetastoreError>(MutationOccurred::No(index_metadata))
}
})
.await?;
Ok(())
Expand All @@ -1010,8 +1011,9 @@ impl MetastoreService for PostgresqlMetastore {
let index_uid: IndexUid = request.index_uid().clone();
let source_id = request.source_id.clone();
run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata(tx, index_uid.clone(), |index_metadata| {
index_metadata.delete_source(&source_id)
mutate_index_metadata(tx, index_uid.clone(), |mut index_metadata| {
index_metadata.delete_source(&source_id)?;
Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata))
})
.await?;
sqlx::query(
Expand All @@ -1038,8 +1040,12 @@ impl MetastoreService for PostgresqlMetastore {
) -> MetastoreResult<EmptyResponse> {
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata(tx, index_uid, |index_metadata| {
Ok::<_, MetastoreError>(index_metadata.checkpoint.reset_source(&request.source_id))
mutate_index_metadata(tx, index_uid, |mut index_metadata| {
if index_metadata.checkpoint.reset_source(&request.source_id) {
Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata))
} else {
Ok::<_, MetastoreError>(MutationOccurred::No(index_metadata))
}
})
.await?;
Ok(())
Expand Down

0 comments on commit d5bde06

Please sign in to comment.