diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 651e162013..1893cd5040 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -183,37 +183,46 @@ quickwit index create --endpoint=http://127.0.0.1:7280 --index-config wikipedia_ ### index update `quickwit index update [args]` + +*Synopsis* + +```bash +quickwit index update + --index +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--index` | ID of the target index | #### index update search-settings -Updates default search settings. +Updates search settings. `quickwit index update search-settings [args]` *Synopsis* ```bash quickwit index update search-settings - --index - --default-search-fields + --config-file ``` *Options* | Option | Description | |-----------------|-------------| -| `--index` | ID of the target index | -| `--default-search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. "field1 field2". If no value is provided, existing defaults are removed and queries without target field will fail. | +| `--config-file` | Location of a JSON or YAML file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings. | #### index update retention-policy -Configure or disable the retention policy. +Updates or disables the retention policy. `quickwit index update retention-policy [args]` *Synopsis* ```bash quickwit index update retention-policy - --index - [--period ] - [--schedule ] + [--config-file ] [--disable] ``` @@ -221,10 +230,25 @@ quickwit index update retention-policy | Option | Description | |-----------------|-------------| -| `--index` | ID of the target index | -| `--period` | Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...) | -| `--schedule` | Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...). | -| `--disable` | Disable the retention policy. Old indexed data will not be cleaned up anymore. | +| `--config-file` | Location of a JSON or YAML file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy. | +| `--disable` | Disables the retention policy and keeps indexed data forever. | +#### index update indexing-settings + +Updates indexing settings. +`quickwit index update indexing-settings [args]` + +*Synopsis* + +```bash +quickwit index update indexing-settings + --config-file +``` + +*Options* + +| Option | Description | +|-----------------|-------------| +| `--config-file` | Location of a JSON or YAML file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings. | ### index clear Clears an index: deletes all splits and resets checkpoint. @@ -368,6 +392,7 @@ quickwit index ingest [--input-path ] [--batch-size-limit ] [--wait] + [--v2] [--force] [--commit-timeout ] ``` @@ -380,8 +405,9 @@ quickwit index ingest | `--input-path` | Location of the input file. | | `--batch-size-limit` | Size limit of each submitted document batch. | | `--wait` | Wait for all documents to be commited and available for search before exiting | +| `--v2` | Ingest v2 (experimental! Do not use me.) | | `--force` | Force a commit after the last document is sent, and wait for all documents to be committed and available for search before exiting | -| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting which sets the maximum time before commiting splits after their creation. | +| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting, which sets the maximum time before commiting splits after their creation. | *Examples* diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index 0408851d73..187816aef0 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -309,45 +309,58 @@ The response is the index metadata of the created index, and the content type is | `sources` | List of the index sources configurations. | `Array` | -### Update an index (search settings and retention policy only) +### Update an index ``` -PUT api/v1/indexes/ +PUT api/v1/indexes//indexing-settings +PUT api/v1/indexes//search-settings +PUT api/v1/indexes//retention-policy ``` -Updates the search settings and retention policy of an index. This endpoint follows PUT semantics (not PATCH), which means that all the updatable fields of the index configuration are replaced by the values specified in this request. In particular, omitting an optional field like retention_policy will delete the associated configuration. Unlike the create endpoint, this API only accepts JSON payloads. +These endpoints follows PUT semantics (not PATCH), which means that all the fields of the updated configuration are replaced by the values specified in the request. Values that are not specified will be reset to their defaults. The API accepts JSON with `content-type: application/json` and YAML `content-type: application/yaml`. +- The retention policy update is automatically picked up by the janitor service on its next state refresh. +- The search settings update is automatically picked up by searcher nodes when the next query is executed. +- The indexing settings update is not automatically picked up by the indexer nodes, they need to be manually restarted. -#### PUT payload +#### PUT payloads -| Variable | Type | Description | Default value | -|---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| -| `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | | -| `retention` | `Retention` | Retention policy object as specified in the [index config docs](../configuration/index-config.md#retention-policy). | | +| Endpoint | Type | Description | +|---------------------|--------------------|------------------------------------------------------------------------------------------| +| `/search-settings` | `SearchSettings` | See [search settings config docs](../configuration/index-config.md#search-settings). | +| `/retention-policy` | `Retention` | See [retention policy config docs](../configuration/index-config.md#retention-policy). | +| `/indexing-settings`| `IndexingSettings` | See [indexing settings config docs](../configuration/index-config.md#indexing-settings). | -**Payload Example** +**Payload Examples** -curl -XPUT http://0.0.0.0:8080/api/v1/indexes --data @index_update.json -H "Content-Type: application/json" +curl -XPUT http://0.0.0.0:8080/api/v1/indexes/my-index/search-settings --data @search_settings_update.json -H "Content-Type: application/json" -```json title="index_update.json +```json title="search_settings_update.json { - "search_settings": { - "default_search_fields": ["body"] - }, - "retention": { - "period": "3 days", - "schedule": "@daily" - } + "default_search_fields": ["body"] } ``` -:::warning -Calling the update endpoint with the following payload will remove the current retention policy. -```json +curl -XPUT http://0.0.0.0:8080/api/v1/indexes/my-index/retention-policy --data @retention_policy_update.json -H "Content-Type: application/json" + +```json title="retention_policy_update.json { - "search_settings": { - "default_search_fields": ["body"] - } + "period": "3 days", + "schedule": "daily" +} +``` + +curl -XPUT http://0.0.0.0:8080/api/v1/indexes/my-index/indexing-settings --data @indexing_settings_update.json -H "Content-Type: application/json" + +```json title="indexing_settings_update.json +{ + "merge_policy": { + "type": "limit_merge", + "max_merge_ops": 3, + "merge_factor": 10, + "max_merge_factor": 12 + }, + "commit_timeout_secs": 5 } ``` diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs index c79f30357e..0f1415e5bf 100644 --- a/quickwit/quickwit-cli/src/index/update.rs +++ b/quickwit/quickwit-cli/src/index/update.rs @@ -17,71 +17,85 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::str::FromStr; + use anyhow::{bail, Context}; use clap::{arg, ArgMatches, Command}; use colored::Colorize; -use quickwit_config::{RetentionPolicy, SearchSettings}; -use quickwit_serve::IndexUpdates; +use quickwit_common::uri::Uri; +use quickwit_config::ConfigFormat; +use quickwit_proto::bytes::Bytes; +use quickwit_rest_client::rest_client::UpdateConfigField; +use quickwit_storage::{load_file, StorageResolver}; use tracing::debug; -use crate::checklist::GREEN_COLOR; +use crate::checklist::{BLUE_COLOR, GREEN_COLOR}; use crate::ClientArgs; pub fn build_index_update_command() -> Command { Command::new("update") + .args(&[ + arg!(--index "ID of the target index") + .required(true), + ]) .subcommand_required(true) .subcommand( Command::new("search-settings") - .about("Updates default search settings.") + .about("Updates search settings.") + .long_about("Updates search settings. The update is automatically picked up when the next query is executed.") .args(&[ - arg!(--index "ID of the target index") - .display_order(1) - .required(true), - arg!(--"default-search-fields" "List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. \"field1 field2\". If no value is provided, existing defaults are removed and queries without target field will fail.") - .display_order(2) - .num_args(0..) + arg!(--"config-file" "Location of a JSON or YAML file containing the new search settings. See https://quickwit.io/docs/configuration/index-config#search-settings.") .required(true), ])) .subcommand( Command::new("retention-policy") - .about("Configures or disables the retention policy.") + .about("Updates or disables the retention policy.") + .long_about("Updates or disables the retention policy. The update is automatically picked up by the janitor service on its next state refresh.") .args(&[ - arg!(--index "ID of the target index") - .display_order(1) - .required(true), - arg!(--"period" "Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...)") - .display_order(2) + arg!(--"config-file" "Location of a JSON or YAML file containing the new retention policy. See https://quickwit.io/docs/configuration/index-config#retention-policy.") .required(false), - arg!(--"schedule" "Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...).") - .display_order(3) - .required(false), - arg!(--"disable" "Disables the retention policy. Old indexed data will not be cleaned up anymore.") - .display_order(4) + arg!(--disable "Disables the retention policy and keeps indexed data forever.") .required(false), ]) ) + .subcommand( + Command::new("indexing-settings") + .about("Updates indexing settings.") + .long_about("Updates indexing settings. The update is not automatically picked up by the indexer nodes, they need to be manually restarted.") + .args(&[ + arg!(--"config-file" "Location of a JSON or YAML file containing the new indexing settings. See https://quickwit.io/docs/configuration/index-config#indexing-settings.") + .required(true), + ]) + + ) } #[derive(Debug, Eq, PartialEq)] -pub struct RetentionPolicyArgs { +pub struct OptionalFieldArgs { pub client_args: ClientArgs, pub index_id: String, - pub disable: bool, - pub period: Option, - pub schedule: Option, + pub config_file_opt: Option, } #[derive(Debug, Eq, PartialEq)] -pub struct SearchSettingsArgs { +pub struct RequiredFieldArgs { pub client_args: ClientArgs, pub index_id: String, - pub default_search_fields: Vec, + pub config_file: Uri, +} + +#[derive(Debug, Eq, PartialEq)] +pub struct IndexingSettingsArgs { + pub client_args: ClientArgs, + pub index_id: String, + pub config_file: Uri, } #[derive(Debug, Eq, PartialEq)] pub enum IndexUpdateCliCommand { - RetentionPolicy(RetentionPolicyArgs), - SearchSettings(SearchSettingsArgs), + RetentionPolicy(OptionalFieldArgs), + SearchSettings(RequiredFieldArgs), + IndexingSettings(RequiredFieldArgs), } impl IndexUpdateCliCommand { @@ -89,45 +103,57 @@ impl IndexUpdateCliCommand { let (subcommand, submatches) = matches .remove_subcommand() .context("failed to parse index update subcommand")?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); match subcommand.as_str() { - "retention-policy" => Self::parse_update_retention_policy_args(submatches), - "search-settings" => Self::parse_update_search_settings_args(submatches), + "retention-policy" => Self::parse_update_optional_field_args(submatches, index_id), + "search-settings" => Self::parse_update_required_field_args(submatches, index_id), + "indexing-settings" => Self::parse_update_required_field_args(submatches, index_id), _ => bail!("unknown index update subcommand `{subcommand}`"), } } - fn parse_update_retention_policy_args(mut matches: ArgMatches) -> anyhow::Result { + /// Parse args for optional fields (i.e retention policy). + fn parse_update_optional_field_args( + mut matches: ArgMatches, + index_id: String, + ) -> anyhow::Result { let client_args = ClientArgs::parse(&mut matches)?; - let index_id = matches - .remove_one::("index") - .expect("`index` should be a required arg."); + + let config_file_opt = matches + .remove_one::("config-file") + .map(|uri| Uri::from_str(&uri)) + .transpose()?; let disable = matches.get_flag("disable"); - let period = matches.remove_one::("period"); - let schedule = matches.remove_one::("schedule"); - Ok(Self::RetentionPolicy(RetentionPolicyArgs { + if !disable && config_file_opt.is_none() { + bail!("either `--config-file` or `--disable` must be specified"); + } + if disable && config_file_opt.is_some() { + bail!("both `--config-file` and `--disable` cannot be specified"); + } + Ok(Self::RetentionPolicy(OptionalFieldArgs { client_args, index_id, - disable, - period, - schedule, + config_file_opt, })) } - fn parse_update_search_settings_args(mut matches: ArgMatches) -> anyhow::Result { + /// Parse args for required fields (i.e. search and indexing settings). + fn parse_update_required_field_args( + mut matches: ArgMatches, + index_id: String, + ) -> anyhow::Result { let client_args = ClientArgs::parse(&mut matches)?; - let index_id = matches - .remove_one::("index") - .expect("`index` should be a required arg."); - let default_search_fields = matches - .remove_many::("default-search-fields") - .map(|values| values.collect()) - // --default-search-fields should be made optional if other fields - // are added to SearchSettings - .expect("`default-search-fields` should be a required arg."); - Ok(Self::SearchSettings(SearchSettingsArgs { + + let config_file = matches + .remove_one::("config-file") + .map(|uri| Uri::from_str(&uri)) + .expect("`config-file` should be a required arg.")?; + Ok(Self::SearchSettings(RequiredFieldArgs { client_args, index_id, - default_search_fields, + config_file, })) } @@ -135,87 +161,92 @@ impl IndexUpdateCliCommand { match self { Self::RetentionPolicy(args) => update_retention_policy_cli(args).await, Self::SearchSettings(args) => update_search_settings_cli(args).await, + Self::IndexingSettings(args) => update_indexing_settings_cli(args).await, } } } -pub async fn update_retention_policy_cli(args: RetentionPolicyArgs) -> anyhow::Result<()> { - debug!(args=?args, "update-index-retention-policy"); - println!("❯ Updating index retention policy..."); - let qw_client = args.client_args.client(); - let metadata = qw_client.indexes().get(&args.index_id).await?; - let new_retention_policy_opt = match ( - args.disable, - args.period, - args.schedule, - metadata.index_config.retention_policy_opt, - ) { - (true, Some(_), Some(_), _) | (true, None, Some(_), _) | (true, Some(_), None, _) => { - bail!("`--period` and `--schedule` cannot be used together with `--disable`") - } - (false, None, None, _) => bail!("either `--period` or `--disable` must be specified"), - (false, None, Some(_), None) => { - bail!("`--period` is required when creating a retention policy") - } - (true, None, None, _) => None, - (false, None, Some(schedule), Some(policy)) => Some(RetentionPolicy { - retention_period: policy.retention_period, - evaluation_schedule: schedule, - }), - (false, Some(period), schedule_opt, None) => Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule_opt.unwrap_or(RetentionPolicy::default_schedule()), - }), - (false, Some(period), schedule_opt, Some(policy)) => Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule_opt.unwrap_or(policy.evaluation_schedule.clone()), - }), - }; - if let Some(new_retention_policy) = new_retention_policy_opt.as_ref() { - println!( - "New retention policy: {}", - serde_json::to_string(&new_retention_policy)? - ); - } else { - println!("Disable retention policy."); - } - qw_client +async fn update_from_file( + client_args: ClientArgs, + index_id: &str, + config_file: &Uri, + field: UpdateConfigField, +) -> anyhow::Result<()> { + let storage_resolver = StorageResolver::unconfigured(); + let config_content = load_file(&storage_resolver, config_file).await?; + client_args + .client() .indexes() .update( + index_id, + field, + Bytes::from(config_content.as_slice().to_owned()), + ConfigFormat::sniff_from_uri(config_file)?, + ) + .await?; + Ok(()) +} + +pub async fn update_retention_policy_cli(args: OptionalFieldArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index-retention-policy"); + println!("❯ Updating index retention policy..."); + if let Some(uri) = args.config_file_opt { + update_from_file( + args.client_args, &args.index_id, - IndexUpdates { - retention_policy_opt: new_retention_policy_opt, - search_settings: metadata.index_config.search_settings, - }, + &uri, + UpdateConfigField::RetentionPolicy, ) .await?; - println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); + } else { + args.client_args + .client() + .indexes() + .delete_retention_policy(&args.index_id) + .await?; + } + println!( + "{} Index retention policy successfully updated.", + "✔".color(GREEN_COLOR) + ); Ok(()) } -pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Result<()> { +pub async fn update_search_settings_cli(args: RequiredFieldArgs) -> anyhow::Result<()> { debug!(args=?args, "update-index-search-settings"); println!("❯ Updating index search settings..."); - let qw_client = args.client_args.client(); - let metadata = qw_client.indexes().get(&args.index_id).await?; - let search_settings = SearchSettings { - default_search_fields: args.default_search_fields, - }; + update_from_file( + args.client_args, + &args.index_id, + &args.config_file, + UpdateConfigField::SearchSettings, + ) + .await?; println!( - "New search settings: {}", - serde_json::to_string(&search_settings)? + "{} Index search settings successfully updated.", + "✔".color(GREEN_COLOR) + ); + Ok(()) +} + +pub async fn update_indexing_settings_cli(args: RequiredFieldArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index-indexing-settings"); + println!("❯ Updating index indexing settings..."); + update_from_file( + args.client_args, + &args.index_id, + &args.config_file, + UpdateConfigField::IndexingSettings, + ) + .await?; + println!( + "{} Index indexing settings successfully updated.", + "✔".color(GREEN_COLOR) + ); + println!( + "{} Restart indexer nodes for the new configuration to take effect.", + "!".color(BLUE_COLOR) ); - qw_client - .indexes() - .update( - &args.index_id, - IndexUpdates { - retention_policy_opt: metadata.index_config.retention_policy_opt, - search_settings, - }, - ) - .await?; - println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); Ok(()) } @@ -232,25 +263,42 @@ mod test { .try_get_matches_from([ "index", "update", - "retention-policy", "--index", "my-index", - "--period", - "1 day", + "retention-policy", + "--config-file", + "/tmp/hello.json", ]) .unwrap(); let command = CliCommand::parse_cli_args(matches).unwrap(); assert!(matches!( command, CliCommand::Index(IndexCliCommand::Update( - IndexUpdateCliCommand::RetentionPolicy(RetentionPolicyArgs { + IndexUpdateCliCommand::RetentionPolicy(OptionalFieldArgs { client_args: _, index_id, - disable: false, - period: Some(period), - schedule: None, + config_file_opt: Some(uri), }) - )) if &index_id == "my-index" && &period == "1 day" + )) if &index_id == "my-index" && uri.as_str() == "file:///tmp/hello.json" )); } + + #[test] + fn test_cmd_invalid_update_subsubcommand() { + let app = build_cli().no_binary_name(true); + let matches = app + .try_get_matches_from([ + "index", + "update", + "--index", + "my-index", + "retention-policy", + "--config-file", + "/tmp/hello.json", + "--disable", + ]) + .unwrap(); + CliCommand::parse_cli_args(matches) + .expect_err("command with both `--config-file` and `--disable` should fail"); + } } diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 848934c6a2..03aa9f92bd 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -29,7 +29,7 @@ use clap::error::ErrorKind; use helpers::{TestEnv, TestStorageType}; use quickwit_cli::checklist::ChecklistError; use quickwit_cli::cli::build_cli; -use quickwit_cli::index::update::{update_retention_policy_cli, RetentionPolicyArgs}; +use quickwit_cli::index::update::{update_retention_policy_cli, OptionalFieldArgs}; use quickwit_cli::index::{ create_index_cli, delete_index_cli, search_index, CreateIndexArgs, DeleteIndexArgs, SearchIndexArgs, @@ -543,17 +543,21 @@ async fn test_cmd_update_index() { .unwrap(); test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); + let test_update_uri = Uri::from_str( + &test_env.resource_files["retention_policy_update"] + .display() + .to_string(), + ) + .unwrap(); - // add a policy - update_retention_policy_cli(RetentionPolicyArgs { + // add a retention policy + update_retention_policy_cli(OptionalFieldArgs { index_id: index_id.clone(), client_args: ClientArgs { cluster_endpoint: test_env.cluster_endpoint.clone(), ..Default::default() }, - disable: false, - period: Some(String::from("1 week")), - schedule: Some(String::from("daily")), + config_file_opt: Some(test_update_uri), }) .await .unwrap(); @@ -566,30 +570,14 @@ async fn test_cmd_update_index() { }) ); - // invalid args - update_retention_policy_cli(RetentionPolicyArgs { - index_id: index_id.clone(), - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, - disable: true, - period: Some(String::from("a week")), - schedule: Some(String::from("daily")), - }) - .await - .unwrap_err(); - // remove the policy - update_retention_policy_cli(RetentionPolicyArgs { + update_retention_policy_cli(OptionalFieldArgs { index_id, client_args: ClientArgs { cluster_endpoint: test_env.cluster_endpoint.clone(), ..Default::default() }, - disable: true, - period: None, - schedule: None, + config_file_opt: None, }) .await .unwrap(); diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index d52e48034d..28fabf8957 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -231,6 +231,11 @@ pub async fn create_test_env( fs::write(&log_docs_path, LOGS_JSON_DOCS)?; let wikipedia_docs_path = resources_dir_path.join("wikis.json"); fs::write(&wikipedia_docs_path, WIKI_JSON_DOCS)?; + let retention_policy_update_path = resources_dir_path.join("retention_policy_update.json"); + fs::write( + &retention_policy_update_path, + r#"{"period": "1 week", "schedule": "daily"}"#, + )?; let mut resource_files = HashMap::new(); resource_files.insert("config", node_config_path); @@ -238,6 +243,7 @@ pub async fn create_test_env( resource_files.insert("index_config_without_uri", index_config_without_uri_path); resource_files.insert("logs", log_docs_path); resource_files.insert("wiki", wikipedia_docs_path); + resource_files.insert("retention_policy_update", retention_policy_update_path); let config_uri = Uri::from_str(&format!("file://{}", resource_files["config"].display())).unwrap(); diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 2aa295e49b..af6a506b19 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -519,6 +519,15 @@ impl TestableForRegression for IndexConfig { } } +/// Represents an update to one of the updatable index configuration attribute. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "field", content = "config")] +pub enum IndexConfigUpdate { + SearchSettings(SearchSettings), + IndexingSettings(IndexingSettings), + RetentionPolicy(Option), +} + /// Builds and returns the doc mapper associated with index. pub fn build_doc_mapper( doc_mapping: &DocMapping, diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 3b45881992..d42d4b39f6 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -47,7 +47,7 @@ pub use cluster_config::ClusterConfig; use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig}; pub use index_config::{ build_doc_mapper, load_index_config_from_user_config, DocMapping, IndexConfig, - IndexingResources, IndexingSettings, RetentionPolicy, SearchSettings, + IndexConfigUpdate, IndexingResources, IndexingSettings, RetentionPolicy, SearchSettings, }; use serde::de::DeserializeOwned; use serde::Serialize; @@ -209,10 +209,11 @@ impl ConfigFormat { ConfigFormat::Json => { let mut json_value: JsonValue = serde_json::from_reader(StripComments::new(payload))?; - let version_value = json_value.get_mut("version").context("missing version")?; - if let Some(version_number) = version_value.as_u64() { - warn!(version_value=?version_value, "`version` is supposed to be a string"); - *version_value = JsonValue::String(version_number.to_string()); + if let Some(version_value) = json_value.get_mut("version") { + if let Some(version_number) = version_value.as_u64() { + warn!(version_value=?version_value, "`version` should be a string, not an integer"); + *version_value = JsonValue::String(version_number.to_string()); + } } serde_json::from_value(json_value).context("failed to parse JSON file") } @@ -221,16 +222,13 @@ impl ConfigFormat { .context("configuration file contains invalid UTF-8 characters")?; let mut toml_value: toml::Value = toml::from_str(payload_str).context("failed to parse TOML file")?; - let version_value = toml_value.get_mut("version").context("missing version")?; - if let Some(version_number) = version_value.as_integer() { - warn!(version_value=?version_value, "`version` is supposed to be a string"); - *version_value = toml::Value::String(version_number.to_string()); - let reserialized = toml::to_string(version_value) - .context("failed to reserialize toml config")?; - toml::from_str(&reserialized).context("failed to parse TOML file") - } else { - toml::from_str(payload_str).context("failed to parse TOML file") + if let Some(version_value) = toml_value.get_mut("version") { + if let Some(version_number) = version_value.as_integer() { + warn!(version_value=?version_value, "`version` should be a string, not an integer"); + *version_value = toml::Value::String(version_number.to_string()); + } } + toml_value.try_into().context("failed to parse TOML file") } ConfigFormat::Yaml => { serde_yaml::from_slice(payload).context("failed to parse YAML file") diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index af64cc9c47..6528dde3d7 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -20,7 +20,8 @@ mod shard_table; use std::borrow::Cow; -use std::collections::BTreeSet; +use std::collections::hash_map::Entry; +use std::collections::{BTreeSet, HashMap}; use std::mem; use std::ops::Deref; use std::time::Instant; @@ -54,7 +55,7 @@ use tracing::{info, instrument, warn}; #[derive(Default, Debug)] pub(crate) struct ControlPlaneModel { index_uid_table: FnvHashMap, - index_table: FnvHashMap, + index_source_table: FnvHashMap>, shard_table: ShardTable, } @@ -65,7 +66,7 @@ impl ControlPlaneModel { } pub fn num_indexes(&self) -> usize { - self.index_table.len() + self.index_source_table.len() } pub fn num_sources(&self) -> usize { @@ -95,7 +96,7 @@ impl ControlPlaneModel { .await?; let num_indexes = indexes_metadata.len(); - self.index_table.reserve(num_indexes); + self.index_source_table.reserve(num_indexes); for index_metadata in indexes_metadata { self.add_index(index_metadata); @@ -105,13 +106,13 @@ impl ControlPlaneModel { let mut next_list_shards_request = metastore::ListShardsRequest::default(); - for (idx, index_metadata) in self.index_table.values().enumerate() { - for source_config in index_metadata.sources.values() { + for (idx, (index_uid, index_metadata)) in self.index_source_table.iter().enumerate() { + for source_config in index_metadata.values() { num_sources += 1; if source_config.source_type() == SourceType::IngestV2 { let request = ListShardsSubrequest { - index_uid: index_metadata.index_uid.clone().into(), + index_uid: Some(index_uid.clone()), source_id: source_config.source_id.clone(), shard_state: None, }; @@ -155,24 +156,23 @@ impl ControlPlaneModel { fn update_metrics(&self) { crate::metrics::CONTROL_PLANE_METRICS .indexes_total - .set(self.index_table.len() as i64); + .set(self.index_source_table.len() as i64); } pub(crate) fn source_configs(&self) -> impl Iterator + '_ { - self.index_table.values().flat_map(|index_metadata| { - index_metadata - .sources - .iter() - .map(move |(source_id, source_config)| { + self.index_source_table + .iter() + .flat_map(|(index_uid, sources)| { + sources.iter().map(move |(source_id, source_config)| { ( SourceUid { - index_uid: index_metadata.index_uid.clone(), + index_uid: index_uid.clone(), source_id: source_id.clone(), }, source_config, ) }) - }) + }) } pub(crate) fn add_index(&mut self, index_metadata: IndexMetadata) { @@ -186,12 +186,13 @@ impl ControlPlaneModel { self.shard_table.add_source(&index_uid, source_id); } } - self.index_table.insert(index_uid, index_metadata); + self.index_source_table + .insert(index_uid, index_metadata.sources); self.update_metrics(); } pub(crate) fn delete_index(&mut self, index_uid: &IndexUid) { - self.index_table.remove(index_uid); + self.index_source_table.remove(index_uid); self.index_uid_table.remove(&index_uid.index_id); self.shard_table.delete_index(&index_uid.index_id); self.update_metrics(); @@ -204,13 +205,21 @@ impl ControlPlaneModel { index_uid: &IndexUid, source_config: SourceConfig, ) -> ControlPlaneResult<()> { - let index_metadata = self.index_table.get_mut(index_uid).ok_or_else(|| { + let index_sources = self.index_source_table.get_mut(index_uid).ok_or_else(|| { MetastoreError::NotFound(EntityKind::Index { index_id: index_uid.to_string(), }) })?; - index_metadata.add_source(source_config.clone())?; - + match index_sources.entry(source_config.source_id.clone()) { + Entry::Occupied(_) => Err(MetastoreError::AlreadyExists(EntityKind::Source { + index_id: index_uid.index_id.clone(), + source_id: source_config.source_id.clone(), + })), + Entry::Vacant(entry) => { + entry.insert(source_config.clone()); + Ok(()) + } + }?; if source_config.source_type() == SourceType::IngestV2 { self.shard_table .add_source(index_uid, &source_config.source_id); @@ -223,15 +232,11 @@ impl ControlPlaneModel { self.shard_table .delete_source(&source_uid.index_uid, &source_uid.source_id); // Remove source from index metadata. - let Some(index_metadata) = self.index_table.get_mut(&source_uid.index_uid) else { + let Some(index_sources) = self.index_source_table.get_mut(&source_uid.index_uid) else { warn!(index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, "delete source: index not found"); return; }; - if index_metadata - .sources - .remove(&source_uid.source_id) - .is_none() - { + if index_sources.remove(&source_uid.source_id).is_none() { warn!(index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, "delete source: source not found"); }; } @@ -244,10 +249,10 @@ impl ControlPlaneModel { source_id: &SourceId, enable: bool, ) -> anyhow::Result { - let Some(index_model) = self.index_table.get_mut(index_uid) else { + let Some(index_sources) = self.index_source_table.get_mut(index_uid) else { bail!("index `{}` not found", index_uid.index_id); }; - let Some(source_config) = index_model.sources.get_mut(source_id) else { + let Some(source_config) = index_sources.get_mut(source_id) else { bail!("source `{source_id}` not found"); }; let has_changed = source_config.enabled != enable; @@ -457,7 +462,7 @@ mod tests { .await .unwrap(); - assert_eq!(model.index_table.len(), 3); + assert_eq!(model.index_source_table.len(), 3); assert_eq!(model.index_uid("test-index-0").unwrap(), index_uid); assert_eq!(model.index_uid("test-index-1").unwrap(), index_uid2); assert_eq!(model.index_uid("test-index-2").unwrap(), index_uid3); @@ -497,8 +502,11 @@ mod tests { let index_uid = index_metadata.index_uid.clone(); model.add_index(index_metadata.clone()); - assert_eq!(model.index_table.len(), 1); - assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata); + assert_eq!(model.index_source_table.len(), 1); + assert_eq!( + model.index_source_table.get(&index_uid).unwrap(), + &index_metadata.sources + ); assert_eq!(model.index_uid_table.len(), 1); assert_eq!(model.index_uid("test-index").unwrap(), index_uid); @@ -515,8 +523,11 @@ mod tests { let index_uid = index_metadata.index_uid.clone(); model.add_index(index_metadata.clone()); - assert_eq!(model.index_table.len(), 1); - assert_eq!(model.index_table.get(&index_uid).unwrap(), &index_metadata); + assert_eq!(model.index_source_table.len(), 1); + assert_eq!( + model.index_source_table.get(&index_uid).unwrap(), + &index_metadata.sources + ); assert_eq!(model.index_uid_table.len(), 1); assert_eq!(model.index_uid("test-index").unwrap(), index_uid); @@ -545,7 +556,7 @@ mod tests { model.delete_index(&index_uid); - assert!(model.index_table.is_empty()); + assert!(model.index_source_table.is_empty()); assert!(model.index_uid_table.is_empty()); assert_eq!(model.shard_table.num_sources(), 0); } diff --git a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs index 5ddc8a034e..4725a72603 100644 --- a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs @@ -20,10 +20,11 @@ use std::collections::HashSet; use std::time::Duration; +use hyper::body::Bytes; use quickwit_config::service::QuickwitService; -use quickwit_config::SearchSettings; -use quickwit_rest_client::rest_client::CommitType; -use quickwit_serve::{IndexUpdates, SearchRequestQueryString}; +use quickwit_config::ConfigFormat; +use quickwit_rest_client::rest_client::{CommitType, UpdateConfigField}; +use quickwit_serve::SearchRequestQueryString; use serde_json::json; use crate::ingest_json; @@ -121,12 +122,9 @@ async fn test_update_on_multi_nodes_cluster() { .indexes() .update( "my-updatable-index", - IndexUpdates { - search_settings: SearchSettings { - default_search_fields: vec!["title".to_string(), "body".to_string()], - }, - retention_policy_opt: None, - }, + UpdateConfigField::SearchSettings, + Bytes::from(r#"{"default_search_fields":["title", "body"]}"#), + ConfigFormat::Json, ) .await .unwrap(); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 5f7f91c81e..4ad47813e5 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,7 +30,9 @@ use std::ops::Bound; use itertools::Itertools; use quickwit_common::pretty::PrettySample; -use quickwit_config::{RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID}; +use quickwit_config::{ + IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID, +}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, @@ -227,6 +229,13 @@ impl FileBackedIndex { is_mutation } + /// Replaces the retention policy in the index config, returning whether a mutation occurred. + pub fn set_indexing_settings(&mut self, indexing_settings: IndexingSettings) -> bool { + let is_mutation = self.metadata.index_config.indexing_settings != indexing_settings; + self.metadata.index_config.indexing_settings = indexing_settings; + is_mutation + } + /// Stages a single split. /// /// If a split already exists and is in the [SplitState::Staged] state, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index a0ec0f5126..23e7ff7e4e 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -41,7 +41,7 @@ use async_trait::async_trait; use futures::future::try_join_all; use itertools::Itertools; use quickwit_common::ServiceStream; -use quickwit_config::IndexTemplate; +use quickwit_config::{IndexConfigUpdate, IndexTemplate}; use quickwit_proto::metastore::{ serde_utils, AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, @@ -461,15 +461,17 @@ impl MetastoreService for FileBackedMetastore { &mut self, request: UpdateIndexRequest, ) -> MetastoreResult { - let search_settings = request.deserialize_search_settings()?; - let retention_policy_opt = request.deserialize_retention_policy()?; + let update = request.deserialize_index_config_update()?; let index_uid = request.index_uid(); let metadata = self .mutate(index_uid, |index| { - let search_settings_mutated = index.set_search_settings(search_settings); - let retention_policy_mutated = index.set_retention_policy(retention_policy_opt); - if search_settings_mutated || retention_policy_mutated { + let mutation_occured = match update { + IndexConfigUpdate::SearchSettings(s) => index.set_search_settings(s), + IndexConfigUpdate::RetentionPolicy(s) => index.set_retention_policy(s), + IndexConfigUpdate::IndexingSettings(s) => index.set_indexing_settings(s), + }; + if mutation_occured { Ok(MutationOccurred::Yes(index.metadata().clone())) } else { Ok(MutationOccurred::No(index.metadata().clone())) diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 3a231f0029..04f262e33f 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -31,7 +31,7 @@ use bytes::Bytes; use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; -use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; +use quickwit_config::{IndexConfig, IndexConfigUpdate, SourceConfig}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteTask, @@ -181,51 +181,31 @@ impl CreateIndexResponseExt for CreateIndexResponse { /// Helper trait to build a [`UpdateIndexRequest`] and deserialize its payload. pub trait UpdateIndexRequestExt { - /// Creates a new [`UpdateIndexRequest`] from the different updated fields. - fn try_from_updates( + /// Creates a new [`UpdateIndexRequest`] from an [`IndexConfigUpdate`]. + fn try_from_index_config_update( index_uid: impl Into, - search_settings: &SearchSettings, - retention_policy_opt: &Option, + update: &IndexConfigUpdate, ) -> MetastoreResult; - /// Deserializes the `search_settings_json` field of an [`UpdateIndexRequest`] into a - /// [`SearchSettings`] object. - fn deserialize_search_settings(&self) -> MetastoreResult; - - /// Deserializes the `retention_policy_json` field of an [`UpdateIndexRequest`] into a - /// [`RetentionPolicy`] object. - fn deserialize_retention_policy(&self) -> MetastoreResult>; + /// Deserializes the `config_json` field of an [`UpdateIndexRequest`] into + /// the appropriate variant of `IndexConfigUpdate`. + fn deserialize_index_config_update(&self) -> MetastoreResult; } impl UpdateIndexRequestExt for UpdateIndexRequest { - fn try_from_updates( + fn try_from_index_config_update( index_uid: impl Into, - search_settings: &SearchSettings, - retention_policy_opt: &Option, + update: &IndexConfigUpdate, ) -> MetastoreResult { - let search_settings_json = serde_utils::to_json_str(&search_settings)?; - let retention_policy_json = retention_policy_opt - .as_ref() - .map(serde_utils::to_json_str) - .transpose()?; - - let update_request = UpdateIndexRequest { - index_uid: Some(index_uid.into()), - search_settings_json, - retention_policy_json, - }; - Ok(update_request) - } - - fn deserialize_search_settings(&self) -> MetastoreResult { - serde_utils::from_json_str(&self.search_settings_json) + let index_uid = Some(index_uid.into()); + Ok(UpdateIndexRequest { + index_uid, + updated_config_json: serde_utils::to_json_str(update)?, + }) } - fn deserialize_retention_policy(&self) -> MetastoreResult> { - self.retention_policy_json - .as_ref() - .map(|policy| serde_utils::from_json_str(policy)) - .transpose() + fn deserialize_index_config_update(&self) -> MetastoreResult { + serde_utils::from_json_str(&self.updated_config_json) } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 9202c24cbe..ed3d9fd2d3 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -25,8 +25,8 @@ use quickwit_common::pretty::PrettySample; use quickwit_common::uri::Uri; use quickwit_common::ServiceStream; use quickwit_config::{ - validate_index_id_pattern, IndexTemplate, IndexTemplateId, PostgresMetastoreConfig, - INGEST_V2_SOURCE_ID, + validate_index_id_pattern, IndexConfigUpdate, IndexTemplate, IndexTemplateId, + PostgresMetastoreConfig, INGEST_V2_SOURCE_ID, }; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ @@ -406,20 +406,30 @@ impl MetastoreService for PostgresqlMetastore { &mut self, request: UpdateIndexRequest, ) -> MetastoreResult { - let retention_policy_opt = request.deserialize_retention_policy()?; - let search_settings = request.deserialize_search_settings()?; + let update = request.deserialize_index_config_update()?; let index_uid: IndexUid = request.index_uid().clone(); let updated_metadata = run_with_tx!(self.connection_pool, tx, { - mutate_index_metadata::(tx, index_uid, |index_metadata| { - if index_metadata.index_config.search_settings != search_settings - || index_metadata.index_config.retention_policy_opt != retention_policy_opt + mutate_index_metadata::(tx, index_uid, |index_metadata| match update + { + IndexConfigUpdate::SearchSettings(s) + if index_metadata.index_config.search_settings != s => { - index_metadata.index_config.search_settings = search_settings; - index_metadata.index_config.retention_policy_opt = retention_policy_opt; + index_metadata.index_config.search_settings = s; + Ok(MutationOccurred::Yes(())) + } + IndexConfigUpdate::IndexingSettings(s) + if index_metadata.index_config.indexing_settings != s => + { + index_metadata.index_config.indexing_settings = s; + Ok(MutationOccurred::Yes(())) + } + IndexConfigUpdate::RetentionPolicy(s) + if index_metadata.index_config.retention_policy_opt != s => + { + index_metadata.index_config.retention_policy_opt = s; Ok(MutationOccurred::Yes(())) - } else { - Ok(MutationOccurred::No(())) } + _ => Ok(MutationOccurred::No(())), }) .await })?; diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index a75ff11529..1245e9b896 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -25,13 +25,12 @@ // - list_indexes // - delete_index -use std::collections::BTreeSet; - use quickwit_common::rand::append_random_suffix; +use quickwit_config::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig}; use quickwit_config::{ - IndexConfig, RetentionPolicy, SearchSettings, SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, + IndexConfig, IndexConfigUpdate, IndexingSettings, RetentionPolicy, SearchSettings, + SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, }; -use quickwit_doc_mapper::FieldMappingType; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, StageSplitsRequest, @@ -84,9 +83,9 @@ pub async fn test_metastore_create_index< cleanup_index(&mut metastore, index_uid).await; } -pub async fn test_metastore_update_index< +async fn setup_metastore_for_update< MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, ->() { +>() -> (MetastoreToTest, IndexUid) { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-update-index"); @@ -101,51 +100,28 @@ pub async fn test_metastore_update_index< .index_uid() .clone(); - let index_metadata = metastore - .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) - .await - .unwrap() - .deserialize_index_metadata() - .unwrap(); - - // use all fields that are currently not set as default - let current_defaults = BTreeSet::from_iter( - index_metadata - .index_config - .search_settings - .default_search_fields, - ); - let new_search_setting = SearchSettings { - default_search_fields: index_metadata - .index_config - .doc_mapping - .field_mappings - .iter() - .filter(|f| matches!(f.mapping_type, FieldMappingType::Text(..))) - .filter(|f| !current_defaults.contains(&f.name)) - .map(|f| f.name.clone()) - .collect(), - }; + (metastore, index_uid) +} +pub async fn test_metastore_update_retention_policy< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid) = setup_metastore_for_update::().await; let new_retention_policy_opt = Some(RetentionPolicy { retention_period: String::from("3 days"), evaluation_schedule: String::from("daily"), }); - assert_ne!( - index_metadata.index_config.retention_policy_opt, new_retention_policy_opt, - "original and updated value are the same, test became inefficient" - ); - // run same update twice to check idempotence, then None as a corner case check + // set and unset retention policy multiple times for loop_retention_policy_opt in [ + None, new_retention_policy_opt.clone(), new_retention_policy_opt.clone(), None, ] { - let index_update = UpdateIndexRequest::try_from_updates( + let index_update = UpdateIndexRequest::try_from_index_config_update( index_uid.clone(), - &new_search_setting, - &loop_retention_policy_opt, + &IndexConfigUpdate::RetentionPolicy(loop_retention_policy_opt.clone()), ) .unwrap(); let response_metadata = metastore @@ -155,23 +131,118 @@ pub async fn test_metastore_update_index< .deserialize_index_metadata() .unwrap(); assert_eq!(response_metadata.index_uid, index_uid); - assert_eq!( - response_metadata.index_config.search_settings, - new_search_setting - ); assert_eq!( response_metadata.index_config.retention_policy_opt, loop_retention_policy_opt ); let updated_metadata = metastore - .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) .await .unwrap() .deserialize_index_metadata() .unwrap(); assert_eq!(response_metadata, updated_metadata); } + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_update_search_settings< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid) = setup_metastore_for_update::().await; + + for loop_search_settings in [ + vec![], + vec!["body".to_owned()], + vec!["body".to_owned()], + vec!["body".to_owned(), "owner".to_owned()], + vec![], + ] { + let index_update = UpdateIndexRequest::try_from_index_config_update( + index_uid.clone(), + &IndexConfigUpdate::SearchSettings(SearchSettings { + default_search_fields: loop_search_settings.clone(), + }), + ) + .unwrap(); + let response_metadata = metastore + .update_index(index_update) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + response_metadata + .index_config + .search_settings + .default_search_fields, + loop_search_settings + ); + let updated_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + updated_metadata + .index_config + .search_settings + .default_search_fields, + loop_search_settings + ); + } + cleanup_index(&mut metastore, index_uid).await; +} +pub async fn test_metastore_update_indexing_settings< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid) = setup_metastore_for_update::().await; + + for loop_indexing_settings in [ + MergePolicyConfig::Nop, + MergePolicyConfig::Nop, + MergePolicyConfig::StableLog(StableLogMergePolicyConfig { + merge_factor: 5, + ..Default::default() + }), + ] { + let index_update = UpdateIndexRequest::try_from_index_config_update( + index_uid.clone(), + &IndexConfigUpdate::IndexingSettings(IndexingSettings { + merge_policy: loop_indexing_settings.clone(), + ..Default::default() + }), + ) + .unwrap(); + let resp_metadata = metastore + .update_index(index_update) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + resp_metadata.index_config.indexing_settings.merge_policy, + loop_indexing_settings + ); + let updated_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + updated_metadata.index_config.indexing_settings.merge_policy, + loop_indexing_settings + ); + } cleanup_index(&mut metastore, index_uid).await; } diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 3caf221a56..61fd9bd8ab 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -187,9 +187,21 @@ macro_rules! metastore_test_suite { } #[tokio::test] - async fn test_metastore_update_index() { + async fn test_metastore_update_retention_policy() { let _ = tracing_subscriber::fmt::try_init(); - $crate::tests::index::test_metastore_update_index::<$metastore_type>().await; + $crate::tests::index::test_metastore_update_retention_policy::<$metastore_type>().await; + } + + #[tokio::test] + async fn test_metastore_update_search_settings() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_update_search_settings::<$metastore_type>().await; + } + + #[tokio::test] + async fn test_metastore_update_indexing_settings() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_update_indexing_settings::<$metastore_type>().await; } #[tokio::test] diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 9c3d7e9a6a..2b15631771 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -202,10 +202,16 @@ message CreateIndexResponse { string index_metadata_json = 2; } +enum UpdatedIndexConfig { + SEARCH_SETTINGS = 0; + INDEXING_SETTINGS = 1; + RETENTION_POLICY = 2; +} + message UpdateIndexRequest { quickwit.common.IndexUid index_uid = 1; - string search_settings_json = 2; - optional string retention_policy_json = 3; + // JSON representation of the updated config field. + string updated_config_json = 2; } message ListIndexesMetadataRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 2d27a262d2..7a72d62c93 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -26,10 +26,9 @@ pub struct CreateIndexResponse { pub struct UpdateIndexRequest { #[prost(message, optional, tag = "1")] pub index_uid: ::core::option::Option, + /// JSON representation of the updated config field. #[prost(string, tag = "2")] - pub search_settings_json: ::prost::alloc::string::String, - #[prost(string, optional, tag = "3")] - pub retention_policy_json: ::core::option::Option<::prost::alloc::string::String>, + pub updated_config_json: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -509,6 +508,37 @@ impl SourceType { } } } +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "snake_case")] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum UpdatedIndexConfig { + SearchSettings = 0, + IndexingSettings = 1, + RetentionPolicy = 2, +} +impl UpdatedIndexConfig { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + UpdatedIndexConfig::SearchSettings => "SEARCH_SETTINGS", + UpdatedIndexConfig::IndexingSettings => "INDEXING_SETTINGS", + UpdatedIndexConfig::RetentionPolicy => "RETENTION_POLICY", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SEARCH_SETTINGS" => Some(Self::SearchSettings), + "INDEXING_SETTINGS" => Some(Self::IndexingSettings), + "RETENTION_POLICY" => Some(Self::RetentionPolicy), + _ => None, + } + } +} /// BEGIN quickwit-codegen #[allow(unused_imports)] use std::str::FromStr; diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index e90267482a..971e1e0c54 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -26,9 +26,7 @@ use quickwit_indexing::actors::IndexingServiceCounters; pub use quickwit_ingest::CommitType; use quickwit_metastore::{IndexMetadata, Split, SplitInfo}; use quickwit_search::SearchResponseRest; -use quickwit_serve::{ - IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString, -}; +use quickwit_serve::{ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString}; use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use reqwest::{Client, ClientBuilder, Method, StatusCode, Url}; use serde::Serialize; @@ -326,6 +324,12 @@ pub struct IndexClient<'a> { timeout: Timeout, } +pub enum UpdateConfigField { + RetentionPolicy, + SearchSettings, + IndexingSettings, +} + impl<'a> IndexClient<'a> { fn new(transport: &'a Transport, timeout: Timeout) -> Self { Self { transport, timeout } @@ -357,13 +361,37 @@ impl<'a> IndexClient<'a> { pub async fn update( &self, index_id: &str, - index_updates: IndexUpdates, + config_field: UpdateConfigField, + config: Bytes, + config_format: ConfigFormat, ) -> Result { - let body = Bytes::from(serde_json::to_string(&index_updates)?); - let path = format!("indexes/{index_id}"); + let header_map = header_from_config_format(config_format); + let path = match config_field { + UpdateConfigField::IndexingSettings => "indexing-settings", + UpdateConfigField::SearchSettings => "search-settings", + UpdateConfigField::RetentionPolicy => "retention-policy", + }; + let path = format!("indexes/{index_id}/{path}"); let response = self .transport - .send::<()>(Method::PUT, &path, None, None, Some(body), self.timeout) + .send::<()>( + Method::PUT, + &path, + Some(header_map), + None, + Some(config), + self.timeout, + ) + .await?; + let index_metadata = response.deserialize().await?; + Ok(index_metadata) + } + + pub async fn delete_retention_policy(&self, index_id: &str) -> Result { + let path = format!("indexes/{index_id}/retention-policy"); + let response = self + .transport + .send::<()>(Method::DELETE, &path, None, None, None, self.timeout) .await?; let index_metadata = response.deserialize().await?; Ok(index_metadata) diff --git a/quickwit/quickwit-serve/src/index_api/mod.rs b/quickwit/quickwit-serve/src/index_api/mod.rs index ab831526c8..9b0990ce1f 100644 --- a/quickwit/quickwit-serve/src/index_api/mod.rs +++ b/quickwit/quickwit-serve/src/index_api/mod.rs @@ -20,5 +20,5 @@ mod rest_handler; pub use self::rest_handler::{ - index_management_handlers, IndexApi, IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, + index_management_handlers, IndexApi, ListSplitsQueryParams, ListSplitsResponse, }; diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index a64ff91854..82ee98828e 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -22,9 +22,8 @@ use std::sync::Arc; use bytes::Bytes; use quickwit_common::uri::Uri; use quickwit_config::{ - load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, NodeConfig, - RetentionPolicy, SearchSettings, SourceConfig, SourceParams, CLI_SOURCE_ID, - INGEST_API_SOURCE_ID, + load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, + IndexConfigUpdate, NodeConfig, SourceConfig, SourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, }; use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; @@ -55,7 +54,10 @@ use crate::with_arg; #[openapi( paths( create_index, - update_index, + update_index_indexing_settings, + update_index_search_settings, + update_index_retention_policy, + delete_index_retention_policy, clear_index, delete_index, list_indexes_metadata, @@ -67,7 +69,7 @@ use crate::with_arg; toggle_source, delete_source, ), - components(schemas(ToggleSource, SplitsForDeletion, IndexStats, IndexUpdates)) + components(schemas(ToggleSource, SplitsForDeletion, IndexStats)) )] pub struct IndexApi; @@ -90,7 +92,10 @@ pub fn index_management_handlers( get_index_metadata_handler(index_service.metastore()) .or(list_indexes_metadata_handler(index_service.metastore())) .or(create_index_handler(index_service.clone(), node_config)) - .or(update_index_handler(index_service.metastore())) + .or(update_search_settings_handler(index_service.metastore())) + .or(update_indexing_setting_handler(index_service.metastore())) + .or(update_retention_policy_handler(index_service.metastore())) + .or(delete_retention_policy_handler(index_service.metastore())) .or(clear_index_handler(index_service.clone())) .or(delete_index_handler(index_service.clone())) // Splits handlers @@ -526,25 +531,45 @@ async fn create_index( .await } -/// The body of the index update request. All fields will be replaced in the -/// existing configuration. -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] -#[serde(deny_unknown_fields)] // Remove when adding new fields to allow to ensure forward compatibility -pub struct IndexUpdates { - pub search_settings: SearchSettings, - #[serde(rename = "retention_policy")] - pub retention_policy_opt: Option, +pub fn pre_update_filter( + metastore: MetastoreServiceClient, + path: &'static str, +) -> impl Filter + + Clone { + warp::path("indexes") + .and(warp::path::param()) + .and(warp::path(path)) + .and(warp::path::end()) + .and(warp::put()) + .and(extract_config_format()) + .and(warp::body::content_length_limit(1024 * 1024)) + .and(warp::filters::body::bytes()) + .and(with_arg(metastore)) } -fn update_index_handler( +async fn update_index( + index_id: String, + request: IndexConfigUpdate, + mut metastore: MetastoreServiceClient, +) -> Result { + info!(index_id = %index_id, "update-index"); + let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); + let index_uid: IndexUid = metastore + .index_metadata(index_metadata_request) + .await? + .deserialize_index_metadata()? + .index_uid; + let update_request = UpdateIndexRequest::try_from_index_config_update(index_uid, &request)?; + let update_resp = metastore.update_index(update_request).await?; + Ok(update_resp.deserialize_index_metadata()?) +} + +pub fn update_indexing_setting_handler( metastore: MetastoreServiceClient, ) -> impl Filter + Clone { - warp::path!("indexes" / String) - .and(warp::put()) - .and(json_body()) - .and(with_arg(metastore)) - .then(update_index) - .map(log_failure("failed to update index")) + pre_update_filter(metastore, "indexing-settings") + .then(update_index_indexing_settings) + .map(log_failure("failed to update indexing settings")) .and(extract_format_from_qs()) .map(into_rest_api_response) } @@ -552,40 +577,148 @@ fn update_index_handler( #[utoipa::path( put, tag = "Indexes", - path = "/indexes/{index_id}", - request_body = IndexUpdates, + path = "/indexes/{index_id}/indexing-settings", + request_body = IndexingSettings, responses( - (status = 200, description = "Successfully updated the index configuration.") + (status = 200, description = "Successfully updated the indexing settings.") ), params( ("index_id" = String, Path, description = "The index ID to update."), ) )] -/// Updates an existing index. -/// -/// This endpoint has PUT semantics, which means that all the updatable fields of the index -/// configuration are replaced by the values specified in the request. In particular, omitting an -/// optional field like `retention_policy` will delete the associated configuration. -async fn update_index( +/// The update is not automatically picked up by the indexer nodes, they need to be manually +/// restarted. +pub async fn update_index_indexing_settings( index_id: String, - request: IndexUpdates, - mut metastore: MetastoreServiceClient, + config_format: ConfigFormat, + config_bytes: Bytes, + metastore: MetastoreServiceClient, ) -> Result { - info!(index_id = %index_id, "update-index"); - let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); - let index_uid: IndexUid = metastore - .index_metadata(index_metadata_request) - .await? - .deserialize_index_metadata()? - .index_uid; + let update = config_format + .parse(&config_bytes) + .map_err(IndexServiceError::InvalidConfig)?; + update_index( + index_id, + IndexConfigUpdate::IndexingSettings(update), + metastore, + ) + .await +} - let update_request = UpdateIndexRequest::try_from_updates( - index_uid, - &request.search_settings, - &request.retention_policy_opt, - )?; - let update_resp = metastore.update_index(update_request).await?; - Ok(update_resp.deserialize_index_metadata()?) +pub fn update_search_settings_handler( + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + pre_update_filter(metastore, "search-settings") + .then(update_index_search_settings) + .map(log_failure("failed to update search settings")) + .and(extract_format_from_qs()) + .map(into_rest_api_response) +} + +#[utoipa::path( + put, + tag = "Indexes", + path = "/indexes/{index_id}/search-settings", + request_body = SearchSettings, + responses( + (status = 200, description = "Successfully updated the search settings.") + ), + params( + ("index_id" = String, Path, description = "The index ID to update."), + ) +)] +/// The update is automatically picked up when the next query is executed. +pub async fn update_index_search_settings( + index_id: String, + config_format: ConfigFormat, + config_bytes: Bytes, + metastore: MetastoreServiceClient, +) -> Result { + let update = config_format + .parse(&config_bytes) + .map_err(IndexServiceError::InvalidConfig)?; + update_index( + index_id, + IndexConfigUpdate::SearchSettings(update), + metastore, + ) + .await +} + +pub fn update_retention_policy_handler( + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + pre_update_filter(metastore, "retention-policy") + .then(update_index_retention_policy) + .map(log_failure("failed to update retention policy")) + .and(extract_format_from_qs()) + .map(into_rest_api_response) +} + +#[utoipa::path( + put, + tag = "Indexes", + path = "/indexes/{index_id}/retention-policy", + request_body = RetentionPolicy, + responses( + (status = 200, description = "Successfully updated the retention policy.") + ), + params( + ("index_id" = String, Path, description = "The index ID to update."), + ) +)] +/// The update is automatically picked up by the janitor service on its next state refresh. +pub async fn update_index_retention_policy( + index_id: String, + config_format: ConfigFormat, + config_bytes: Bytes, + metastore: MetastoreServiceClient, +) -> Result { + let update = config_format + .parse(&config_bytes) + .map_err(IndexServiceError::InvalidConfig)?; + update_index( + index_id, + IndexConfigUpdate::RetentionPolicy(Some(update)), + metastore, + ) + .await +} + +pub fn delete_retention_policy_handler( + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + warp::path!("indexes" / String / "retention-policy") + .and(warp::delete()) + .and(with_arg(metastore)) + .then(delete_index_retention_policy) + .map(log_failure("failed to delete index retention policy")) + .and(extract_format_from_qs()) + .map(into_rest_api_response) +} + +#[utoipa::path( + delete, + tag = "Indexes", + path = "/indexes/{index_id}/retention-policy", + responses( + (status = 200, description = "Successfully deleted the retention policy.") + ), + params( + ("index_id" = String, Path, description = "The index ID to update."), + ) +)] +/// The deletion is automatically picked up by the janitor service on its next state refresh. +pub async fn delete_index_retention_policy( + index_id: String, + metastore: MetastoreServiceClient, +) -> Result { + update_index( + index_id, + IndexConfigUpdate::RetentionPolicy(None), + metastore, + ) + .await } fn clear_index_handler( @@ -1001,6 +1134,7 @@ mod tests { use assert_json_diff::assert_json_include; use quickwit_common::uri::Uri; use quickwit_common::ServiceStream; + use quickwit_config::merge_policy_config::MergePolicyConfig; use quickwit_config::{SourceParams, VecSourceParams}; use quickwit_indexing::{mock_split, MockSplitBuilder}; use quickwit_metastore::{metastore_for_test, IndexMetadata, ListSplitsResponseExt}; @@ -1791,7 +1925,7 @@ mod tests { .path("/indexes") .method("POST") .json(&true) - .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"field_mappings":[{"name": "timestamp", "type": "i64", "fast": true, "indexed": true}]},"search_settings":{"default_search_fields":["body"]}}"#) + .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"timestamp_field":"timestamp", "field_mappings":[{"name": "timestamp", "type": "datetime", "fast": true}]},"search_settings":{"default_search_fields":["body"]}}"#) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 200); @@ -1807,10 +1941,10 @@ mod tests { } { let resp = warp::test::request() - .path("/indexes/hdfs-logs") + .path("/indexes/hdfs-logs/search-settings") .method("PUT") .json(&true) - .body(r#"{"search_settings":{"default_search_fields":["severity_text","body"]}}"#) + .body(r#"{"default_search_fields":["severity_text","body"]}"#) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 200); @@ -1824,6 +1958,51 @@ mod tests { }); assert_json_include!(actual: resp_json, expected: expected_response_json); } + { + let resp = warp::test::request() + .path("/indexes/hdfs-logs/retention-policy") + .method("PUT") + .header("content-type", "application/yaml") + .body("period: 90 days") + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = serde_json::json!({ + "index_config": { + "retention": { + "period": "90 days" + } + } + }); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } + { + let resp = warp::test::request() + .path("/indexes/hdfs-logs/indexing-settings") + .method("PUT") + .header("content-type", "application/toml") + .body( + r#" + [merge_policy] + type = "limit_merge" + "#, + ) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = serde_json::json!({ + "index_config": { + "indexing_settings": { + "merge_policy": { + "type": "limit_merge" + } + } + } + }); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } // check that the metastore was updated let index_metadata = metastore .index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string())) @@ -1838,6 +2017,18 @@ mod tests { .default_search_fields, ["severity_text", "body"] ); + assert_eq!( + index_metadata + .index_config + .retention_policy_opt + .unwrap() + .retention_period, + "90 days" + ); + assert!(matches!( + index_metadata.index_config.indexing_settings.merge_policy, + MergePolicyConfig::ConstWriteAmplification(_) + )); } #[tokio::test] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 5cd5b487a9..c14d6885cf 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -117,7 +117,7 @@ use tracing::{debug, error, info, warn}; use warp::{Filter, Rejection}; pub use crate::build_info::{BuildInfo, RuntimeInfo}; -pub use crate::index_api::{IndexUpdates, ListSplitsQueryParams, ListSplitsResponse}; +pub use crate::index_api::{ListSplitsQueryParams, ListSplitsResponse}; pub use crate::metrics::SERVE_METRICS; use crate::rate_modulator::RateModulator; #[cfg(test)]