Skip to content

Commit

Permalink
Apply new round of PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Apr 15, 2024
1 parent d5bde06 commit d68309e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ impl FileBackedIndex {
&self.metadata
}

/// Replace the search settings in the index config, returning whether a mutation occurred.
/// Replaces the search settings in the index config, returning whether a mutation occurred.
pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool {
let is_mutation = self.metadata.index_config.search_settings != search_settings;
self.metadata.index_config.search_settings = search_settings;
is_mutation
}

/// Replace the retention policy in the index config, returning whether a mutation occurred.
/// Replaces the retention policy in the index config, returning whether a mutation occurred.
pub fn set_retention_policy(&mut self, retention_policy_opt: Option<RetentionPolicy>) -> bool {
let is_mutation = self.metadata.index_config.retention_policy_opt != retention_policy_opt;
self.metadata.index_config.retention_policy_opt = retention_policy_opt;
Expand Down
10 changes: 5 additions & 5 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,19 @@ impl CreateIndexResponseExt for CreateIndexResponse {

/// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload.
pub trait UpdateIndexRequestExt {
/// Updates a new [`UpdateIndexRequest`] from an [`IndexConfig`].
/// Creates a new [`UpdateIndexRequest`] from the different updated fields.
fn try_from_updates(
index_uid: impl Into<IndexUid>,
search_settings: &SearchSettings,
retention_policy_opt: &Option<RetentionPolicy>,
) -> MetastoreResult<UpdateIndexRequest>;

/// Deserializes the `index_config_json` field of a [`UpdateIndexRequest`] into an
/// [`IndexConfig`].
/// Deserializes the `search_settings_json` field of an [`UpdateIndexRequest`] into a
/// [`SearchSettings`] object.
fn deserialize_search_settings(&self) -> MetastoreResult<SearchSettings>;

/// Deserializes the `source_configs_json` field of a [`UpdateIndexRequest`] into an
/// `Vec` of [`SourceConfig`].
/// Deserializes the `retention_policy_json` field of an [`UpdateIndexRequest`] into a
/// [`RetentionPolicy`] object.
fn deserialize_retention_policy(&self) -> MetastoreResult<Option<RetentionPolicy>>;
}

Expand Down
69 changes: 30 additions & 39 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,22 +296,21 @@ async fn mutate_index_metadata<E, M>(
) -> MetastoreResult<IndexMetadata>
where
MetastoreError: From<E>,
M: FnOnce(IndexMetadata) -> Result<MutationOccurred<IndexMetadata>, E>,
M: FnOnce(&mut IndexMetadata) -> Result<MutationOccurred<()>, E>,
{
let index_id = &index_uid.index_id;
let index_metadata = index_metadata(tx, index_id).await?;
let mut index_metadata = index_metadata(tx, index_id).await?;
if index_metadata.index_uid != index_uid {
return Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_id.to_string(),
}));
}

let mutated_index_metadata = match mutate_fn(index_metadata)? {
MutationOccurred::Yes(index_metadata) => index_metadata,
MutationOccurred::No(index_metadata) => return Ok(index_metadata),
};
if let MutationOccurred::No(()) = mutate_fn(&mut index_metadata)? {
return Ok(index_metadata);
}

let index_metadata_json = serde_json::to_string(&mutated_index_metadata).map_err(|error| {
let index_metadata_json = serde_json::to_string(&index_metadata).map_err(|error| {
MetastoreError::JsonSerializeError {
struct_name: "IndexMetadata".to_string(),
message: error.to_string(),
Expand All @@ -333,7 +332,7 @@ where
index_id: index_id.to_string(),
}));
}
Ok(mutated_index_metadata)
Ok(index_metadata)
}

#[async_trait]
Expand Down Expand Up @@ -411,21 +410,17 @@ impl MetastoreService for PostgresqlMetastore {
let search_settings = request.deserialize_search_settings()?;
let index_uid: IndexUid = request.index_uid().clone();
let updated_metadata = run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata::<MetastoreError, _>(
tx,
index_uid,
|mut index_metadata: IndexMetadata| {
if index_metadata.index_config.search_settings != search_settings
|| index_metadata.index_config.retention_policy_opt != retention_policy_opt
{
index_metadata.index_config.search_settings = search_settings;
index_metadata.index_config.retention_policy_opt = retention_policy_opt;
Ok(MutationOccurred::Yes(index_metadata))
} else {
Ok(MutationOccurred::No(index_metadata))
}
},
)
mutate_index_metadata::<MetastoreError, _>(tx, index_uid, |index_metadata| {
if index_metadata.index_config.search_settings != search_settings
|| index_metadata.index_config.retention_policy_opt != retention_policy_opt
{
index_metadata.index_config.search_settings = search_settings;
index_metadata.index_config.retention_policy_opt = retention_policy_opt;
Ok(MutationOccurred::Yes(()))
} else {
Ok(MutationOccurred::No(()))
}
})
.await
})?;
IndexMetadataResponse::try_from_index_metadata(&updated_metadata)
Expand Down Expand Up @@ -969,14 +964,10 @@ impl MetastoreService for PostgresqlMetastore {
let source_config = request.deserialize_source_config()?;
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata::<MetastoreError, _>(
tx,
index_uid,
|mut index_metadata: IndexMetadata| {
index_metadata.add_source(source_config)?;
Ok(MutationOccurred::Yes(index_metadata))
},
)
mutate_index_metadata::<MetastoreError, _>(tx, index_uid, |index_metadata| {
index_metadata.add_source(source_config)?;
Ok(MutationOccurred::Yes(()))
})
.await?;
Ok(())
})?;
Expand All @@ -990,11 +981,11 @@ impl MetastoreService for PostgresqlMetastore {
) -> MetastoreResult<EmptyResponse> {
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata(tx, index_uid, |mut index_metadata| {
mutate_index_metadata(tx, index_uid, |index_metadata| {
if index_metadata.toggle_source(&request.source_id, request.enable)? {
Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata))
Ok::<_, MetastoreError>(MutationOccurred::Yes(()))
} else {
Ok::<_, MetastoreError>(MutationOccurred::No(index_metadata))
Ok::<_, MetastoreError>(MutationOccurred::No(()))
}
})
.await?;
Expand All @@ -1011,9 +1002,9 @@ impl MetastoreService for PostgresqlMetastore {
let index_uid: IndexUid = request.index_uid().clone();
let source_id = request.source_id.clone();
run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata(tx, index_uid.clone(), |mut index_metadata| {
mutate_index_metadata(tx, index_uid.clone(), |index_metadata| {
index_metadata.delete_source(&source_id)?;
Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata))
Ok::<_, MetastoreError>(MutationOccurred::Yes(()))
})
.await?;
sqlx::query(
Expand All @@ -1040,11 +1031,11 @@ impl MetastoreService for PostgresqlMetastore {
) -> MetastoreResult<EmptyResponse> {
let index_uid: IndexUid = request.index_uid().clone();
run_with_tx!(self.connection_pool, tx, {
mutate_index_metadata(tx, index_uid, |mut index_metadata| {
mutate_index_metadata(tx, index_uid, |index_metadata| {
if index_metadata.checkpoint.reset_source(&request.source_id) {
Ok::<_, MetastoreError>(MutationOccurred::Yes(index_metadata))
Ok::<_, MetastoreError>(MutationOccurred::Yes(()))
} else {
Ok::<_, MetastoreError>(MutationOccurred::No(index_metadata))
Ok::<_, MetastoreError>(MutationOccurred::No(()))
}
})
.await?;
Expand Down

0 comments on commit d68309e

Please sign in to comment.