From abde4c65b7a952a43d0e2a2009480bb87ae1cd04 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 17 Apr 2024 10:40:59 +0000 Subject: [PATCH 1/4] Add updates to indexing settings and split endpoint --- docs/reference/cli.md | 54 +++- docs/reference/rest-api.md | 1 + quickwit/quickwit-cli/src/index/update.rs | 297 ++++++++++-------- quickwit/quickwit-cli/tests/cli.rs | 36 +-- quickwit/quickwit-cli/tests/helpers.rs | 6 + .../quickwit-config/src/index_config/mod.rs | 8 + quickwit/quickwit-config/src/lib.rs | 26 +- .../quickwit-control-plane/src/model/mod.rs | 79 +++-- .../src/tests/index_update_tests.rs | 16 +- .../file_backed/file_backed_index/mod.rs | 11 +- .../src/metastore/file_backed/mod.rs | 14 +- .../quickwit-metastore/src/metastore/mod.rs | 96 ++++-- .../src/metastore/postgres/metastore.rs | 32 +- .../quickwit-metastore/src/tests/index.rs | 159 +++++++--- quickwit/quickwit-metastore/src/tests/mod.rs | 16 +- .../protos/quickwit/metastore.proto | 10 +- .../codegen/quickwit/quickwit.metastore.rs | 37 ++- .../quickwit-rest-client/src/rest_client.rs | 42 ++- quickwit/quickwit-serve/src/index_api/mod.rs | 2 +- .../src/index_api/rest_handler.rs | 260 ++++++++++++--- quickwit/quickwit-serve/src/lib.rs | 2 +- 21 files changed, 822 insertions(+), 382 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 651e162013d..2ed3ad0253f 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -183,37 +183,46 @@ quickwit index create --endpoint=http://127.0.0.1:7280 --index-config wikipedia_ ### index update `quickwit index update [args]` + +*Synopsis* + +```bash +quickwit index update + --index +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--index` | ID of the target index | #### index update search-settings -Updates default search settings. +Updates search settings. `quickwit index update search-settings [args]` *Synopsis* ```bash quickwit index update search-settings - --index - --default-search-fields + --config-file ``` *Options* | Option | Description | |-----------------|-------------| -| `--index` | ID of the target index | -| `--default-search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. "field1 field2". If no value is provided, existing defaults are removed and queries without target field will fail. | +| `--config-file` | Location of a json, yaml or toml file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings. | #### index update retention-policy -Configure or disable the retention policy. +Updates or disables the retention policy. `quickwit index update retention-policy [args]` *Synopsis* ```bash quickwit index update retention-policy - --index - [--period ] - [--schedule ] + [--config-file ] [--disable] ``` @@ -221,10 +230,25 @@ quickwit index update retention-policy | Option | Description | |-----------------|-------------| -| `--index` | ID of the target index | -| `--period` | Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...) | -| `--schedule` | Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...). | -| `--disable` | Disable the retention policy. Old indexed data will not be cleaned up anymore. | +| `--config-file` | Location of a json, yaml or toml file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy. | +| `--disable` | Disables the retention policy. Old indexed data will not be cleaned up anymore. | +#### index update indexing-settings + +Updates indexing settings. +`quickwit index update indexing-settings [args]` + +*Synopsis* + +```bash +quickwit index update indexing-settings + --config-file +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--config-file` | Location of a json, yaml or toml file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings. | ### index clear Clears an index: deletes all splits and resets checkpoint. @@ -368,6 +392,7 @@ quickwit index ingest [--input-path ] [--batch-size-limit ] [--wait] + [--v2] [--force] [--commit-timeout ] ``` @@ -380,8 +405,9 @@ quickwit index ingest | `--input-path` | Location of the input file. | | `--batch-size-limit` | Size limit of each submitted document batch. | | `--wait` | Wait for all documents to be commited and available for search before exiting | +| `--v2` | Ingest v2 (experimental! Do not use me.) | | `--force` | Force a commit after the last document is sent, and wait for all documents to be committed and available for search before exiting | -| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting which sets the maximum time before commiting splits after their creation. | +| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting, which sets the maximum time before commiting splits after their creation. | *Examples* diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index 0408851d73d..707518cc0f2 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -350,6 +350,7 @@ Calling the update endpoint with the following payload will remove the current r } } ``` +::: #### Response diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs index c79f30357ea..e3dd709c7c8 100644 --- a/quickwit/quickwit-cli/src/index/update.rs +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -17,71 +17,82 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::str::FromStr; + use anyhow::{bail, Context}; use clap::{arg, ArgMatches, Command}; use colored::Colorize; -use quickwit_config::{RetentionPolicy, SearchSettings}; -use quickwit_serve::IndexUpdates; +use quickwit_common::uri::Uri; +use quickwit_config::ConfigFormat; +use quickwit_proto::bytes::Bytes; +use quickwit_rest_client::rest_client::UpdateConfigField; +use quickwit_storage::{load_file, StorageResolver}; use tracing::debug; -use crate::checklist::GREEN_COLOR; +use crate::checklist::{BLUE_COLOR, GREEN_COLOR}; use crate::ClientArgs; pub fn build_index_update_command() -> Command { Command::new("update") + .args(&[ + arg!(--index "ID of the target index") + .required(true), + ]) .subcommand_required(true) .subcommand( Command::new("search-settings") - .about("Updates default search settings.") + .about("Updates search settings.") .args(&[ - arg!(--index "ID of the target index") - .display_order(1) - .required(true), - arg!(--"default-search-fields" "List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. \"field1 field2\". If no value is provided, existing defaults are removed and queries without target field will fail.") - .display_order(2) - .num_args(0..) + arg!(--"config-file" "Location of a json, yaml or toml file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings.") .required(true), ])) .subcommand( Command::new("retention-policy") - .about("Configures or disables the retention policy.") + .about("Updates or disables the retention policy.") .args(&[ - arg!(--index "ID of the target index") - .display_order(1) - .required(true), - arg!(--"period" "Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...)") - .display_order(2) + arg!(--"config-file" "Location of a json, yaml or toml file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy.") .required(false), - arg!(--"schedule" "Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...).") - .display_order(3) - .required(false), - arg!(--"disable" "Disables the retention policy. Old indexed data will not be cleaned up anymore.") - .display_order(4) + arg!(--disable "Disables the retention policy. Old indexed data will not be cleaned up anymore.") .required(false), ]) ) + .subcommand( + Command::new("indexing-settings") + .about("Updates indexing settings.") + .args(&[ + arg!(--"config-file" "Location of a json, yaml or toml file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings.") + .required(true), + ]) + + ) } #[derive(Debug, Eq, PartialEq)] -pub struct RetentionPolicyArgs { +pub struct OptionalFieldArgs { pub client_args: ClientArgs, pub index_id: String, - pub disable: bool, - pub period: Option, - pub schedule: Option, + pub config_file_opt: Option, } #[derive(Debug, Eq, PartialEq)] -pub struct SearchSettingsArgs { +pub struct RequiredFieldArgs { pub client_args: ClientArgs, pub index_id: String, - pub default_search_fields: Vec, + pub config_file: Uri, +} + +#[derive(Debug, Eq, PartialEq)] +pub struct IndexingSettingsArgs { + pub client_args: ClientArgs, + pub index_id: String, + pub config_file: Uri, } #[derive(Debug, Eq, PartialEq)] pub enum IndexUpdateCliCommand { - RetentionPolicy(RetentionPolicyArgs), - SearchSettings(SearchSettingsArgs), + RetentionPolicy(OptionalFieldArgs), + SearchSettings(RequiredFieldArgs), + IndexingSettings(RequiredFieldArgs), } impl IndexUpdateCliCommand { @@ -89,45 +100,57 @@ impl IndexUpdateCliCommand { let (subcommand, submatches) = matches .remove_subcommand() .context("failed to parse index update subcommand")?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); match subcommand.as_str() { - "retention-policy" => Self::parse_update_retention_policy_args(submatches), - "search-settings" => Self::parse_update_search_settings_args(submatches), + "retention-policy" => Self::parse_update_optional_field_args(submatches, index_id), + "search-settings" => Self::parse_update_required_field_args(submatches, index_id), + "indexing-settings" => Self::parse_update_required_field_args(submatches, index_id), _ => bail!("unknown index update subcommand `{subcommand}`"), } } - fn parse_update_retention_policy_args(mut matches: ArgMatches) -> anyhow::Result { + /// Parse args for optional fields (i.e retention policy). + fn parse_update_optional_field_args( + mut matches: ArgMatches, + index_id: String, + ) -> anyhow::Result { let client_args = ClientArgs::parse(&mut matches)?; - let index_id = matches - .remove_one::("index") - .expect("`index` should be a required arg."); + + let config_file_opt = matches + .remove_one::("config-file") + .map(|uri| Uri::from_str(&uri)) + .transpose()?; let disable = matches.get_flag("disable"); - let period = matches.remove_one::("period"); - let schedule = matches.remove_one::("schedule"); - Ok(Self::RetentionPolicy(RetentionPolicyArgs { + if !disable && config_file_opt.is_none() { + bail!("either `--config-file` or `--disable` must be specified"); + } + if disable && config_file_opt.is_some() { + bail!("both `--config-file` and `--disable` cannot be specified"); + } + Ok(Self::RetentionPolicy(OptionalFieldArgs { client_args, index_id, - disable, - period, - schedule, + config_file_opt, })) } - fn parse_update_search_settings_args(mut matches: ArgMatches) -> anyhow::Result { + /// Parse args for required fields (i.e. search and indexing settings). + fn parse_update_required_field_args( + mut matches: ArgMatches, + index_id: String, + ) -> anyhow::Result { let client_args = ClientArgs::parse(&mut matches)?; - let index_id = matches - .remove_one::("index") - .expect("`index` should be a required arg."); - let default_search_fields = matches - .remove_many::("default-search-fields") - .map(|values| values.collect()) - // --default-search-fields should be made optional if other fields - // are added to SearchSettings - .expect("`default-search-fields` should be a required arg."); - Ok(Self::SearchSettings(SearchSettingsArgs { + + let config_file = matches + .remove_one::("config-file") + .map(|uri| Uri::from_str(&uri)) + .expect("`config-file` should be a required arg.")?; + Ok(Self::SearchSettings(RequiredFieldArgs { client_args, index_id, - default_search_fields, + config_file, })) } @@ -135,87 +158,92 @@ impl IndexUpdateCliCommand { match self { Self::RetentionPolicy(args) => update_retention_policy_cli(args).await, Self::SearchSettings(args) => update_search_settings_cli(args).await, + Self::IndexingSettings(args) => update_indexing_settings_cli(args).await, } } } -pub async fn update_retention_policy_cli(args: RetentionPolicyArgs) -> anyhow::Result<()> { - debug!(args=?args, "update-index-retention-policy"); - println!("❯ Updating index retention policy..."); - let qw_client = args.client_args.client(); - let metadata = qw_client.indexes().get(&args.index_id).await?; - let new_retention_policy_opt = match ( - args.disable, - args.period, - args.schedule, - metadata.index_config.retention_policy_opt, - ) { - (true, Some(_), Some(_), _) | (true, None, Some(_), _) | (true, Some(_), None, _) => { - bail!("`--period` and `--schedule` cannot be used together with `--disable`") - } - (false, None, None, _) => bail!("either `--period` or `--disable` must be specified"), - (false, None, Some(_), None) => { - bail!("`--period` is required when creating a retention policy") - } - (true, None, None, _) => None, - (false, None, Some(schedule), Some(policy)) => Some(RetentionPolicy { - retention_period: policy.retention_period, - evaluation_schedule: schedule, - }), - (false, Some(period), schedule_opt, None) => Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule_opt.unwrap_or(RetentionPolicy::default_schedule()), - }), - (false, Some(period), schedule_opt, Some(policy)) => Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule_opt.unwrap_or(policy.evaluation_schedule.clone()), - }), - }; - if let Some(new_retention_policy) = new_retention_policy_opt.as_ref() { - println!( - "New retention policy: {}", - serde_json::to_string(&new_retention_policy)? - ); - } else { - println!("Disable retention policy."); - } - qw_client +async fn update_from_file( + client_args: ClientArgs, + index_id: &str, + config_file: &Uri, + field: UpdateConfigField, +) -> anyhow::Result<()> { + let storage_resolver = StorageResolver::unconfigured(); + let content = load_file(&storage_resolver, config_file).await?; + client_args + .client() .indexes() .update( + index_id, + field, + Bytes::from(content.as_slice().to_owned()), + ConfigFormat::sniff_from_uri(config_file)?, + ) + .await?; + Ok(()) +} + +pub async fn update_retention_policy_cli(args: OptionalFieldArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index-retention-policy"); + println!("❯ Updating index retention policy..."); + if let Some(uri) = args.config_file_opt { + update_from_file( + args.client_args, &args.index_id, - IndexUpdates { - retention_policy_opt: new_retention_policy_opt, - search_settings: metadata.index_config.search_settings, - }, + &uri, + UpdateConfigField::RetentionPolicy, ) .await?; - println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); + } else { + args.client_args + .client() + .indexes() + .delete_retention_policy(&args.index_id) + .await?; + } + println!( + "{} Index retention policy successfully updated.", + "✔".color(GREEN_COLOR) + ); Ok(()) } -pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Result<()> { +pub async fn update_search_settings_cli(args: RequiredFieldArgs) -> anyhow::Result<()> { debug!(args=?args, "update-index-search-settings"); println!("❯ Updating index search settings..."); - let qw_client = args.client_args.client(); - let metadata = qw_client.indexes().get(&args.index_id).await?; - let search_settings = SearchSettings { - default_search_fields: args.default_search_fields, - }; + update_from_file( + args.client_args, + &args.index_id, + &args.config_file, + UpdateConfigField::SearchSettings, + ) + .await?; println!( - "New search settings: {}", - serde_json::to_string(&search_settings)? + "{} Index search settings successfully updated.", + "✔".color(GREEN_COLOR) + ); + Ok(()) +} + +pub async fn update_indexing_settings_cli(args: RequiredFieldArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index-indexing-settings"); + println!("❯ Updating index indexing settings..."); + update_from_file( + args.client_args, + &args.index_id, + &args.config_file, + UpdateConfigField::IndexingSettings, + ) + .await?; + println!( + "{} Index indexing settings successfully updated.", + "✔".color(GREEN_COLOR) + ); + println!( + "{} Restart indexer nodes for the new configuration to take effect.", + "!".color(BLUE_COLOR) ); - qw_client - .indexes() - .update( - &args.index_id, - IndexUpdates { - retention_policy_opt: metadata.index_config.retention_policy_opt, - search_settings, - }, - ) - .await?; - println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); Ok(()) } @@ -232,25 +260,42 @@ mod test { .try_get_matches_from([ "index", "update", - "retention-policy", "--index", "my-index", - "--period", - "1 day", + "retention-policy", + "--config-file", + "/tmp/hello.json", ]) .unwrap(); let command = CliCommand::parse_cli_args(matches).unwrap(); assert!(matches!( command, CliCommand::Index(IndexCliCommand::Update( - IndexUpdateCliCommand::RetentionPolicy(RetentionPolicyArgs { + IndexUpdateCliCommand::RetentionPolicy(OptionalFieldArgs { client_args: _, index_id, - disable: false, - period: Some(period), - schedule: None, + config_file_opt: Some(uri), }) - )) if &index_id == "my-index" && &period == "1 day" + )) if &index_id == "my-index" && uri.as_str() == "file:///tmp/hello.json" )); } + + #[test] + fn test_cmd_invalid_update_subsubcommand() { + let app = build_cli().no_binary_name(true); + let matches = app + .try_get_matches_from([ + "index", + "update", + "--index", + "my-index", + "retention-policy", + "--config-file", + "/tmp/hello.json", + "--disable", + ]) + .unwrap(); + CliCommand::parse_cli_args(matches) + .expect_err("command with both `--config-file` and `--disable` should fail"); + } } diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 848934c6a28..03aa9f92bd2 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -29,7 +29,7 @@ use clap::error::ErrorKind; use helpers::{TestEnv, TestStorageType}; use quickwit_cli::checklist::ChecklistError; use quickwit_cli::cli::build_cli; -use quickwit_cli::index::update::{update_retention_policy_cli, RetentionPolicyArgs}; +use quickwit_cli::index::update::{update_retention_policy_cli, OptionalFieldArgs}; use quickwit_cli::index::{ create_index_cli, delete_index_cli, search_index, CreateIndexArgs, DeleteIndexArgs, SearchIndexArgs, @@ -543,17 +543,21 @@ async fn test_cmd_update_index() { .unwrap(); test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); + let test_update_uri = Uri::from_str( + &test_env.resource_files["retention_policy_update"] + .display() + .to_string(), + ) + .unwrap(); - // add a policy - update_retention_policy_cli(RetentionPolicyArgs { + // add a retention policy + update_retention_policy_cli(OptionalFieldArgs { index_id: index_id.clone(), client_args: ClientArgs { cluster_endpoint: test_env.cluster_endpoint.clone(), ..Default::default() }, - disable: false, - period: Some(String::from("1 week")), - schedule: Some(String::from("daily")), + config_file_opt: Some(test_update_uri), }) .await .unwrap(); @@ -566,30 +570,14 @@ async fn test_cmd_update_index() { }) ); - // invalid args - update_retention_policy_cli(RetentionPolicyArgs { - index_id: index_id.clone(), - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, - disable: true, - period: Some(String::from("a week")), - schedule: Some(String::from("daily")), - }) - .await - .unwrap_err(); - // remove the policy - update_retention_policy_cli(RetentionPolicyArgs { + update_retention_policy_cli(OptionalFieldArgs { index_id, client_args: ClientArgs { cluster_endpoint: test_env.cluster_endpoint.clone(), ..Default::default() }, - disable: true, - period: None, - schedule: None, + config_file_opt: None, }) .await .unwrap(); diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index d52e48034d9..28fabf89574 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -231,6 +231,11 @@ pub async fn create_test_env( fs::write(&log_docs_path, LOGS_JSON_DOCS)?; let wikipedia_docs_path = resources_dir_path.join("wikis.json"); fs::write(&wikipedia_docs_path, WIKI_JSON_DOCS)?; + let retention_policy_update_path = resources_dir_path.join("retention_policy_update.json"); + fs::write( + &retention_policy_update_path, + r#"{"period": "1 week", "schedule": "daily"}"#, + )?; let mut resource_files = HashMap::new(); resource_files.insert("config", node_config_path); @@ -238,6 +243,7 @@ pub async fn create_test_env( resource_files.insert("index_config_without_uri", index_config_without_uri_path); resource_files.insert("logs", log_docs_path); resource_files.insert("wiki", wikipedia_docs_path); + resource_files.insert("retention_policy_update", retention_policy_update_path); let config_uri = Uri::from_str(&format!("file://{}", resource_files["config"].display())).unwrap(); diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 2aa295e49b6..c23832c3fab 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -519,6 +519,14 @@ impl TestableForRegression for IndexConfig { } } +/// Represents an update to one of the updatable index configuration field. +#[derive(Clone, Debug)] +pub enum IndexUpdate { + SearchSettings(SearchSettings), + IndexingSettings(IndexingSettings), + RetentionPolicy(Option), +} + /// Builds and returns the doc mapper associated with index. pub fn build_doc_mapper( doc_mapping: &DocMapping, diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 3b458819922..fab8309c41e 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -46,7 +46,7 @@ pub use cluster_config::ClusterConfig; // See #2048 use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig}; pub use index_config::{ - build_doc_mapper, load_index_config_from_user_config, DocMapping, IndexConfig, + build_doc_mapper, load_index_config_from_user_config, DocMapping, IndexConfig, IndexUpdate, IndexingResources, IndexingSettings, RetentionPolicy, SearchSettings, }; use serde::de::DeserializeOwned; @@ -209,10 +209,11 @@ impl ConfigFormat { ConfigFormat::Json => { let mut json_value: JsonValue = serde_json::from_reader(StripComments::new(payload))?; - let version_value = json_value.get_mut("version").context("missing version")?; - if let Some(version_number) = version_value.as_u64() { - warn!(version_value=?version_value, "`version` is supposed to be a string"); - *version_value = JsonValue::String(version_number.to_string()); + if let Some(version_value) = json_value.get_mut("version") { + if let Some(version_number) = version_value.as_u64() { + warn!(version_value=?version_value, "`version` is supposed to be a string"); + *version_value = JsonValue::String(version_number.to_string()); + } } serde_json::from_value(json_value).context("failed to parse JSON file") } @@ -221,16 +222,13 @@ impl ConfigFormat { .context("configuration file contains invalid UTF-8 characters")?; let mut toml_value: toml::Value = toml::from_str(payload_str).context("failed to parse TOML file")?; - let version_value = toml_value.get_mut("version").context("missing version")?; - if let Some(version_number) = version_value.as_integer() { - warn!(version_value=?version_value, "`version` is supposed to be a string"); - *version_value = toml::Value::String(version_number.to_string()); - let reserialized = toml::to_string(version_value) - .context("failed to reserialize toml config")?; - toml::from_str(&reserialized).context("failed to parse TOML file") - } else { - toml::from_str(payload_str).context("failed to parse TOML file") + if let Some(version_value) = toml_value.get_mut("version") { + if let Some(version_number) = version_value.as_integer() { + warn!(version_value=?version_value, "`version` is supposed to be a string"); + *version_value = toml::Value::String(version_number.to_string()); + } } + toml_value.try_into().context("failed to parse TOML file") } ConfigFormat::Yaml => { serde_yaml::from_slice(payload).context("failed to parse YAML file") diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index af64cc9c479..6528dde3d78 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -20,7 +20,8 @@ mod shard_table; use std::borrow::Cow; -use std::collections::BTreeSet; +use std::collections::hash_map::Entry; +use std::collections::{BTreeSet, HashMap}; use std::mem; use std::ops::Deref; use std::time::Instant; @@ -54,7 +55,7 @@ use tracing::{info, instrument, warn}; #[derive(Default, Debug)] pub(crate) struct ControlPlaneModel { index_uid_table: FnvHashMap, - index_table: FnvHashMap, + index_source_table: FnvHashMap>, shard_table: ShardTable, } @@ -65,7 +66,7 @@ impl ControlPlaneModel { } pub fn num_indexes(&self) -> usize { - self.index_table.len() + self.index_source_table.len() } pub fn num_sources(&self) -> usize { @@ -95,7 +96,7 @@ impl ControlPlaneModel { .await?; let num_indexes = indexes_metadata.len(); - self.index_table.reserve(num_indexes); + self.index_source_table.reserve(num_indexes); for index_metadata in indexes_metadata { self.add_index(index_metadata); @@ -105,13 +106,13 @@ impl ControlPlaneModel { let mut next_list_shards_request = metastore::ListShardsRequest::default(); - for (idx, index_metadata) in self.index_table.values().enumerate() { - for source_config in index_metadata.sources.values() { + for (idx, (index_uid, index_metadata)) in self.index_source_table.iter().enumerate() { + for source_config in index_metadata.values() { num_sources += 1; if source_config.source_type() == SourceType::IngestV2 { let request = ListShardsSubrequest { - index_uid: index_metadata.index_uid.clone().into(), + index_uid: Some(index_uid.clone()), source_id: source_config.source_id.clone(), shard_state: None, }; @@ -155,24 +156,23 @@ impl ControlPlaneModel { fn update_metrics(&self) { crate::metrics::CONTROL_PLANE_METRICS .indexes_total - .set(self.index_table.len() as i64); + .set(self.index_source_table.len() as i64); } pub(crate) fn source_configs(&self) -> impl Iterator + '_ { - self.index_table.values().flat_map(|index_metadata| { - index_metadata - .sources - .iter() - .map(move |(source_id, source_config)| { + self.index_source_table + .iter() + .flat_map(|(index_uid, sources)| { + sources.iter().map(move |(source_id, source_config)| { ( SourceUid { - index_uid: index_metadata.index_uid.clone(), + index_uid: index_uid.clone(), source_id: source_id.clone(), }, source_config, ) }) - }) + }) } pub(crate) fn add_index(&mut self, index_metadata: IndexMetadata) { @@ -186,12 +186,13 @@ impl ControlPlaneModel { self.shard_table.add_source(&index_uid, source_id); } } - self.index_table.insert(index_uid, index_metadata); + self.index_source_table + .insert(index_uid, index_metadata.sources); self.update_metrics(); } pub(crate) fn delete_index(&mut self, index_uid: &IndexUid) { - self.index_table.remove(index_uid); + self.index_source_table.remove(index_uid); self.index_uid_table.remove(&index_uid.index_id); self.shard_table.delete_index(&index_uid.index_id); self.update_metrics(); @@ -204,13 +205,21 @@ impl ControlPlaneModel { index_uid: &IndexUid, source_config: SourceConfig, ) -> ControlPlaneResult<()> { - let index_metadata = self.index_table.get_mut(index_uid).ok_or_else(|| { + let index_sources = self.index_source_table.get_mut(index_uid).ok_or_else(|| { MetastoreError::NotFound(EntityKind::Index { index_id: index_uid.to_string(), }) })?; - index_metadata.add_source(source_config.clone())?; - + match index_sources.entry(source_config.source_id.clone()) { + Entry::Occupied(_) => Err(MetastoreError::AlreadyExists(EntityKind::Source { + index_id: index_uid.index_id.clone(), + source_id: source_config.source_id.clone(), + })), + Entry::Vacant(entry) => { + entry.insert(source_config.clone()); + Ok(()) + } + }?; if source_config.source_type() == SourceType::IngestV2 { self.shard_table .add_source(index_uid, &source_config.source_id); @@ -223,15 +232,11 @@ impl ControlPlaneModel { self.shard_table .delete_source(&source_uid.index_uid, &source_uid.source_id); // Remove source from index metadata. - let Some(index_metadata) = self.index_table.get_mut(&source_uid.index_uid) else { + let Some(index_sources) = self.index_source_table.get_mut(&source_uid.index_uid) else { warn!(index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, "delete source: index not found"); return; }; - if index_metadata - .sources - .remove(&source_uid.source_id) - .is_none() - { + if index_sources.remove(&source_uid.source_id).is_none() { warn!(index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, "delete source: source not found"); }; } @@ -244,10 +249,10 @@ impl ControlPlaneModel { source_id: &SourceId, enable: bool, ) -> anyhow::Result { - let Some(index_model) = self.index_table.get_mut(index_uid) else { + let Some(index_sources) = self.index_source_table.get_mut(index_uid) else { bail!("index `{}` not found", index_uid.index_id); }; - let Some(source_config) = index_model.sources.get_mut(source_id) else { + let Some(source_config) = index_sources.get_mut(source_id) else { bail!("source `{source_id}` not found"); }; let has_changed = source_config.enabled != enable; @@ -457,7 +462,7 @@ mod tests { .await .unwrap(); - assert_eq!(model.index_table.len(), 3); + assert_eq!(model.index_source_table.len(), 3); assert_eq!(model.index_uid("test-index-0").unwrap(), index_uid); assert_eq!(model.index_uid("test-index-1").unwrap(), index_uid2); assert_eq!(model.index_uid("test-index-2").unwrap(), index_uid3); @@ -497,8 +502,11 @@ mod tests { let index_uid = index_metadata.index_uid.clone(); model.add_index(index_metadata.clone()); - assert_eq!(model.index_table.len(), 1); - assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata); + assert_eq!(model.index_source_table.len(), 1); + assert_eq!( + model.index_source_table.get(&index_uid).unwrap(), + &index_metadata.sources + ); assert_eq!(model.index_uid_table.len(), 1); assert_eq!(model.index_uid("test-index").unwrap(), index_uid); @@ -515,8 +523,11 @@ mod tests { let index_uid = index_metadata.index_uid.clone(); model.add_index(index_metadata.clone()); - assert_eq!(model.index_table.len(), 1); - assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata); + assert_eq!(model.index_source_table.len(), 1); + assert_eq!( + model.index_source_table.get(&index_uid).unwrap(), + &index_metadata.sources + ); assert_eq!(model.index_uid_table.len(), 1); assert_eq!(model.index_uid("test-index").unwrap(), index_uid); @@ -545,7 +556,7 @@ mod tests { model.delete_index(&index_uid); - assert!(model.index_table.is_empty()); + assert!(model.index_source_table.is_empty()); assert!(model.index_uid_table.is_empty()); assert_eq!(model.shard_table.num_sources(), 0); } diff --git a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs index 5ddc8a034ec..4725a726036 100644 --- a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs @@ -20,10 +20,11 @@ use std::collections::HashSet; use std::time::Duration; +use hyper::body::Bytes; use quickwit_config::service::QuickwitService; -use quickwit_config::SearchSettings; -use quickwit_rest_client::rest_client::CommitType; -use quickwit_serve::{IndexUpdates, SearchRequestQueryString}; +use quickwit_config::ConfigFormat; +use quickwit_rest_client::rest_client::{CommitType, UpdateConfigField}; +use quickwit_serve::SearchRequestQueryString; use serde_json::json; use crate::ingest_json; @@ -121,12 +122,9 @@ async fn test_update_on_multi_nodes_cluster() { .indexes() .update( "my-updatable-index", - IndexUpdates { - search_settings: SearchSettings { - default_search_fields: vec!["title".to_string(), "body".to_string()], - }, - retention_policy_opt: None, - }, + UpdateConfigField::SearchSettings, + Bytes::from(r#"{"default_search_fields":["title", "body"]}"#), + ConfigFormat::Json, ) .await .unwrap(); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 5f7f91c81e0..4ad47813e55 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,7 +30,9 @@ use std::ops::Bound; use itertools::Itertools; use quickwit_common::pretty::PrettySample; -use quickwit_config::{RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID}; +use quickwit_config::{ + IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID, +}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, @@ -227,6 +229,13 @@ impl FileBackedIndex { is_mutation } + /// Replaces the retention policy in the index config, returning whether a mutation occurred. + pub fn set_indexing_settings(&mut self, indexing_settings: IndexingSettings) -> bool { + let is_mutation = self.metadata.index_config.indexing_settings != indexing_settings; + self.metadata.index_config.indexing_settings = indexing_settings; + is_mutation + } + /// Stages a single split. /// /// If a split already exists and is in the [SplitState::Staged] state, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index a0ec0f51264..b00c20ecef8 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -41,7 +41,7 @@ use async_trait::async_trait; use futures::future::try_join_all; use itertools::Itertools; use quickwit_common::ServiceStream; -use quickwit_config::IndexTemplate; +use quickwit_config::{IndexTemplate, IndexUpdate}; use quickwit_proto::metastore::{ serde_utils, AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, @@ -461,15 +461,17 @@ impl MetastoreService for FileBackedMetastore { &mut self, request: UpdateIndexRequest, ) -> MetastoreResult { - let search_settings = request.deserialize_search_settings()?; - let retention_policy_opt = request.deserialize_retention_policy()?; + let update = request.deserialize_update()?; let index_uid = request.index_uid(); let metadata = self .mutate(index_uid, |index| { - let search_settings_mutated = index.set_search_settings(search_settings); - let retention_policy_mutated = index.set_retention_policy(retention_policy_opt); - if search_settings_mutated || retention_policy_mutated { + let mutation_occured = match update { + IndexUpdate::SearchSettings(s) => index.set_search_settings(s), + IndexUpdate::RetentionPolicy(s) => index.set_retention_policy(s), + IndexUpdate::IndexingSettings(s) => index.set_indexing_settings(s), + }; + if mutation_occured { Ok(MutationOccurred::Yes(index.metadata().clone())) } else { Ok(MutationOccurred::No(index.metadata().clone())) diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 3a231f0029c..fb320c73e7a 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -31,13 +31,14 @@ use bytes::Bytes; use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; -use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; +use quickwit_config::{IndexConfig, IndexUpdate, SourceConfig}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteTask, IndexMetadataRequest, IndexMetadataResponse, ListIndexesMetadataResponse, ListSplitsRequest, ListSplitsResponse, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, + UpdatedIndexConfig, }; use quickwit_proto::types::{IndexUid, SplitId}; use time::OffsetDateTime; @@ -181,51 +182,78 @@ impl CreateIndexResponseExt for CreateIndexResponse { /// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload. pub trait UpdateIndexRequestExt { - /// Creates a new [`UpdateIndexRequest`] from the different updated fields. - fn try_from_updates( + /// Creates a new [`UpdateIndexRequest`] from an `IndexUpdate`. + fn try_from_update( index_uid: impl Into, - search_settings: &SearchSettings, - retention_policy_opt: &Option, + update: &IndexUpdate, ) -> MetastoreResult; - /// Deserializes the `search_settings_json` field of an [`UpdateIndexRequest`] into a - /// [`SearchSettings`] object. - fn deserialize_search_settings(&self) -> MetastoreResult; - - /// Deserializes the `retention_policy_json` field of an [`UpdateIndexRequest`] into a - /// [`RetentionPolicy`] object. - fn deserialize_retention_policy(&self) -> MetastoreResult>; + /// Deserializes the `config_json` field of an [`UpdateIndexRequest`] into + /// the appropriate variant of `IndexUpdate`. + fn deserialize_update(&self) -> MetastoreResult; } impl UpdateIndexRequestExt for UpdateIndexRequest { - fn try_from_updates( + fn try_from_update( index_uid: impl Into, - search_settings: &SearchSettings, - retention_policy_opt: &Option, + update: &IndexUpdate, ) -> MetastoreResult { - let search_settings_json = serde_utils::to_json_str(&search_settings)?; - let retention_policy_json = retention_policy_opt - .as_ref() - .map(serde_utils::to_json_str) - .transpose()?; - - let update_request = UpdateIndexRequest { - index_uid: Some(index_uid.into()), - search_settings_json, - retention_policy_json, + let index_uid = Some(index_uid.into()); + let update_request = match update { + IndexUpdate::IndexingSettings(s) => UpdateIndexRequest { + index_uid, + target_config: UpdatedIndexConfig::IndexingSettings.into(), + config_json: Some(serde_utils::to_json_str(s)?), + }, + IndexUpdate::SearchSettings(s) => UpdateIndexRequest { + index_uid, + target_config: UpdatedIndexConfig::SearchSettings.into(), + config_json: Some(serde_utils::to_json_str(s)?), + }, + IndexUpdate::RetentionPolicy(s) => UpdateIndexRequest { + index_uid, + target_config: UpdatedIndexConfig::RetentionPolicy.into(), + config_json: s.as_ref().map(serde_utils::to_json_str).transpose()?, + }, }; - Ok(update_request) - } - fn deserialize_search_settings(&self) -> MetastoreResult { - serde_utils::from_json_str(&self.search_settings_json) + Ok(update_request) } - fn deserialize_retention_policy(&self) -> MetastoreResult> { - self.retention_policy_json - .as_ref() - .map(|policy| serde_utils::from_json_str(policy)) - .transpose() + fn deserialize_update(&self) -> MetastoreResult { + let config_ref_opt = self.config_json.as_ref(); + match self.target_config { + x if x == UpdatedIndexConfig::IndexingSettings as i32 => { + let indexing_settings = config_ref_opt + .map(|json| serde_utils::from_json_str(json)) + .transpose()? + .ok_or(MetastoreError::JsonDeserializeError { + struct_name: "IndexingSettings".to_owned(), + message: "Should not be empty".to_owned(), + })?; + Ok(IndexUpdate::IndexingSettings(indexing_settings)) + } + x if x == UpdatedIndexConfig::SearchSettings as i32 => { + let search_settings = config_ref_opt + .map(|json| serde_utils::from_json_str(json)) + .transpose()? + .ok_or(MetastoreError::JsonDeserializeError { + struct_name: "IndexingSettings".to_owned(), + message: "Should not be empty".to_owned(), + })?; + Ok(IndexUpdate::SearchSettings(search_settings)) + } + x if x == UpdatedIndexConfig::RetentionPolicy as i32 => { + let retention_policy = config_ref_opt + .map(|json| serde_utils::from_json_str(json)) + .transpose()?; + Ok(IndexUpdate::RetentionPolicy(retention_policy)) + } + _ => Err(MetastoreError::JsonDeserializeError { + struct_name: "IndexUpdate".to_owned(), + message: "Unexpected target config".to_owned(), + }), + } } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 9202c24cbea..fdb4d1de503 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -25,8 +25,8 @@ use quickwit_common::pretty::PrettySample; use quickwit_common::uri::Uri; use quickwit_common::ServiceStream; use quickwit_config::{ - validate_index_id_pattern, IndexTemplate, IndexTemplateId, PostgresMetastoreConfig, - INGEST_V2_SOURCE_ID, + validate_index_id_pattern, IndexTemplate, IndexTemplateId, IndexUpdate, + PostgresMetastoreConfig, INGEST_V2_SOURCE_ID, }; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ @@ -406,20 +406,30 @@ impl MetastoreService for PostgresqlMetastore { &mut self, request: UpdateIndexRequest, ) -> MetastoreResult { - let retention_policy_opt = request.deserialize_retention_policy()?; - let search_settings = request.deserialize_search_settings()?; + let update = request.deserialize_update()?; let index_uid: IndexUid = request.index_uid().clone(); let updated_metadata = run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata::(tx, index_uid, |index_metadata| { - if index_metadata.index_config.search_settings != search_settings - || index_metadata.index_config.retention_policy_opt != retention_policy_opt + mutate_index_metadata::(tx, index_uid, |index_metadata| match update + { + IndexUpdate::SearchSettings(s) + if index_metadata.index_config.search_settings != s => { - index_metadata.index_config.search_settings = search_settings; - index_metadata.index_config.retention_policy_opt = retention_policy_opt; + index_metadata.index_config.search_settings = s; + Ok(MutationOccurred::Yes(())) + } + IndexUpdate::IndexingSettings(s) + if index_metadata.index_config.indexing_settings != s => + { + index_metadata.index_config.indexing_settings = s; + Ok(MutationOccurred::Yes(())) + } + IndexUpdate::RetentionPolicy(s) + if index_metadata.index_config.retention_policy_opt != s => + { + index_metadata.index_config.retention_policy_opt = s; Ok(MutationOccurred::Yes(())) - } else { - Ok(MutationOccurred::No(())) } + _ => Ok(MutationOccurred::No(())), }) .await })?; diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index a75ff115297..7c138016121 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -25,13 +25,12 @@ // - list_indexes // - delete_index -use std::collections::BTreeSet; - use quickwit_common::rand::append_random_suffix; +use quickwit_config::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig}; use quickwit_config::{ - IndexConfig, RetentionPolicy, SearchSettings, SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, + IndexConfig, IndexUpdate, IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, + CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, }; -use quickwit_doc_mapper::FieldMappingType; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, StageSplitsRequest, @@ -84,9 +83,9 @@ pub async fn test_metastore_create_index< cleanup_index(&mut metastore, index_uid).await; } -pub async fn test_metastore_update_index< +async fn setup_metastore_for_update< MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, ->() { +>() -> (MetastoreToTest, IndexUid) { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-update-index"); @@ -101,51 +100,28 @@ pub async fn test_metastore_update_index< .index_uid() .clone(); - let index_metadata = metastore - .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) - .await - .unwrap() - .deserialize_index_metadata() - .unwrap(); - - // use all fields that are currently not set as default - let current_defaults = BTreeSet::from_iter( - index_metadata - .index_config - .search_settings - .default_search_fields, - ); - let new_search_setting = SearchSettings { - default_search_fields: index_metadata - .index_config - .doc_mapping - .field_mappings - .iter() - .filter(|f| matches!(f.mapping_type, FieldMappingType::Text(..))) - .filter(|f| !current_defaults.contains(&f.name)) - .map(|f| f.name.clone()) - .collect(), - }; + (metastore, index_uid) +} +pub async fn test_metastore_update_retention_policy< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid) = setup_metastore_for_update::().await; let new_retention_policy_opt = Some(RetentionPolicy { retention_period: String::from("3 days"), evaluation_schedule: String::from("daily"), }); - assert_ne!( - index_metadata.index_config.retention_policy_opt, new_retention_policy_opt, - "original and updated value are the same, test became inefficient" - ); - // run same update twice to check idempotence, then None as a corner case check + // set and unset retention policy multiple times for loop_retention_policy_opt in [ + None, new_retention_policy_opt.clone(), new_retention_policy_opt.clone(), None, ] { - let index_update = UpdateIndexRequest::try_from_updates( + let index_update = UpdateIndexRequest::try_from_update( index_uid.clone(), - &new_search_setting, - &loop_retention_policy_opt, + &IndexUpdate::RetentionPolicy(loop_retention_policy_opt.clone()), ) .unwrap(); let response_metadata = metastore @@ -155,23 +131,118 @@ pub async fn test_metastore_update_index< .deserialize_index_metadata() .unwrap(); assert_eq!(response_metadata.index_uid, index_uid); - assert_eq!( - response_metadata.index_config.search_settings, - new_search_setting - ); assert_eq!( response_metadata.index_config.retention_policy_opt, loop_retention_policy_opt ); let updated_metadata = metastore - .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) .await .unwrap() .deserialize_index_metadata() .unwrap(); assert_eq!(response_metadata, updated_metadata); } + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_update_search_settings< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid) = setup_metastore_for_update::().await; + + for loop_search_settings in [ + vec![], + vec!["body".to_owned()], + vec!["body".to_owned()], + vec!["body".to_owned(), "owner".to_owned()], + vec![], + ] { + let index_update = UpdateIndexRequest::try_from_update( + index_uid.clone(), + &IndexUpdate::SearchSettings(SearchSettings { + default_search_fields: loop_search_settings.clone(), + }), + ) + .unwrap(); + let response_metadata = metastore + .update_index(index_update) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + response_metadata + .index_config + .search_settings + .default_search_fields, + loop_search_settings + ); + let updated_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + updated_metadata + .index_config + .search_settings + .default_search_fields, + loop_search_settings + ); + } + cleanup_index(&mut metastore, index_uid).await; +} +pub async fn test_metastore_update_indexing_settings< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid) = setup_metastore_for_update::().await; + + for loop_indexing_settings in [ + MergePolicyConfig::Nop, + MergePolicyConfig::Nop, + MergePolicyConfig::StableLog(StableLogMergePolicyConfig { + merge_factor: 5, + ..Default::default() + }), + ] { + let index_update = UpdateIndexRequest::try_from_update( + index_uid.clone(), + &IndexUpdate::IndexingSettings(IndexingSettings { + merge_policy: loop_indexing_settings.clone(), + ..Default::default() + }), + ) + .unwrap(); + let resp_metadata = metastore + .update_index(index_update) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + resp_metadata.index_config.indexing_settings.merge_policy, + loop_indexing_settings + ); + let updated_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + updated_metadata.index_config.indexing_settings.merge_policy, + loop_indexing_settings + ); + } cleanup_index(&mut metastore, index_uid).await; } diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 3caf221a569..61fd9bd8abe 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -187,9 +187,21 @@ macro_rules! metastore_test_suite { } #[tokio::test] - async fn test_metastore_update_index() { + async fn test_metastore_update_retention_policy() { let _ = tracing_subscriber::fmt::try_init(); - $crate::tests::index::test_metastore_update_index::<$metastore_type>().await; + $crate::tests::index::test_metastore_update_retention_policy::<$metastore_type>().await; + } + + #[tokio::test] + async fn test_metastore_update_search_settings() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_update_search_settings::<$metastore_type>().await; + } + + #[tokio::test] + async fn test_metastore_update_indexing_settings() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_update_indexing_settings::<$metastore_type>().await; } #[tokio::test] diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 9c3d7e9a6a2..80d59d3ee64 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -202,10 +202,16 @@ message CreateIndexResponse { string index_metadata_json = 2; } +enum UpdatedIndexConfig { + SEARCH_SETTINGS = 0; + INDEXING_SETTINGS = 1; + RETENTION_POLICY = 2; +} + message UpdateIndexRequest { quickwit.common.IndexUid index_uid = 1; - string search_settings_json = 2; - optional string retention_policy_json = 3; + UpdatedIndexConfig target_config = 2; + optional string config_json = 3; } message ListIndexesMetadataRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 2d27a262d23..47ec880c7cc 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -26,10 +26,10 @@ pub struct CreateIndexResponse { pub struct UpdateIndexRequest { #[prost(message, optional, tag = "1")] pub index_uid: ::core::option::Option, - #[prost(string, tag = "2")] - pub search_settings_json: ::prost::alloc::string::String, + #[prost(enumeration = "UpdatedIndexConfig", tag = "2")] + pub target_config: i32, #[prost(string, optional, tag = "3")] - pub retention_policy_json: ::core::option::Option<::prost::alloc::string::String>, + pub config_json: ::core::option::Option<::prost::alloc::string::String>, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -509,6 +509,37 @@ impl SourceType { } } } +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "snake_case")] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum UpdatedIndexConfig { + SearchSettings = 0, + IndexingSettings = 1, + RetentionPolicy = 2, +} +impl UpdatedIndexConfig { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + UpdatedIndexConfig::SearchSettings => "SEARCH_SETTINGS", + UpdatedIndexConfig::IndexingSettings => "INDEXING_SETTINGS", + UpdatedIndexConfig::RetentionPolicy => "RETENTION_POLICY", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SEARCH_SETTINGS" => Some(Self::SearchSettings), + "INDEXING_SETTINGS" => Some(Self::IndexingSettings), + "RETENTION_POLICY" => Some(Self::RetentionPolicy), + _ => None, + } + } +} /// BEGIN quickwit-codegen #[allow(unused_imports)] use std::str::FromStr; diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index e90267482a8..971e1e0c546 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -26,9 +26,7 @@ use quickwit_indexing::actors::IndexingServiceCounters; pub use quickwit_ingest::CommitType; use quickwit_metastore::{IndexMetadata, Split, SplitInfo}; use quickwit_search::SearchResponseRest; -use quickwit_serve::{ - IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString, -}; +use quickwit_serve::{ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString}; use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use reqwest::{Client, ClientBuilder, Method, StatusCode, Url}; use serde::Serialize; @@ -326,6 +324,12 @@ pub struct IndexClient<'a> { timeout: Timeout, } +pub enum UpdateConfigField { + RetentionPolicy, + SearchSettings, + IndexingSettings, +} + impl<'a> IndexClient<'a> { fn new(transport: &'a Transport, timeout: Timeout) -> Self { Self { transport, timeout } @@ -357,13 +361,37 @@ impl<'a> IndexClient<'a> { pub async fn update( &self, index_id: &str, - index_updates: IndexUpdates, + config_field: UpdateConfigField, + config: Bytes, + config_format: ConfigFormat, ) -> Result { - let body = Bytes::from(serde_json::to_string(&index_updates)?); - let path = format!("indexes/{index_id}"); + let header_map = header_from_config_format(config_format); + let path = match config_field { + UpdateConfigField::IndexingSettings => "indexing-settings", + UpdateConfigField::SearchSettings => "search-settings", + UpdateConfigField::RetentionPolicy => "retention-policy", + }; + let path = format!("indexes/{index_id}/{path}"); let response = self .transport - .send::<()>(Method::PUT, &path, None, None, Some(body), self.timeout) + .send::<()>( + Method::PUT, + &path, + Some(header_map), + None, + Some(config), + self.timeout, + ) + .await?; + let index_metadata = response.deserialize().await?; + Ok(index_metadata) + } + + pub async fn delete_retention_policy(&self, index_id: &str) -> Result { + let path = format!("indexes/{index_id}/retention-policy"); + let response = self + .transport + .send::<()>(Method::DELETE, &path, None, None, None, self.timeout) .await?; let index_metadata = response.deserialize().await?; Ok(index_metadata) diff --git a/quickwit/quickwit-serve/src/index_api/mod.rs b/quickwit/quickwit-serve/src/index_api/mod.rs index ab831526c81..9b0990ce1f8 100644 --- a/quickwit/quickwit-serve/src/index_api/mod.rs +++ b/quickwit/quickwit-serve/src/index_api/mod.rs @@ -20,5 +20,5 @@ mod rest_handler; pub use self::rest_handler::{ - index_management_handlers, IndexApi, IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, + index_management_handlers, IndexApi, ListSplitsQueryParams, ListSplitsResponse, }; diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index a64ff918540..cca5bb48211 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -22,9 +22,8 @@ use std::sync::Arc; use bytes::Bytes; use quickwit_common::uri::Uri; use quickwit_config::{ - load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, NodeConfig, - RetentionPolicy, SearchSettings, SourceConfig, SourceParams, CLI_SOURCE_ID, - INGEST_API_SOURCE_ID, + load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, IndexUpdate, + NodeConfig, SourceConfig, SourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, }; use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; @@ -55,7 +54,10 @@ use crate::with_arg; #[openapi( paths( create_index, - update_index, + update_index_indexing_settings, + update_index_search_settings, + update_index_retention_policy, + delete_index_retention_policy, clear_index, delete_index, list_indexes_metadata, @@ -67,7 +69,7 @@ use crate::with_arg; toggle_source, delete_source, ), - components(schemas(ToggleSource, SplitsForDeletion, IndexStats, IndexUpdates)) + components(schemas(ToggleSource, SplitsForDeletion, IndexStats)) )] pub struct IndexApi; @@ -90,7 +92,10 @@ pub fn index_management_handlers( get_index_metadata_handler(index_service.metastore()) .or(list_indexes_metadata_handler(index_service.metastore())) .or(create_index_handler(index_service.clone(), node_config)) - .or(update_index_handler(index_service.metastore())) + .or(update_search_settings_handler(index_service.metastore())) + .or(update_indexing_setting_handler(index_service.metastore())) + .or(update_retention_policy_handler(index_service.metastore())) + .or(delete_retention_policy_handler(index_service.metastore())) .or(clear_index_handler(index_service.clone())) .or(delete_index_handler(index_service.clone())) // Splits handlers @@ -526,25 +531,45 @@ async fn create_index( .await } -/// The body of the index update request. All fields will be replaced in the -/// existing configuration. -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] -#[serde(deny_unknown_fields)] // Remove when adding new fields to allow to ensure forward compatibility -pub struct IndexUpdates { - pub search_settings: SearchSettings, - #[serde(rename = "retention_policy")] - pub retention_policy_opt: Option, +pub fn pre_update_filter( + metastore: MetastoreServiceClient, + path: &'static str, +) -> impl Filter + + Clone { + warp::path("indexes") + .and(warp::path::param()) + .and(warp::path(path)) + .and(warp::path::end()) + .and(warp::put()) + .and(extract_config_format()) + .and(warp::body::content_length_limit(1024 * 1024)) + .and(warp::filters::body::bytes()) + .and(with_arg(metastore)) } -fn update_index_handler( +async fn update_index( + index_id: String, + request: IndexUpdate, + mut metastore: MetastoreServiceClient, +) -> Result { + info!(index_id = %index_id, "update-index"); + let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); + let index_uid: IndexUid = metastore + .index_metadata(index_metadata_request) + .await? + .deserialize_index_metadata()? + .index_uid; + let update_request = UpdateIndexRequest::try_from_update(index_uid, &request)?; + let update_resp = metastore.update_index(update_request).await?; + Ok(update_resp.deserialize_index_metadata()?) +} + +pub fn update_indexing_setting_handler( metastore: MetastoreServiceClient, ) -> impl Filter + Clone { - warp::path!("indexes" / String) - .and(warp::put()) - .and(json_body()) - .and(with_arg(metastore)) - .then(update_index) - .map(log_failure("failed to update index")) + pre_update_filter(metastore, "indexing-settings") + .then(update_index_indexing_settings) + .map(log_failure("failed to update indexing settings")) .and(extract_format_from_qs()) .map(into_rest_api_response) } @@ -552,40 +577,124 @@ fn update_index_handler( #[utoipa::path( put, tag = "Indexes", - path = "/indexes/{index_id}", - request_body = IndexUpdates, + path = "/indexes/{index_id}/indexing-settings", + request_body = IndexingSettings, responses( - (status = 200, description = "Successfully updated the index configuration.") + (status = 200, description = "Successfully updated the indexing settings.") ), params( ("index_id" = String, Path, description = "The index ID to update."), ) )] -/// Updates an existing index. -/// -/// This endpoint has PUT semantics, which means that all the updatable fields of the index -/// configuration are replaced by the values specified in the request. In particular, omitting an -/// optional field like `retention_policy` will delete the associated configuration. -async fn update_index( +/// For the indexing settings update to take effect, the indexer nodes must be restarted. +pub async fn update_index_indexing_settings( index_id: String, - request: IndexUpdates, - mut metastore: MetastoreServiceClient, + config_format: ConfigFormat, + config_bytes: Bytes, + metastore: MetastoreServiceClient, ) -> Result { - info!(index_id = %index_id, "update-index"); - let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); - let index_uid: IndexUid = metastore - .index_metadata(index_metadata_request) - .await? - .deserialize_index_metadata()? - .index_uid; + let update = config_format + .parse(&config_bytes) + .map_err(IndexServiceError::InvalidConfig)?; + update_index(index_id, IndexUpdate::IndexingSettings(update), metastore).await +} - let update_request = UpdateIndexRequest::try_from_updates( - index_uid, - &request.search_settings, - &request.retention_policy_opt, - )?; - let update_resp = metastore.update_index(update_request).await?; - Ok(update_resp.deserialize_index_metadata()?) +pub fn update_search_settings_handler( + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + pre_update_filter(metastore, "search-settings") + .then(update_index_search_settings) + .map(log_failure("failed to update search settings")) + .and(extract_format_from_qs()) + .map(into_rest_api_response) +} + +#[utoipa::path( + put, + tag = "Indexes", + path = "/indexes/{index_id}/search-settings", + request_body = SearchSettings, + responses( + (status = 200, description = "Successfully updated the search settings.") + ), + params( + ("index_id" = String, Path, description = "The index ID to update."), + ) +)] +pub async fn update_index_search_settings( + index_id: String, + config_format: ConfigFormat, + config_bytes: Bytes, + metastore: MetastoreServiceClient, +) -> Result { + let update = config_format + .parse(&config_bytes) + .map_err(IndexServiceError::InvalidConfig)?; + update_index(index_id, IndexUpdate::SearchSettings(update), metastore).await +} + +pub fn update_retention_policy_handler( + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + pre_update_filter(metastore, "retention-policy") + .then(update_index_retention_policy) + .map(log_failure("failed to update retention policy")) + .and(extract_format_from_qs()) + .map(into_rest_api_response) +} + +#[utoipa::path( + put, + tag = "Indexes", + path = "/indexes/{index_id}/retention-policy", + request_body = RetentionPolicy, + responses( + (status = 200, description = "Successfully updated the retention policy.") + ), + params( + ("index_id" = String, Path, description = "The index ID to update."), + ) +)] +pub async fn update_index_retention_policy( + index_id: String, + config_format: ConfigFormat, + config_bytes: Bytes, + metastore: MetastoreServiceClient, +) -> Result { + let update = config_format + .parse(&config_bytes) + .map_err(IndexServiceError::InvalidConfig)?; + update_index(index_id, IndexUpdate::RetentionPolicy(update), metastore).await +} + +pub fn delete_retention_policy_handler( + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + warp::path!("indexes" / String / "retention-policy") + .and(warp::delete()) + .and(with_arg(metastore)) + .then(delete_index_retention_policy) + .map(log_failure("failed to delete index retention policy")) + .and(extract_format_from_qs()) + .map(into_rest_api_response) +} + +#[utoipa::path( + delete, + tag = "Indexes", + path = "/indexes/{index_id}/retention-policy", + responses( + (status = 200, description = "Successfully updated the retention policy.") + ), + params( + ("index_id" = String, Path, description = "The index ID to update."), + ) +)] +pub async fn delete_index_retention_policy( + index_id: String, + metastore: MetastoreServiceClient, +) -> Result { + update_index(index_id, IndexUpdate::RetentionPolicy(None), metastore).await } fn clear_index_handler( @@ -1001,6 +1110,7 @@ mod tests { use assert_json_diff::assert_json_include; use quickwit_common::uri::Uri; use quickwit_common::ServiceStream; + use quickwit_config::merge_policy_config::MergePolicyConfig; use quickwit_config::{SourceParams, VecSourceParams}; use quickwit_indexing::{mock_split, MockSplitBuilder}; use quickwit_metastore::{metastore_for_test, IndexMetadata, ListSplitsResponseExt}; @@ -1791,7 +1901,7 @@ mod tests { .path("/indexes") .method("POST") .json(&true) - .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"field_mappings":[{"name": "timestamp", "type": "i64", "fast": true, "indexed": true}]},"search_settings":{"default_search_fields":["body"]}}"#) + .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"timestamp_field":"timestamp", "field_mappings":[{"name": "timestamp", "type": "datetime", "fast": true}]},"search_settings":{"default_search_fields":["body"]}}"#) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 200); @@ -1807,10 +1917,10 @@ mod tests { } { let resp = warp::test::request() - .path("/indexes/hdfs-logs") + .path("/indexes/hdfs-logs/search-settings") .method("PUT") .json(&true) - .body(r#"{"search_settings":{"default_search_fields":["severity_text","body"]}}"#) + .body(r#"{"default_search_fields":["severity_text","body"]}"#) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 200); @@ -1824,6 +1934,46 @@ mod tests { }); assert_json_include!(actual: resp_json, expected: expected_response_json); } + { + let resp = warp::test::request() + .path("/indexes/hdfs-logs/retention-policy") + .method("PUT") + .json(&true) + .body(r#"{"period":"90 days"}"#) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = serde_json::json!({ + "index_config": { + "retention": { + "period": "90 days" + } + } + }); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } + { + let resp = warp::test::request() + .path("/indexes/hdfs-logs/indexing-settings") + .method("PUT") + .json(&true) + .body(r#"{"merge_policy":{"type":"limit_merge"}}"#) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = serde_json::json!({ + "index_config": { + "indexing_settings": { + "merge_policy": { + "type": "limit_merge" + } + } + } + }); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } // check that the metastore was updated let index_metadata = metastore .index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string())) @@ -1838,6 +1988,18 @@ mod tests { .default_search_fields, ["severity_text", "body"] ); + assert_eq!( + index_metadata + .index_config + .retention_policy_opt + .unwrap() + .retention_period, + "90 days" + ); + assert!(matches!( + index_metadata.index_config.indexing_settings.merge_policy, + MergePolicyConfig::ConstWriteAmplification(_) + )); } #[tokio::test] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 5cd5b487a93..c14d6885cfb 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -117,7 +117,7 @@ use tracing::{debug, error, info, warn}; use warp::{Filter, Rejection}; pub use crate::build_info::{BuildInfo, RuntimeInfo}; -pub use crate::index_api::{IndexUpdates, ListSplitsQueryParams, ListSplitsResponse}; +pub use crate::index_api::{ListSplitsQueryParams, ListSplitsResponse}; pub use crate::metrics::SERVE_METRICS; use crate::rate_modulator::RateModulator; #[cfg(test)] From b39c33c107a535dca667718d3d0d1623cb12f8c5 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 17 Apr 2024 13:55:39 +0000 Subject: [PATCH 2/4] Update docs --- docs/reference/cli.md | 6 +- docs/reference/rest-api.md | 62 +++++++++++-------- quickwit/quickwit-cli/src/index/update.rs | 9 ++- .../quickwit-metastore/src/metastore/mod.rs | 2 +- .../src/index_api/rest_handler.rs | 28 ++++++--- 5 files changed, 68 insertions(+), 39 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 2ed3ad0253f..aff917ff6ec 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -212,7 +212,7 @@ quickwit index update search-settings | Option | Description | |-----------------|-------------| -| `--config-file` | Location of a json, yaml or toml file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings. | +| `--config-file` | Location of a json or yaml file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings. | #### index update retention-policy Updates or disables the retention policy. @@ -230,7 +230,7 @@ quickwit index update retention-policy | Option | Description | |-----------------|-------------| -| `--config-file` | Location of a json, yaml or toml file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy. | +| `--config-file` | Location of a json or yaml file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy. | | `--disable` | Disables the retention policy. Old indexed data will not be cleaned up anymore. | #### index update indexing-settings @@ -248,7 +248,7 @@ quickwit index update indexing-settings | Option | Description | |-----------------|-------------| -| `--config-file` | Location of a json, yaml or toml file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings. | +| `--config-file` | Location of a json or yaml file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings. | ### index clear Clears an index: deletes all splits and resets checkpoint. diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index 707518cc0f2..3ad8145aeae 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -309,48 +309,60 @@ The response is the index metadata of the created index, and the content type is | `sources` | List of the index sources configurations. | `Array` | -### Update an index (search settings and retention policy only) +### Update an index ``` -PUT api/v1/indexes/ +PUT api/v1/indexes//indexing-settings +PUT api/v1/indexes//search-settings +PUT api/v1/indexes//retention-policy ``` -Updates the search settings and retention policy of an index. This endpoint follows PUT semantics (not PATCH), which means that all the updatable fields of the index configuration are replaced by the values specified in this request. In particular, omitting an optional field like retention_policy will delete the associated configuration. Unlike the create endpoint, this API only accepts JSON payloads. +These endpoints follows PUT semantics (not PATCH), which means that all the fields of the updated configuration are replaced by the values specified in the request. Values that are not specified will be reset to their defaults. The API accepts JSON with `content-type: application/json` and YAML `content-type: application/yaml`. +- The search settings update is automatically picked up by the janitor service on its next state refresh. +- The retention policy update is automatically picked up when the next query is executed. +- The indexing settings update is not automatically picked up by the indexer nodes, they need to be manually restarted. -#### PUT payload +#### PUT payloads -| Variable | Type | Description | Default value | -|---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| -| `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | | -| `retention` | `Retention` | Retention policy object as specified in the [index config docs](../configuration/index-config.md#retention-policy). | | +| Endpoint | Type | Description | +|---------------------|--------------------|------------------------------------------------------------------------------------------| +| `/search-settings` | `SearchSettings` | See [search settings config docs](../configuration/index-config.md#search-settings). | +| `/retention-policy` | `Retention` | See [retention policy config docs](../configuration/index-config.md#retention-policy). | +| `/indexing-settings`| `IndexingSettings` | See [indexing settings config docs](../configuration/index-config.md#indexing-settings). | -**Payload Example** +**Payload Examples** -curl -XPUT http://0.0.0.0:8080/api/v1/indexes --data @index_update.json -H "Content-Type: application/json" +curl -XPUT http://0.0.0.0:8080/api/v1/indexes/my-index/search-settings --data @search_settings_update.json -H "Content-Type: application/json" -```json title="index_update.json +```json title="search_settings_update.json { - "search_settings": { - "default_search_fields": ["body"] - }, - "retention": { - "period": "3 days", - "schedule": "@daily" - } + "default_search_fields": ["body"] } ``` -:::warning -Calling the update endpoint with the following payload will remove the current retention policy. -```json +curl -XPUT http://0.0.0.0:8080/api/v1/indexes/my-index/retention-policy --data @retention_policy_update.json -H "Content-Type: application/json" + +```json title="retention_policy_update.json { - "search_settings": { - "default_search_fields": ["body"] - } + "period": "3 days", + "schedule": "daily" +} +``` + +curl -XPUT http://0.0.0.0:8080/api/v1/indexes/my-index/indexing-settings --data @indexing_settings_update.json -H "Content-Type: application/json" + +```json title="indexing_settings_update.json +{ + "merge_policy": { + "type": "limit_merge", + "max_merge_ops": 3, + "merge_factor": 10, + "max_merge_factor": 12 + }, + "commit_timeout_secs": 5 } ``` -::: #### Response diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs index e3dd709c7c8..b405009c722 100644 --- a/quickwit/quickwit-cli/src/index/update.rs +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -42,15 +42,17 @@ pub fn build_index_update_command() -> Command { .subcommand( Command::new("search-settings") .about("Updates search settings.") + .long_about("Updates search settings. The update is automatically picked up when the next query is executed.") .args(&[ - arg!(--"config-file" "Location of a json, yaml or toml file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings.") + arg!(--"config-file" "Location of a json or yaml file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings.") .required(true), ])) .subcommand( Command::new("retention-policy") .about("Updates or disables the retention policy.") + .long_about("Updates or disables the retention policy. The update is automatically picked up by the janitor service on its next state refresh.") .args(&[ - arg!(--"config-file" "Location of a json, yaml or toml file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy.") + arg!(--"config-file" "Location of a json or yaml file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy.") .required(false), arg!(--disable "Disables the retention policy. Old indexed data will not be cleaned up anymore.") .required(false), @@ -59,8 +61,9 @@ pub fn build_index_update_command() -> Command { .subcommand( Command::new("indexing-settings") .about("Updates indexing settings.") + .long_about("Updates indexing settings. The update is not automatically picked up by the indexer nodes, they need to be manually restarted.") .args(&[ - arg!(--"config-file" "Location of a json, yaml or toml file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings.") + arg!(--"config-file" "Location of a json or yaml file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings.") .required(true), ]) diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index fb320c73e7a..8bf69b8460a 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -182,7 +182,7 @@ impl CreateIndexResponseExt for CreateIndexResponse { /// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload. pub trait UpdateIndexRequestExt { - /// Creates a new [`UpdateIndexRequest`] from an `IndexUpdate`. + /// Creates a new [`UpdateIndexRequest`] from an [`IndexUpdate`]. fn try_from_update( index_uid: impl Into, update: &IndexUpdate, diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index cca5bb48211..135a5ef00ec 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -586,7 +586,8 @@ pub fn update_indexing_setting_handler( ("index_id" = String, Path, description = "The index ID to update."), ) )] -/// For the indexing settings update to take effect, the indexer nodes must be restarted. +/// The update is not automatically picked up by the indexer nodes, they need to be manually +/// restarted. pub async fn update_index_indexing_settings( index_id: String, config_format: ConfigFormat, @@ -621,6 +622,7 @@ pub fn update_search_settings_handler( ("index_id" = String, Path, description = "The index ID to update."), ) )] +/// The update is automatically picked up when the next query is executed. pub async fn update_index_search_settings( index_id: String, config_format: ConfigFormat, @@ -655,6 +657,7 @@ pub fn update_retention_policy_handler( ("index_id" = String, Path, description = "The index ID to update."), ) )] +/// The update is automatically picked up by the janitor service on its next state refresh. pub async fn update_index_retention_policy( index_id: String, config_format: ConfigFormat, @@ -664,7 +667,12 @@ pub async fn update_index_retention_policy( let update = config_format .parse(&config_bytes) .map_err(IndexServiceError::InvalidConfig)?; - update_index(index_id, IndexUpdate::RetentionPolicy(update), metastore).await + update_index( + index_id, + IndexUpdate::RetentionPolicy(Some(update)), + metastore, + ) + .await } pub fn delete_retention_policy_handler( @@ -684,12 +692,13 @@ pub fn delete_retention_policy_handler( tag = "Indexes", path = "/indexes/{index_id}/retention-policy", responses( - (status = 200, description = "Successfully updated the retention policy.") + (status = 200, description = "Successfully deleted the retention policy.") ), params( ("index_id" = String, Path, description = "The index ID to update."), ) )] +/// The deletion is automatically picked up by the janitor service on its next state refresh. pub async fn delete_index_retention_policy( index_id: String, metastore: MetastoreServiceClient, @@ -1938,8 +1947,8 @@ mod tests { let resp = warp::test::request() .path("/indexes/hdfs-logs/retention-policy") .method("PUT") - .json(&true) - .body(r#"{"period":"90 days"}"#) + .header("content-type", "application/yaml") + .body("period: 90 days") .reply(&index_management_handler) .await; assert_eq!(resp.status(), 200); @@ -1957,8 +1966,13 @@ mod tests { let resp = warp::test::request() .path("/indexes/hdfs-logs/indexing-settings") .method("PUT") - .json(&true) - .body(r#"{"merge_policy":{"type":"limit_merge"}}"#) + .header("content-type", "application/toml") + .body( + r#" + [merge_policy] + type = "limit_merge" + "#, + ) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 200); From fa8b3f55b848021687dfad5d77dea651b26f65a3 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 7 May 2024 12:43:52 +0000 Subject: [PATCH 3/4] Address wording and naming comments --- docs/reference/cli.md | 8 ++--- docs/reference/rest-api.md | 4 +-- quickwit/quickwit-cli/src/index/update.rs | 12 +++---- .../quickwit-config/src/index_config/mod.rs | 4 +-- quickwit/quickwit-config/src/lib.rs | 8 ++--- .../src/metastore/file_backed/mod.rs | 10 +++--- .../quickwit-metastore/src/metastore/mod.rs | 26 ++++++++-------- .../src/metastore/postgres/metastore.rs | 10 +++--- .../quickwit-metastore/src/tests/index.rs | 16 +++++----- .../src/index_api/rest_handler.rs | 31 ++++++++++++++----- 10 files changed, 72 insertions(+), 57 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index aff917ff6ec..1893cd50405 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -212,7 +212,7 @@ quickwit index update search-settings | Option | Description | |-----------------|-------------| -| `--config-file` | Location of a json or yaml file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings. | +| `--config-file` | Location of a JSON or YAML file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings. | #### index update retention-policy Updates or disables the retention policy. @@ -230,8 +230,8 @@ quickwit index update retention-policy | Option | Description | |-----------------|-------------| -| `--config-file` | Location of a json or yaml file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy. | -| `--disable` | Disables the retention policy. Old indexed data will not be cleaned up anymore. | +| `--config-file` | Location of a JSON or YAML file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy. | +| `--disable` | Disables the retention policy and keeps indexed data forever. | #### index update indexing-settings Updates indexing settings. @@ -248,7 +248,7 @@ quickwit index update indexing-settings | Option | Description | |-----------------|-------------| -| `--config-file` | Location of a json or yaml file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings. | +| `--config-file` | Location of a JSON or YAML file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings. | ### index clear Clears an index: deletes all splits and resets checkpoint. diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index 3ad8145aeae..187816aef03 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -318,8 +318,8 @@ PUT api/v1/indexes//retention-policy ``` These endpoints follows PUT semantics (not PATCH), which means that all the fields of the updated configuration are replaced by the values specified in the request. Values that are not specified will be reset to their defaults. The API accepts JSON with `content-type: application/json` and YAML `content-type: application/yaml`. -- The search settings update is automatically picked up by the janitor service on its next state refresh. -- The retention policy update is automatically picked up when the next query is executed. +- The retention policy update is automatically picked up by the janitor service on its next state refresh. +- The search settings update is automatically picked up by searcher nodes when the next query is executed. - The indexing settings update is not automatically picked up by the indexer nodes, they need to be manually restarted. #### PUT payloads diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs index b405009c722..0f1415e5bfa 100644 --- a/quickwit/quickwit-cli/src/index/update.rs +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -44,7 +44,7 @@ pub fn build_index_update_command() -> Command { .about("Updates search settings.") .long_about("Updates search settings. The update is automatically picked up when the next query is executed.") .args(&[ - arg!(--"config-file" "Location of a json or yaml file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings.") + arg!(--"config-file" "Location of a JSON or YAML file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings.") .required(true), ])) .subcommand( @@ -52,9 +52,9 @@ pub fn build_index_update_command() -> Command { .about("Updates or disables the retention policy.") .long_about("Updates or disables the retention policy. The update is automatically picked up by the janitor service on its next state refresh.") .args(&[ - arg!(--"config-file" "Location of a json or yaml file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy.") + arg!(--"config-file" "Location of a JSON or YAML file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy.") .required(false), - arg!(--disable "Disables the retention policy. Old indexed data will not be cleaned up anymore.") + arg!(--disable "Disables the retention policy and keeps indexed data forever.") .required(false), ]) ) @@ -63,7 +63,7 @@ pub fn build_index_update_command() -> Command { .about("Updates indexing settings.") .long_about("Updates indexing settings. The update is not automatically picked up by the indexer nodes, they need to be manually restarted.") .args(&[ - arg!(--"config-file" "Location of a json or yaml file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings.") + arg!(--"config-file" "Location of a JSON or YAML file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings.") .required(true), ]) @@ -173,14 +173,14 @@ async fn update_from_file( field: UpdateConfigField, ) -> anyhow::Result<()> { let storage_resolver = StorageResolver::unconfigured(); - let content = load_file(&storage_resolver, config_file).await?; + let config_content = load_file(&storage_resolver, config_file).await?; client_args .client() .indexes() .update( index_id, field, - Bytes::from(content.as_slice().to_owned()), + Bytes::from(config_content.as_slice().to_owned()), ConfigFormat::sniff_from_uri(config_file)?, ) .await?; diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index c23832c3fab..bd20a1a23ba 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -519,9 +519,9 @@ impl TestableForRegression for IndexConfig { } } -/// Represents an update to one of the updatable index configuration field. +/// Represents an update to one of the updatable index configuration attribute. #[derive(Clone, Debug)] -pub enum IndexUpdate { +pub enum IndexConfigUpdate { SearchSettings(SearchSettings), IndexingSettings(IndexingSettings), RetentionPolicy(Option), diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index fab8309c41e..d42d4b39f66 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -46,8 +46,8 @@ pub use cluster_config::ClusterConfig; // See #2048 use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig}; pub use index_config::{ - build_doc_mapper, load_index_config_from_user_config, DocMapping, IndexConfig, IndexUpdate, - IndexingResources, IndexingSettings, RetentionPolicy, SearchSettings, + build_doc_mapper, load_index_config_from_user_config, DocMapping, IndexConfig, + IndexConfigUpdate, IndexingResources, IndexingSettings, RetentionPolicy, SearchSettings, }; use serde::de::DeserializeOwned; use serde::Serialize; @@ -211,7 +211,7 @@ impl ConfigFormat { serde_json::from_reader(StripComments::new(payload))?; if let Some(version_value) = json_value.get_mut("version") { if let Some(version_number) = version_value.as_u64() { - warn!(version_value=?version_value, "`version` is supposed to be a string"); + warn!(version_value=?version_value, "`version` should be a string, not an integer"); *version_value = JsonValue::String(version_number.to_string()); } } @@ -224,7 +224,7 @@ impl ConfigFormat { toml::from_str(payload_str).context("failed to parse TOML file")?; if let Some(version_value) = toml_value.get_mut("version") { if let Some(version_number) = version_value.as_integer() { - warn!(version_value=?version_value, "`version` is supposed to be a string"); + warn!(version_value=?version_value, "`version` should be a string, not an integer"); *version_value = toml::Value::String(version_number.to_string()); } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index b00c20ecef8..23e7ff7e4e4 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -41,7 +41,7 @@ use async_trait::async_trait; use futures::future::try_join_all; use itertools::Itertools; use quickwit_common::ServiceStream; -use quickwit_config::{IndexTemplate, IndexUpdate}; +use quickwit_config::{IndexConfigUpdate, IndexTemplate}; use quickwit_proto::metastore::{ serde_utils, AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, @@ -461,15 +461,15 @@ impl MetastoreService for FileBackedMetastore { &mut self, request: UpdateIndexRequest, ) -> MetastoreResult { - let update = request.deserialize_update()?; + let update = request.deserialize_index_config_update()?; let index_uid = request.index_uid(); let metadata = self .mutate(index_uid, |index| { let mutation_occured = match update { - IndexUpdate::SearchSettings(s) => index.set_search_settings(s), - IndexUpdate::RetentionPolicy(s) => index.set_retention_policy(s), - IndexUpdate::IndexingSettings(s) => index.set_indexing_settings(s), + IndexConfigUpdate::SearchSettings(s) => index.set_search_settings(s), + IndexConfigUpdate::RetentionPolicy(s) => index.set_retention_policy(s), + IndexConfigUpdate::IndexingSettings(s) => index.set_indexing_settings(s), }; if mutation_occured { Ok(MutationOccurred::Yes(index.metadata().clone())) diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 8bf69b8460a..043413cbf0a 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -31,7 +31,7 @@ use bytes::Bytes; use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; -use quickwit_config::{IndexConfig, IndexUpdate, SourceConfig}; +use quickwit_config::{IndexConfig, IndexConfigUpdate, SourceConfig}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteTask, @@ -183,34 +183,34 @@ impl CreateIndexResponseExt for CreateIndexResponse { /// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload. pub trait UpdateIndexRequestExt { /// Creates a new [`UpdateIndexRequest`] from an [`IndexUpdate`]. - fn try_from_update( + fn try_from_index_config_update( index_uid: impl Into, - update: &IndexUpdate, + update: &IndexConfigUpdate, ) -> MetastoreResult; /// Deserializes the `config_json` field of an [`UpdateIndexRequest`] into /// the appropriate variant of `IndexUpdate`. - fn deserialize_update(&self) -> MetastoreResult; + fn deserialize_index_config_update(&self) -> MetastoreResult; } impl UpdateIndexRequestExt for UpdateIndexRequest { - fn try_from_update( + fn try_from_index_config_update( index_uid: impl Into, - update: &IndexUpdate, + update: &IndexConfigUpdate, ) -> MetastoreResult { let index_uid = Some(index_uid.into()); let update_request = match update { - IndexUpdate::IndexingSettings(s) => UpdateIndexRequest { + IndexConfigUpdate::IndexingSettings(s) => UpdateIndexRequest { index_uid, target_config: UpdatedIndexConfig::IndexingSettings.into(), config_json: Some(serde_utils::to_json_str(s)?), }, - IndexUpdate::SearchSettings(s) => UpdateIndexRequest { + IndexConfigUpdate::SearchSettings(s) => UpdateIndexRequest { index_uid, target_config: UpdatedIndexConfig::SearchSettings.into(), config_json: Some(serde_utils::to_json_str(s)?), }, - IndexUpdate::RetentionPolicy(s) => UpdateIndexRequest { + IndexConfigUpdate::RetentionPolicy(s) => UpdateIndexRequest { index_uid, target_config: UpdatedIndexConfig::RetentionPolicy.into(), config_json: s.as_ref().map(serde_utils::to_json_str).transpose()?, @@ -220,7 +220,7 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { Ok(update_request) } - fn deserialize_update(&self) -> MetastoreResult { + fn deserialize_index_config_update(&self) -> MetastoreResult { let config_ref_opt = self.config_json.as_ref(); match self.target_config { x if x == UpdatedIndexConfig::IndexingSettings as i32 => { @@ -231,7 +231,7 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { struct_name: "IndexingSettings".to_owned(), message: "Should not be empty".to_owned(), })?; - Ok(IndexUpdate::IndexingSettings(indexing_settings)) + Ok(IndexConfigUpdate::IndexingSettings(indexing_settings)) } x if x == UpdatedIndexConfig::SearchSettings as i32 => { let search_settings = config_ref_opt @@ -241,13 +241,13 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { struct_name: "IndexingSettings".to_owned(), message: "Should not be empty".to_owned(), })?; - Ok(IndexUpdate::SearchSettings(search_settings)) + Ok(IndexConfigUpdate::SearchSettings(search_settings)) } x if x == UpdatedIndexConfig::RetentionPolicy as i32 => { let retention_policy = config_ref_opt .map(|json| serde_utils::from_json_str(json)) .transpose()?; - Ok(IndexUpdate::RetentionPolicy(retention_policy)) + Ok(IndexConfigUpdate::RetentionPolicy(retention_policy)) } _ => Err(MetastoreError::JsonDeserializeError { struct_name: "IndexUpdate".to_owned(), diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index fdb4d1de503..ed3d9fd2d32 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -25,7 +25,7 @@ use quickwit_common::pretty::PrettySample; use quickwit_common::uri::Uri; use quickwit_common::ServiceStream; use quickwit_config::{ - validate_index_id_pattern, IndexTemplate, IndexTemplateId, IndexUpdate, + validate_index_id_pattern, IndexConfigUpdate, IndexTemplate, IndexTemplateId, PostgresMetastoreConfig, INGEST_V2_SOURCE_ID, }; use quickwit_proto::ingest::{Shard, ShardState}; @@ -406,24 +406,24 @@ impl MetastoreService for PostgresqlMetastore { &mut self, request: UpdateIndexRequest, ) -> MetastoreResult { - let update = request.deserialize_update()?; + let update = request.deserialize_index_config_update()?; let index_uid: IndexUid = request.index_uid().clone(); let updated_metadata = run_with_tx!(self.connection_pool, tx, { mutate_index_metadata::(tx, index_uid, |index_metadata| match update { - IndexUpdate::SearchSettings(s) + IndexConfigUpdate::SearchSettings(s) if index_metadata.index_config.search_settings != s => { index_metadata.index_config.search_settings = s; Ok(MutationOccurred::Yes(())) } - IndexUpdate::IndexingSettings(s) + IndexConfigUpdate::IndexingSettings(s) if index_metadata.index_config.indexing_settings != s => { index_metadata.index_config.indexing_settings = s; Ok(MutationOccurred::Yes(())) } - IndexUpdate::RetentionPolicy(s) + IndexConfigUpdate::RetentionPolicy(s) if index_metadata.index_config.retention_policy_opt != s => { index_metadata.index_config.retention_policy_opt = s; diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 7c138016121..1245e9b896c 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -28,8 +28,8 @@ use quickwit_common::rand::append_random_suffix; use quickwit_config::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig}; use quickwit_config::{ - IndexConfig, IndexUpdate, IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, - CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, + IndexConfig, IndexConfigUpdate, IndexingSettings, RetentionPolicy, SearchSettings, + SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, }; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataRequest, @@ -119,9 +119,9 @@ pub async fn test_metastore_update_retention_policy< new_retention_policy_opt.clone(), None, ] { - let index_update = UpdateIndexRequest::try_from_update( + let index_update = UpdateIndexRequest::try_from_index_config_update( index_uid.clone(), - &IndexUpdate::RetentionPolicy(loop_retention_policy_opt.clone()), + &IndexConfigUpdate::RetentionPolicy(loop_retention_policy_opt.clone()), ) .unwrap(); let response_metadata = metastore @@ -160,9 +160,9 @@ pub async fn test_metastore_update_search_settings< vec!["body".to_owned(), "owner".to_owned()], vec![], ] { - let index_update = UpdateIndexRequest::try_from_update( + let index_update = UpdateIndexRequest::try_from_index_config_update( index_uid.clone(), - &IndexUpdate::SearchSettings(SearchSettings { + &IndexConfigUpdate::SearchSettings(SearchSettings { default_search_fields: loop_search_settings.clone(), }), ) @@ -212,9 +212,9 @@ pub async fn test_metastore_update_indexing_settings< ..Default::default() }), ] { - let index_update = UpdateIndexRequest::try_from_update( + let index_update = UpdateIndexRequest::try_from_index_config_update( index_uid.clone(), - &IndexUpdate::IndexingSettings(IndexingSettings { + &IndexConfigUpdate::IndexingSettings(IndexingSettings { merge_policy: loop_indexing_settings.clone(), ..Default::default() }), diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index 135a5ef00ec..82ee98828e6 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -22,8 +22,8 @@ use std::sync::Arc; use bytes::Bytes; use quickwit_common::uri::Uri; use quickwit_config::{ - load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, IndexUpdate, - NodeConfig, SourceConfig, SourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, + load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, + IndexConfigUpdate, NodeConfig, SourceConfig, SourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, }; use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; @@ -549,7 +549,7 @@ pub fn pre_update_filter( async fn update_index( index_id: String, - request: IndexUpdate, + request: IndexConfigUpdate, mut metastore: MetastoreServiceClient, ) -> Result { info!(index_id = %index_id, "update-index"); @@ -559,7 +559,7 @@ async fn update_index( .await? .deserialize_index_metadata()? .index_uid; - let update_request = UpdateIndexRequest::try_from_update(index_uid, &request)?; + let update_request = UpdateIndexRequest::try_from_index_config_update(index_uid, &request)?; let update_resp = metastore.update_index(update_request).await?; Ok(update_resp.deserialize_index_metadata()?) } @@ -597,7 +597,12 @@ pub async fn update_index_indexing_settings( let update = config_format .parse(&config_bytes) .map_err(IndexServiceError::InvalidConfig)?; - update_index(index_id, IndexUpdate::IndexingSettings(update), metastore).await + update_index( + index_id, + IndexConfigUpdate::IndexingSettings(update), + metastore, + ) + .await } pub fn update_search_settings_handler( @@ -632,7 +637,12 @@ pub async fn update_index_search_settings( let update = config_format .parse(&config_bytes) .map_err(IndexServiceError::InvalidConfig)?; - update_index(index_id, IndexUpdate::SearchSettings(update), metastore).await + update_index( + index_id, + IndexConfigUpdate::SearchSettings(update), + metastore, + ) + .await } pub fn update_retention_policy_handler( @@ -669,7 +679,7 @@ pub async fn update_index_retention_policy( .map_err(IndexServiceError::InvalidConfig)?; update_index( index_id, - IndexUpdate::RetentionPolicy(Some(update)), + IndexConfigUpdate::RetentionPolicy(Some(update)), metastore, ) .await @@ -703,7 +713,12 @@ pub async fn delete_index_retention_policy( index_id: String, metastore: MetastoreServiceClient, ) -> Result { - update_index(index_id, IndexUpdate::RetentionPolicy(None), metastore).await + update_index( + index_id, + IndexConfigUpdate::RetentionPolicy(None), + metastore, + ) + .await } fn clear_index_handler( From c98ed8e8fcbf028f50cbfd837527620883e8eedc Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 7 May 2024 13:58:41 +0000 Subject: [PATCH 4/4] Use json enum instead of hand written proto enum --- .../quickwit-config/src/index_config/mod.rs | 3 +- .../quickwit-metastore/src/metastore/mod.rs | 62 +++---------------- .../protos/quickwit/metastore.proto | 4 +- .../codegen/quickwit/quickwit.metastore.rs | 7 +-- 4 files changed, 14 insertions(+), 62 deletions(-) diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index bd20a1a23ba..af6a506b19f 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -520,7 +520,8 @@ impl TestableForRegression for IndexConfig { } /// Represents an update to one of the updatable index configuration attribute. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "field", content = "config")] pub enum IndexConfigUpdate { SearchSettings(SearchSettings), IndexingSettings(IndexingSettings), diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 043413cbf0a..04f262e33f7 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -38,7 +38,6 @@ use quickwit_proto::metastore::{ IndexMetadataRequest, IndexMetadataResponse, ListIndexesMetadataResponse, ListSplitsRequest, ListSplitsResponse, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, - UpdatedIndexConfig, }; use quickwit_proto::types::{IndexUid, SplitId}; use time::OffsetDateTime; @@ -182,14 +181,14 @@ impl CreateIndexResponseExt for CreateIndexResponse { /// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload. pub trait UpdateIndexRequestExt { - /// Creates a new [`UpdateIndexRequest`] from an [`IndexUpdate`]. + /// Creates a new [`UpdateIndexRequest`] from an [`IndexConfigUpdate`]. fn try_from_index_config_update( index_uid: impl Into, update: &IndexConfigUpdate, ) -> MetastoreResult; /// Deserializes the `config_json` field of an [`UpdateIndexRequest`] into - /// the appropriate variant of `IndexUpdate`. + /// the appropriate variant of `IndexConfigUpdate`. fn deserialize_index_config_update(&self) -> MetastoreResult; } @@ -199,61 +198,14 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { update: &IndexConfigUpdate, ) -> MetastoreResult { let index_uid = Some(index_uid.into()); - let update_request = match update { - IndexConfigUpdate::IndexingSettings(s) => UpdateIndexRequest { - index_uid, - target_config: UpdatedIndexConfig::IndexingSettings.into(), - config_json: Some(serde_utils::to_json_str(s)?), - }, - IndexConfigUpdate::SearchSettings(s) => UpdateIndexRequest { - index_uid, - target_config: UpdatedIndexConfig::SearchSettings.into(), - config_json: Some(serde_utils::to_json_str(s)?), - }, - IndexConfigUpdate::RetentionPolicy(s) => UpdateIndexRequest { - index_uid, - target_config: UpdatedIndexConfig::RetentionPolicy.into(), - config_json: s.as_ref().map(serde_utils::to_json_str).transpose()?, - }, - }; - - Ok(update_request) + Ok(UpdateIndexRequest { + index_uid, + updated_config_json: serde_utils::to_json_str(update)?, + }) } fn deserialize_index_config_update(&self) -> MetastoreResult { - let config_ref_opt = self.config_json.as_ref(); - match self.target_config { - x if x == UpdatedIndexConfig::IndexingSettings as i32 => { - let indexing_settings = config_ref_opt - .map(|json| serde_utils::from_json_str(json)) - .transpose()? - .ok_or(MetastoreError::JsonDeserializeError { - struct_name: "IndexingSettings".to_owned(), - message: "Should not be empty".to_owned(), - })?; - Ok(IndexConfigUpdate::IndexingSettings(indexing_settings)) - } - x if x == UpdatedIndexConfig::SearchSettings as i32 => { - let search_settings = config_ref_opt - .map(|json| serde_utils::from_json_str(json)) - .transpose()? - .ok_or(MetastoreError::JsonDeserializeError { - struct_name: "IndexingSettings".to_owned(), - message: "Should not be empty".to_owned(), - })?; - Ok(IndexConfigUpdate::SearchSettings(search_settings)) - } - x if x == UpdatedIndexConfig::RetentionPolicy as i32 => { - let retention_policy = config_ref_opt - .map(|json| serde_utils::from_json_str(json)) - .transpose()?; - Ok(IndexConfigUpdate::RetentionPolicy(retention_policy)) - } - _ => Err(MetastoreError::JsonDeserializeError { - struct_name: "IndexUpdate".to_owned(), - message: "Unexpected target config".to_owned(), - }), - } + serde_utils::from_json_str(&self.updated_config_json) } } diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 80d59d3ee64..2b156317718 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -210,8 +210,8 @@ enum UpdatedIndexConfig { message UpdateIndexRequest { quickwit.common.IndexUid index_uid = 1; - UpdatedIndexConfig target_config = 2; - optional string config_json = 3; + // JSON representation of the updated config field. + string updated_config_json = 2; } message ListIndexesMetadataRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 47ec880c7cc..7a72d62c93a 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -26,10 +26,9 @@ pub struct CreateIndexResponse { pub struct UpdateIndexRequest { #[prost(message, optional, tag = "1")] pub index_uid: ::core::option::Option, - #[prost(enumeration = "UpdatedIndexConfig", tag = "2")] - pub target_config: i32, - #[prost(string, optional, tag = "3")] - pub config_json: ::core::option::Option<::prost::alloc::string::String>, + /// JSON representation of the updated config field. + #[prost(string, tag = "2")] + pub updated_config_json: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)]