From f5c6bd0e84deb498018d94a7b7be2f392f459ffa Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 18 Mar 2024 16:52:15 +0900 Subject: [PATCH 1/3] Changed the default of `default_merge_concurrency` to `2 * num_cpus / 3` We have a observed at least one case (rather extreme however) where the default settings did not make it possible for merging to keep up with indexing. --- quickwit/quickwit-config/src/node_config/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 839d760ade1..12473933c7e 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -139,7 +139,7 @@ impl IndexerConfig { } pub fn default_merge_concurrency() -> NonZeroUsize { - NonZeroUsize::new(num_cpus::get() / 2).unwrap_or(NonZeroUsize::new(1).unwrap()) + NonZeroUsize::new(num_cpus::get() * 2 / 3).unwrap_or(NonZeroUsize::new(1).unwrap()) } fn default_cpu_capacity() -> CpuCapacity { From c8f8a0989cc18d55ec9ce49ced9dc8b5f099b3c5 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 18 Mar 2024 17:12:23 +0900 Subject: [PATCH 2/3] Added documentation for indexer's cpu_capacity Closes #4716 --- docs/configuration/node-config.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index ec8f50c7b5a..0e6e2e4d739 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -166,7 +166,9 @@ This section contains the configuration options for an indexer. The split store | `split_store_max_num_bytes` | Maximum size in bytes allowed in the split store for each index-source pair. | `100G` | | `split_store_max_num_splits` | Maximum number of files allowed in the split store for each index-source pair. | `1000` | | `max_concurrent_split_uploads` | Maximum number of concurrent split uploads allowed on the node. | `12` | +| `merge_concurrency` | Maximum number of merge operations that can be executed on the node at one point in time. | `(2 x num threads available) / 3` | | `enable_otlp_endpoint` | If true, enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry Protocol (OTLP). | `false` | +| `cpu_capacity` | Advisory parameter used by the control plane. The value can expressed be in threads (e.g. `2`) or in term of millicpus (`2000m`). The control plane will attempt to schedule indexing pipelines on the different nodes proportionally to the cpu capacity advertised by the indexer. It is NOT used as a limit. All pipelines will be scheduled regardless of whether the cluster has sufficient capacity or not. The control plane does not attempt to spread the work equally when the load is well below the `cpu_capacity`. Users who need a balanced load on all of their indexer nodes can set the `cpu_capacity` to an arbitrarily low value as long as they keep it proportional to the number of threads available. | `num threads available` | Example: From 59db2b728b86e3c84db59129dc60825a40663b7b Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 18 Mar 2024 17:50:07 +0900 Subject: [PATCH 3/3] This PR deprecates the max_num_pipeline_per_index in Kafka's source config. It also renames desired_num_pipelines into num_pipelines. Closes #4624 --- config/tutorials/gh-archive/kafka-source.yaml | 5 +- docs/configuration/source-config.md | 40 +----- docs/ingest-data/kafka.md | 5 +- quickwit/quickwit-cli/src/source.rs | 9 +- quickwit/quickwit-cli/src/tool.rs | 6 +- .../tests/source_config/kafka-source.json | 1 - quickwit/quickwit-config/src/lib.rs | 3 +- .../quickwit-config/src/source_config/mod.rs | 120 ++++++++---------- .../src/source_config/serialize.rs | 79 +++++++++--- .../src/indexing_scheduler/mod.rs | 25 ++-- quickwit/quickwit-control-plane/src/tests.rs | 14 +- .../src/actors/indexing_pipeline.rs | 12 +- .../src/actors/indexing_service.rs | 18 +-- .../src/source/file_source.rs | 9 +- .../src/source/gcp_pubsub_source.rs | 3 +- .../src/source/ingest_api_source.rs | 3 +- .../src/source/kafka_source.rs | 3 +- quickwit/quickwit-indexing/src/source/mod.rs | 12 +- .../src/source/pulsar_source.rs | 3 +- .../src/source/source_factory.rs | 3 +- .../src/source/vec_source.rs | 6 +- .../src/source/void_source.rs | 6 +- quickwit/quickwit-indexing/src/test_utils.rs | 3 +- .../quickwit-lambda/src/indexer/ingest.rs | 3 +- .../quickwit-metastore/src/tests/source.rs | 12 +- .../file-backed-index/v0.6.expected.json | 5 +- .../file-backed-index/v0.7.expected.json | 13 +- .../index-metadata/v0.4.expected.json | 5 +- .../index-metadata/v0.5.expected.json | 5 +- .../index-metadata/v0.6.expected.json | 5 +- .../index-metadata/v0.7.expected.json | 5 +- .../src/index_api/rest_handler.rs | 4 +- 32 files changed, 198 insertions(+), 247 deletions(-) diff --git a/config/tutorials/gh-archive/kafka-source.yaml b/config/tutorials/gh-archive/kafka-source.yaml index 3b0bd54b1eb..e4670538a29 100644 --- a/config/tutorials/gh-archive/kafka-source.yaml +++ b/config/tutorials/gh-archive/kafka-source.yaml @@ -1,8 +1,7 @@ -version: 0.7 +version: 0.8 source_id: kafka-source source_type: kafka -max_num_pipelines_per_indexer: 1 -desired_num_pipelines: 2 +num_pipelines: 2 params: topic: gh-archive client_params: diff --git a/docs/configuration/source-config.md b/docs/configuration/source-config.md index 04e0369f8c9..b588c4315fe 100644 --- a/docs/configuration/source-config.md +++ b/docs/configuration/source-config.md @@ -80,11 +80,10 @@ Short max poll interval durations may cause a source to crash when back pressure ```bash cat << EOF > source-config.yaml -version: 0.7 +version: 0.8 source_id: my-kafka-source source_type: kafka -max_num_pipelines_per_indexer: 1 -desired_num_pipelines: 2 +num_pipelines: 2 params: topic: my-topic client_params: @@ -164,38 +163,13 @@ EOF ./quickwit source create --index my-index --source-config source-config.yaml ``` -## Maximum number of pipelines per indexer - -The `max_num_pipelines_per_indexer` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar(coming soon). - -The maximum number of indexing pipelines defines the limit of pipelines spawned for the source on a given indexer. -This maximum can be reached only if there are enough `desired_num_pipelines` to run. - -:::note - -With the following parameters, only one pipeline will run on one indexer. - -- `max_num_pipelines_per_indexer=2` -- `desired_num_pipelines=1` - -::: - -## Desired number of pipelines - -`desired_num_pipelines` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon). - -The desired number of indexing pipelines defines the number of pipelines to run on a cluster for the source. It is a "desired" -number as it cannot be reach it there is not enough indexers in -the cluster. - -:::note - -With the following parameters, only one pipeline will start on the sole indexer. +## Number of pipelines -- `max_num_pipelines_per_indexer=1` -- `desired_num_pipelines=2` +`num_pipelines` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon). -::: +It defines the number of pipelines to run on a cluster for the source. The actual placement of these pipelines on the different indexer +will be decided by the control plane. Note that distributions of a source like Kafka is done by assigning a set of partitions to different pipelines. +As a result, it is recommended to make sure the number of partitions is a multiple of the number of `num_pipelines`. ## Transform parameters diff --git a/docs/ingest-data/kafka.md b/docs/ingest-data/kafka.md index a92a8968f1a..3f1a458afeb 100644 --- a/docs/ingest-data/kafka.md +++ b/docs/ingest-data/kafka.md @@ -101,11 +101,10 @@ This tutorial assumes that the Kafka cluster is available locally on the default # # Kafka source config file. # -version: 0.7 +version: 0.8 source_id: kafka-source source_type: kafka -max_num_pipelines_per_indexer: 1 -desired_num_pipelines: 2 +num_pipelines: 2 params: topic: gh-archive client_params: diff --git a/quickwit/quickwit-cli/src/source.rs b/quickwit/quickwit-cli/src/source.rs index aa444d6576f..7a12cfc0821 100644 --- a/quickwit/quickwit-cli/src/source.rs +++ b/quickwit/quickwit-cli/src/source.rs @@ -741,8 +741,7 @@ mod tests { .collect(); let sources = vec![SourceConfig { source_id: "foo-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file("path/to/file"), transform_config: None, @@ -802,8 +801,7 @@ mod tests { let sources = [ SourceConfig { source_id: "foo-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::stdin(), transform_config: None, @@ -811,8 +809,7 @@ mod tests { }, SourceConfig { source_id: "bar-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::stdin(), transform_config: None, diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index d9826c2ea56..4ccc20f163c 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -422,8 +422,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< .map(|vrl_script| TransformConfig::new(vrl_script, None)); let source_config = SourceConfig { source_id: CLI_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 is always non-zero."), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), + num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), enabled: true, source_params, transform_config, @@ -608,8 +607,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { index_id: args.index_id, source_config: SourceConfig { source_id: args.source_id, - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(VecSourceParams::default()), transform_config: None, diff --git a/quickwit/quickwit-config/resources/tests/source_config/kafka-source.json b/quickwit/quickwit-config/resources/tests/source_config/kafka-source.json index 1b809299768..cd3380fe8db 100644 --- a/quickwit/quickwit-config/resources/tests/source_config/kafka-source.json +++ b/quickwit/quickwit-config/resources/tests/source_config/kafka-source.json @@ -2,7 +2,6 @@ "version": "0.7", "source_id": "hdfs-logs-kafka-source", "desired_num_pipelines": 2, - "max_num_pipelines_per_indexer": 2, "source_type": "kafka", "params": { "topic": "cloudera-cluster-logs", diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index ca003263686..1cb8119ee8e 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -72,7 +72,7 @@ pub use crate::node_config::{ enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits, DEFAULT_QW_CONFIG_PATH, }; -use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig}; +use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, @@ -88,6 +88,7 @@ pub use crate::storage_config::{ DocMapping, VersionedSourceConfig, SourceConfigV0_7, + SourceConfigV0_8, VersionedIndexConfig, IndexConfigV0_7, VersionedIndexTemplate, diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index e077f112b36..2bec4be7935 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -55,33 +55,14 @@ pub const RESERVED_SOURCE_IDS: &[&str] = pub struct SourceConfig { pub source_id: String, - /// Maximum number of indexing pipelines spawned for the source on a given indexer. - /// The maximum is reached only if there is enough `desired_num_pipelines` to run. - /// The value is only used by sources that Quickwit knows how to distribute across - /// pipelines/nodes, that is for Kafka sources only. - /// Example: - /// - `max_num_pipelines_per_indexer=2` - /// - `desired_num_pipelines=1` - /// => Only one pipeline will run on one indexer. - pub max_num_pipelines_per_indexer: NonZeroUsize, - /// Number of desired indexing pipelines to run on a cluster for the source. - /// This number could not be reach if there is not enough indexers. - /// The value is only used by sources that Quickwit knows how to distribute across - /// pipelines/nodes, that is for Kafka sources only. - /// Example: - /// - `max_num_pipelines_per_indexer=1` - /// - `desired_num_pipelines=2` - /// - 1 indexer - /// => Only one pipeline will start on the sole indexer. - pub desired_num_pipelines: NonZeroUsize, + /// Number of indexing pipelines to run on a cluster for the source. + pub num_pipelines: NonZeroUsize, // Denotes if this source is enabled. pub enabled: bool, pub source_params: SourceParams, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(rename = "transform")] pub transform_config: Option, // Denotes the input data format. @@ -126,8 +107,7 @@ impl SourceConfig { pub fn cli() -> Self { Self { source_id: CLI_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params: SourceParams::IngestCli, transform_config: None, @@ -139,8 +119,7 @@ impl SourceConfig { pub fn ingest_v2() -> Self { Self { source_id: INGEST_V2_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: enable_ingest_v2(), source_params: SourceParams::Ingest, transform_config: None, @@ -152,8 +131,7 @@ impl SourceConfig { pub fn ingest_api_default() -> Self { Self { source_id: INGEST_API_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params: SourceParams::IngestApi, transform_config: None, @@ -165,8 +143,7 @@ impl SourceConfig { pub fn for_test(source_id: &str, source_params: SourceParams) -> Self { Self { source_id: source_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params, transform_config: None, @@ -179,8 +156,7 @@ impl TestableForRegression for SourceConfig { fn sample_for_regression() -> Self { SourceConfig { source_id: "kafka-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, source_params: SourceParams::Kafka(KafkaSourceParams { topic: "kafka-topic".to_string(), @@ -572,8 +548,7 @@ mod tests { load_source_config_from_user_config(config_format, file_content.as_bytes()).unwrap(); let expected_source_config = SourceConfig { source_id: "hdfs-logs-kafka-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, source_params: SourceParams::Kafka(KafkaSourceParams { topic: "cloudera-cluster-logs".to_string(), @@ -588,7 +563,7 @@ mod tests { input_format: SourceInputFormat::Json, }; assert_eq!(source_config, expected_source_config); - assert_eq!(source_config.desired_num_pipelines.get(), 2); + assert_eq!(source_config.num_pipelines.get(), 2); } #[test] @@ -669,8 +644,7 @@ mod tests { load_source_config_from_user_config(config_format, file_content.as_bytes()).unwrap(); let expected_source_config = SourceConfig { source_id: "hdfs-logs-kinesis-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params: SourceParams::Kinesis(KinesisSourceParams { stream_name: "emr-cluster-logs".to_string(), @@ -684,7 +658,7 @@ mod tests { input_format: SourceInputFormat::Json, }; assert_eq!(source_config, expected_source_config); - assert_eq!(source_config.desired_num_pipelines.get(), 1); + assert_eq!(source_config.num_pipelines.get(), 1); } #[tokio::test] @@ -706,30 +680,29 @@ mod tests { .to_string() .contains("`desired_num_pipelines` must be")); } + // { + // let content = r#" + // { + // "version": "0.7", + // "source_id": "hdfs-logs-void-source", + // "desired_num_pipelines": 1, + // "max_num_pipelines_per_indexer": 0, + // "source_type": "void", + // "params": {} + // } + // "#; + // let error = load_source_config_from_user_config(ConfigFormat::Json, + // content.as_bytes()) .unwrap_err(); + // assert!(error + // .to_string() + // .contains("`max_num_pipelines_per_indexer` must be")); + // } { let content = r#" { - "version": "0.7", - "source_id": "hdfs-logs-void-source", - "desired_num_pipelines": 1, - "max_num_pipelines_per_indexer": 0, - "source_type": "void", - "params": {} - } - "#; - let error = load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) - .unwrap_err(); - assert!(error - .to_string() - .contains("`max_num_pipelines_per_indexer` must be")); - } - { - let content = r#" - { - "version": "0.7", + "version": "0.8", "source_id": "hdfs-logs-void-source", - "desired_num_pipelines": 1, - "max_num_pipelines_per_indexer": 2, + "num_pipelines": 2, "source_type": "void", "params": {} } @@ -756,7 +729,7 @@ mod tests { } #[tokio::test] - async fn test_load_valid_distributed_source_config() { + async fn test_load_valid_distributed_source_config_0_7() { { let content = r#" { @@ -773,8 +746,7 @@ mod tests { let source_config = load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) .unwrap(); - assert_eq!(source_config.desired_num_pipelines.get(), 3); - assert_eq!(source_config.max_num_pipelines_per_indexer.get(), 3); + assert_eq!(source_config.num_pipelines.get(), 3); } { let content = r#" @@ -793,11 +765,32 @@ mod tests { load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) .unwrap_err(); // TODO: uncomment asserts once distributed indexing is activated for pulsar. - // assert_eq!(source_config.desired_num_pipelines(), 3); + // assert_eq!(source_config.num_pipelines(), 3); // assert_eq!(source_config.max_num_pipelines_per_indexer(), 3); } } + #[tokio::test] + async fn test_load_valid_distributed_source_config() { + { + let content = r#" + { + "version": "0.8", + "source_id": "hdfs-logs-kafka-source", + "num_pipelines": 3, + "source_type": "kafka", + "params": { + "topic": "my-topic" + } + } + "#; + let source_config = + load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) + .unwrap(); + assert_eq!(source_config.num_pipelines.get(), 3); + } + } + #[test] fn test_file_source_params_serialization() { { @@ -1077,8 +1070,7 @@ mod tests { let source_config: SourceConfig = ConfigFormat::Json.parse(&file_content).unwrap(); let expected_source_config = SourceConfig { source_id: INGEST_API_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params: SourceParams::IngestApi, transform_config: Some(TransformConfig { @@ -1088,7 +1080,7 @@ mod tests { input_format: SourceInputFormat::Json, }; assert_eq!(source_config, expected_source_config); - assert_eq!(source_config.desired_num_pipelines.get(), 1); + assert_eq!(source_config.num_pipelines.get(), 1); } #[test] diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index 3df558eb59d..3aacf5bf77f 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize}; use super::{TransformConfig, RESERVED_SOURCE_IDS}; use crate::{validate_identifier, ConfigFormat, SourceConfig, SourceInputFormat, SourceParams}; -type SourceConfigForSerialization = SourceConfigV0_7; +type SourceConfigForSerialization = SourceConfigV0_8; #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[serde(deny_unknown_fields)] @@ -37,12 +37,15 @@ pub enum VersionedSourceConfig { #[serde(alias = "0.5")] #[serde(alias = "0.4")] V0_7(SourceConfigV0_7), + #[serde(rename = "0.8")] + V0_8(SourceConfigV0_8), } impl From for SourceConfigForSerialization { fn from(versioned_source_config: VersionedSourceConfig) -> Self { match versioned_source_config { - VersionedSourceConfig::V0_7(v0_6) => v0_6, + VersionedSourceConfig::V0_7(v0_7) => v0_7.into(), + VersionedSourceConfig::V0_8(v0_8) => v0_8, } } } @@ -73,12 +76,8 @@ impl SourceConfigForSerialization { if !RESERVED_SOURCE_IDS.contains(&self.source_id.as_str()) { validate_identifier("Source ID", &self.source_id)?; } - let desired_num_pipelines = NonZeroUsize::new(self.desired_num_pipelines) + let num_pipelines = NonZeroUsize::new(self.num_pipelines) .ok_or_else(|| anyhow::anyhow!("`desired_num_pipelines` must be strictly positive"))?; - let max_num_pipelines_per_indexer = NonZeroUsize::new(self.max_num_pipelines_per_indexer) - .ok_or_else(|| { - anyhow::anyhow!("`max_num_pipelines_per_indexer` must be strictly positive") - })?; match &self.source_params { // We want to forbid source_config with no filepath SourceParams::File(file_params) => { @@ -102,7 +101,7 @@ impl SourceConfigForSerialization { match &self.source_params { SourceParams::PubSub(_) | SourceParams::Kafka(_) => {} _ => { - if self.desired_num_pipelines > 1 || self.max_num_pipelines_per_indexer > 1 { + if self.num_pipelines > 1 { bail!("Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka sources. open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types"); } } @@ -120,8 +119,7 @@ impl SourceConfigForSerialization { Ok(SourceConfig { source_id: self.source_id, - max_num_pipelines_per_indexer, - desired_num_pipelines, + num_pipelines, enabled: self.enabled, source_params: self.source_params, transform_config: self.transform, @@ -130,12 +128,11 @@ impl SourceConfigForSerialization { } } -impl From for SourceConfigV0_7 { +impl From for SourceConfigV0_8 { fn from(source_config: SourceConfig) -> Self { - SourceConfigV0_7 { + SourceConfigV0_8 { source_id: source_config.source_id, - max_num_pipelines_per_indexer: source_config.max_num_pipelines_per_indexer.get(), - desired_num_pipelines: source_config.desired_num_pipelines.get(), + num_pipelines: source_config.num_pipelines.get(), enabled: source_config.enabled, source_params: source_config.source_params, transform: source_config.transform_config, @@ -146,7 +143,7 @@ impl From for SourceConfigV0_7 { impl From for VersionedSourceConfig { fn from(source_config: SourceConfig) -> Self { - VersionedSourceConfig::V0_7(source_config.into()) + VersionedSourceConfig::V0_8(source_config.into()) } } @@ -154,7 +151,7 @@ impl TryFrom for SourceConfig { type Error = anyhow::Error; fn try_from(versioned_source_config: VersionedSourceConfig) -> anyhow::Result { - let v1: SourceConfigV0_7 = versioned_source_config.into(); + let v1: SourceConfigV0_8 = versioned_source_config.into(); v1.validate_and_build() } } @@ -163,7 +160,7 @@ fn default_max_num_pipelines_per_indexer() -> usize { 1 } -fn default_desired_num_pipelines() -> usize { +fn default_num_pipelines() -> usize { 1 } @@ -172,6 +169,7 @@ fn default_source_enabled() -> bool { } #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] pub struct SourceConfigV0_7 { pub source_id: String, @@ -181,7 +179,7 @@ pub struct SourceConfigV0_7 { )] pub max_num_pipelines_per_indexer: usize, - #[serde(default = "default_desired_num_pipelines")] + #[serde(default = "default_num_pipelines")] pub desired_num_pipelines: usize, // Denotes if this source is enabled. @@ -198,3 +196,48 @@ pub struct SourceConfigV0_7 { #[serde(default)] pub input_format: SourceInputFormat, } + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] +pub struct SourceConfigV0_8 { + pub source_id: String, + + #[serde(default = "default_num_pipelines")] + pub num_pipelines: usize, + + // Denotes if this source is enabled. + #[serde(default = "default_source_enabled")] + pub enabled: bool, + + #[serde(flatten)] + pub source_params: SourceParams, + + #[serde(skip_serializing_if = "Option::is_none")] + pub transform: Option, + + // Denotes the input data format. + #[serde(default)] + pub input_format: SourceInputFormat, +} + +impl From for SourceConfigV0_8 { + fn from(source_config_v0_7: SourceConfigV0_7) -> Self { + let SourceConfigV0_7 { + source_id, + max_num_pipelines_per_indexer: _, + desired_num_pipelines, + enabled, + source_params, + transform, + input_format, + } = source_config_v0_7; + SourceConfigV0_8 { + source_id, + num_pipelines: desired_num_pipelines, + enabled, + source_params, + transform, + input_format, + } + } +} diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 0f7539009e5..be8379258a7 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -172,7 +172,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { sources.push(SourceToSchedule { source_uid, source_type: SourceToScheduleType::NonSharded { - num_pipelines: source_config.desired_num_pipelines.get() as u32, + num_pipelines: source_config.num_pipelines.get() as u32, // FIXME load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()) .unwrap(), @@ -681,8 +681,7 @@ mod tests { &index_uid, SourceConfig { source_id: "source_disabled".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(3).unwrap(), - desired_num_pipelines: NonZeroUsize::new(3).unwrap(), + num_pipelines: NonZeroUsize::new(3).unwrap(), enabled: false, source_params: SourceParams::Kafka(kafka_source_params.clone()), transform_config: None, @@ -695,8 +694,7 @@ mod tests { &index_uid, SourceConfig { source_id: "source_enabled".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, source_params: SourceParams::Kafka(kafka_source_params.clone()), transform_config: None, @@ -709,8 +707,7 @@ mod tests { &index_uid, SourceConfig { source_id: "ingest_v1".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v1 source_params: SourceParams::IngestApi, @@ -724,8 +721,7 @@ mod tests { &index_uid, SourceConfig { source_id: "ingest_v2".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v2 source_params: SourceParams::Ingest, @@ -740,8 +736,7 @@ mod tests { &index_uid, SourceConfig { source_id: "ingest_v2_without_shard".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v2 source_params: SourceParams::Ingest, @@ -755,8 +750,7 @@ mod tests { &index_uid, SourceConfig { source_id: "ingest_cli".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v1 source_params: SourceParams::IngestCli, @@ -857,13 +851,12 @@ mod tests { prop_compose! { fn gen_kafka_source() - (index_idx in 0usize..100usize, desired_num_pipelines in 1usize..51usize, max_num_pipelines_per_indexer in 1usize..5usize) -> (IndexUid, SourceConfig) { + (index_idx in 0usize..100usize, num_pipelines in 1usize..51usize) -> (IndexUid, SourceConfig) { let index_uid = IndexUid::from_parts(&format!("index-id-{index_idx}"), 0 /* this is the index uid */); let source_id = quickwit_common::rand::append_random_suffix("kafka-source"); (index_uid, SourceConfig { source_id, - desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(max_num_pipelines_per_indexer).unwrap(), + num_pipelines: NonZeroUsize::new(num_pipelines).unwrap(), enabled: true, source_params: kafka_source_params_for_test(), transform_config: None, diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index 830bb72c711..0375774d765 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -43,18 +43,12 @@ use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL}; use crate::indexing_scheduler::MIN_DURATION_BETWEEN_SCHEDULING; use crate::IndexerNodeInfo; -fn index_metadata_for_test( - index_id: &str, - source_id: &str, - desired_num_pipelines: usize, - max_num_pipelines_per_indexer: usize, -) -> IndexMetadata { +fn index_metadata_for_test(index_id: &str, source_id: &str, num_pipelines: usize) -> IndexMetadata { let mut index_metadata = IndexMetadata::for_test(index_id, "ram://indexes/test-index"); let source_config = SourceConfig { enabled: true, source_id: source_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(max_num_pipelines_per_indexer).unwrap(), - desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), + num_pipelines: NonZeroUsize::new(num_pipelines).unwrap(), source_params: SourceParams::Kafka(KafkaSourceParams { topic: "topic".to_string(), client_log_level: None, @@ -115,8 +109,8 @@ async fn start_control_plane( let source_1 = "source-1"; let index_2 = "test-indexing-plan-2"; let source_2 = "source-2"; - let index_metadata_1 = index_metadata_for_test(index_1, source_1, 2, 2); - let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1); + let index_metadata_1 = index_metadata_for_test(index_1, source_1, 2); + let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1); index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1; let mut metastore = MetastoreServiceClient::mock(); metastore.expect_list_indexes_metadata().returning( diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index ca3c911bb9e..23766d10ba7 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -702,8 +702,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, @@ -810,8 +809,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, @@ -887,8 +885,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Void(VoidSourceParams), transform_config: None, @@ -1006,8 +1003,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index b9ca436dc3d..0675115b86b 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -964,8 +964,7 @@ mod tests { // Test `spawn_pipeline`. let source_config_0 = SourceConfig { source_id: "test-indexing-service--source-0".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -1052,8 +1051,7 @@ mod tests { // Test `supervise_pipelines` let source_config = SourceConfig { source_id: "test-indexing-service--source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(VecSourceParams { docs: Vec::new(), @@ -1123,8 +1121,7 @@ mod tests { // Test `apply plan`. let source_config_1 = SourceConfig { source_id: "test-indexing-service--source-1".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -1173,8 +1170,7 @@ mod tests { }; let source_config_2 = SourceConfig { source_id: "test-indexing-service--source-2".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, source_params: SourceParams::Kafka(kafka_params), transform_config: None, @@ -1330,8 +1326,7 @@ mod tests { let source_config = SourceConfig { source_id: "test-indexing-service--source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -1460,8 +1455,7 @@ mod tests { let mut index_metadata = IndexMetadata::for_test(&index_id, &index_uri); let source_config = SourceConfig { source_id: "test-indexing-service--source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 2c25eec96f8..b84153aa355 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -268,8 +268,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-file-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::File(params.clone()), transform_config: None, @@ -351,8 +350,7 @@ mod tests { let source_config = SourceConfig { source_id: "test-file-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::File(params.clone()), transform_config: None, @@ -460,8 +458,7 @@ mod tests { let source_config = SourceConfig { source_id: "test-file-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::File(params.clone()), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs index 70263b67739..bf0ccb72702 100644 --- a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs +++ b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs @@ -314,8 +314,7 @@ mod gcp_pubsub_emulator_tests { let source_id = append_random_suffix("test-gcp-pubsub-source--source"); SourceConfig { source_id, - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::PubSub(PubSubSourceParams { project_id: Some(GCP_TEST_PROJECT.to_string()), diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs index ceb22c78cdf..6abdd9c4a73 100644 --- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs +++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs @@ -283,8 +283,7 @@ mod tests { fn make_source_config() -> SourceConfig { SourceConfig { source_id: INGEST_API_SOURCE_ID.to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::IngestApi, transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index ff435041b1f..b1181da8b6f 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -897,8 +897,7 @@ mod kafka_broker_tests { let source_id = append_random_suffix("test-kafka-source--source"); let source_config = SourceConfig { source_id: source_id.clone(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Kafka(KafkaSourceParams { topic: topic.to_string(), diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index b55ad9c8b99..819f6223dfc 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -549,8 +549,7 @@ mod tests { { let source_config = SourceConfig { source_id: "void".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -561,8 +560,7 @@ mod tests { { let source_config = SourceConfig { source_id: "vec".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(VecSourceParams::default()), transform_config: None, @@ -573,8 +571,7 @@ mod tests { { let source_config = SourceConfig { source_id: "file".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file("file-does-not-exist.json"), transform_config: None, @@ -589,8 +586,7 @@ mod tests { { let source_config = SourceConfig { source_id: "file".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file("data/test_corpus.json"), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 6c2c6e1cfc5..0dc6f38b7fc 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -546,8 +546,7 @@ mod pulsar_broker_tests { let source_id = append_random_suffix("test-pulsar-source--source"); let source_config = SourceConfig { source_id: source_id.clone(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Pulsar(PulsarSourceParams { topics: topics.into_iter().map(|v| v.as_ref().to_string()).collect(), diff --git a/quickwit/quickwit-indexing/src/source/source_factory.rs b/quickwit/quickwit-indexing/src/source/source_factory.rs index 76b372a4158..4bc6a29ed31 100644 --- a/quickwit/quickwit-indexing/src/source/source_factory.rs +++ b/quickwit/quickwit-indexing/src/source/source_factory.rs @@ -134,8 +134,7 @@ mod tests { let source_loader = quickwit_supported_sources(); let source_config = SourceConfig { source_id: "test-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/vec_source.rs b/quickwit/quickwit-indexing/src/source/vec_source.rs index 06b0b84883c..efbf3c13162 100644 --- a/quickwit/quickwit-indexing/src/source/vec_source.rs +++ b/quickwit/quickwit-indexing/src/source/vec_source.rs @@ -163,8 +163,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-vec-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(params.clone()), transform_config: None, @@ -224,8 +223,7 @@ mod tests { let source_config = SourceConfig { source_id: "test-vec-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(params.clone()), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/void_source.rs b/quickwit/quickwit-indexing/src/source/void_source.rs index 7a8db8d9000..2166986b183 100644 --- a/quickwit/quickwit-indexing/src/source/void_source.rs +++ b/quickwit/quickwit-indexing/src/source/void_source.rs @@ -88,8 +88,7 @@ mod tests { async fn test_void_source_loading() { let source_config = SourceConfig { source_id: "test-void-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -114,8 +113,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let source_config = SourceConfig { source_id: "test-void-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 5953e1e8674..4dbe288fd9c 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -158,8 +158,7 @@ impl TestSandbox { let add_docs_id = self.add_docs_id.fetch_add(1, Ordering::SeqCst); let source_config = SourceConfig { source_id: self.index_uid.index_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(VecSourceParams { docs, diff --git a/quickwit/quickwit-lambda/src/indexer/ingest.rs b/quickwit/quickwit-lambda/src/indexer/ingest.rs index b449c81b84f..9237b523777 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest.rs @@ -132,8 +132,7 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { .map(|vrl_script| TransformConfig::new(vrl_script, None)); let source_config = SourceConfig { source_id: CLI_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 is always non-zero."), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), + num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), enabled: true, source_params, transform_config, diff --git a/quickwit/quickwit-metastore/src/tests/source.rs b/quickwit/quickwit-metastore/src/tests/source.rs index 30a42d8d8dc..ba46cbad37e 100644 --- a/quickwit/quickwit-metastore/src/tests/source.rs +++ b/quickwit/quickwit-metastore/src/tests/source.rs @@ -55,8 +55,7 @@ pub async fn test_metastore_add_source