diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 0d7d5f7c91a..23691362f9c 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -151,7 +151,7 @@ mod tests { use quickwit_cli::split::{DescribeSplitArgs, SplitCliCommand}; use quickwit_cli::tool::{ ExtractSplitArgs, GarbageCollectIndexArgs, LocalIngestDocsArgs, LocalSearchArgs, - ToolCliCommand, + ToolCliCommand, MergeArgs }; use quickwit_common::uri::Uri; use quickwit_config::SourceInputFormat; @@ -740,6 +740,31 @@ mod tests { Ok(()) } + #[test] + fn test_parse_merge_args() -> anyhow::Result<()> { + let app = build_cli().no_binary_name(true); + let matches = app.try_get_matches_from([ + "tool", + "merge", + "--index", + "wikipedia", + "--source", + "ingest-source", + "--config", + "/config.yaml", + ])?; + let command = CliCommand::parse_cli_args(matches)?; + assert!(matches!( + command, + CliCommand::Tool(ToolCliCommand::Merge(MergeArgs { + index_id, + source_id, + .. + })) if &index_id == "wikipedia" && source_id == "ingest-source" + )); + Ok(()) + } + #[test] fn test_parse_no_color() { // SAFETY: this test may not be entirely sound if not run with nextest or --test-threads=1 diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 557d61921e0..136be2812f5 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -12,6 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::info; +use quickwit_config::VecSourceParams; +use quickwit_actors::Mailbox; +use quickwit_actors::ActorExitStatus; +use quickwit_indexing::actors::MergePipeline; +use quickwit_indexing::actors::MergeSchedulerService; +use quickwit_indexing::models::DetachMergePipeline; +use quickwit_proto::types::SourceId; use std::collections::{HashSet, VecDeque}; use std::io::{IsTerminal, Stdout, Write, stdout}; use std::num::NonZeroUsize; @@ -200,6 +208,13 @@ pub struct GarbageCollectIndexArgs { pub dry_run: bool, } +#[derive(Debug, Eq, PartialEq)] +pub struct MergeArgs { + pub config_uri: Uri, + pub index_id: IndexId, + pub source_id: SourceId, +} + #[derive(Debug, Eq, PartialEq)] pub struct ExtractSplitArgs { pub config_uri: Uri, @@ -214,6 +229,7 @@ pub enum ToolCliCommand { LocalIngest(LocalIngestDocsArgs), LocalSearch(LocalSearchArgs), ExtractSplit(ExtractSplitArgs), + Merge(MergeArgs), } impl ToolCliCommand { @@ -225,6 +241,7 @@ impl ToolCliCommand { "gc" => Self::parse_garbage_collect_args(submatches), "local-ingest" => Self::parse_local_ingest_args(submatches), "local-search" => Self::parse_local_search_args(submatches), + "merge" => Self::parse_merge_args(submatches), "extract-split" => Self::parse_extract_split_args(submatches), _ => bail!("unknown tool subcommand `{subcommand}`"), } @@ -314,6 +331,24 @@ impl ToolCliCommand { })) } + fn parse_merge_args(mut matches: ArgMatches) -> anyhow::Result { + let config_uri = matches + .remove_one::("config") + .map(|uri_str| Uri::from_str(&uri_str)) + .expect("`config` should be a required arg.")?; + let index_id = matches + .remove_one::("index") + .expect("'index-id' should be a required arg."); + let source_id = matches + .remove_one::("source") + .expect("'source-id' should be a required arg."); + Ok(Self::Merge(MergeArgs { + index_id, + source_id, + config_uri, + })) + } + fn parse_garbage_collect_args(mut matches: ArgMatches) -> anyhow::Result { let config_uri = matches .get_one("config") @@ -363,6 +398,7 @@ 
impl ToolCliCommand { Self::GarbageCollect(args) => garbage_collect_index_cli(args).await, Self::LocalIngest(args) => local_ingest_docs_cli(args).await, Self::LocalSearch(args) => local_search_cli(args).await, + Self::Merge(args) => merge_cli(args).await, Self::ExtractSplit(args) => extract_split_cli(args).await, } } @@ -418,6 +454,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< &HashSet::from_iter([QuickwitService::Indexer]), )?; let universe = Universe::new(); + let merge_scheduler_service_mailbox = universe.get_or_spawn_one(); let split_cache = Arc::new(IndexingSplitCache::from_config(&indexer_config, &config.data_dir_path).await?); let indexing_server = IndexingService::new( @@ -428,6 +465,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< cluster, metastore, None, + Some(merge_scheduler_service_mailbox), IngesterPool::default(), storage_resolver, EventBroker::default(), @@ -443,6 +481,11 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< pipeline_uid: PipelineUid::random(), }) .await?; + let merge_pipeline_handle = indexing_server_mailbox + .ask_for_res(DetachMergePipeline { + pipeline_id: pipeline_id.merge_pipeline_id(), + }) + .await?; let indexing_pipeline_handle = indexing_server_mailbox .ask_for_res(DetachIndexingPipeline { pipeline_id }) .await?; @@ -460,6 +503,13 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< let statistics = start_statistics_reporting_loop(indexing_pipeline_handle, args.input_path_opt.is_none()) .await?; + + merge_pipeline_handle + .mailbox() + .ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline) + .await?; + merge_pipeline_handle.join().await; + // Shutdown the indexing server. 
universe .send_exit_with_success(&indexing_server_mailbox) @@ -527,6 +577,93 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> { Ok(()) } +pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { + debug!(args=?args, "run-merge-operations"); + println!("❯ Merging splits locally..."); + let config = load_node_config(&args.config_uri).await?; + let (storage_resolver, metastore_resolver) = + get_resolvers(&config.storage_configs, &config.metastore_configs); + let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?; + run_index_checklist(&mut metastore, &storage_resolver, &args.index_id, None).await?; + // The indexing service needs to update its cluster chitchat state so that the control plane is + // aware of the running tasks. We thus create a fake cluster to instantiate the indexing service + // and avoid impacting potential control plane running on the cluster. + let cluster = create_empty_cluster(&config).await?; + let runtimes_config = RuntimesConfig::default(); + start_actor_runtimes( + runtimes_config, + &HashSet::from_iter([QuickwitService::Indexer]), + )?; + let indexer_config = IndexerConfig::default(); + let universe = Universe::new(); + let merge_scheduler_service: Mailbox = universe.get_or_spawn_one(); + let indexing_server = IndexingService::new( + config.node_id, + config.data_dir_path, + indexer_config, + runtimes_config.num_threads_blocking, + cluster, + metastore, + None, + Some(merge_scheduler_service), + IngesterPool::default(), + storage_resolver, + EventBroker::default(), + Arc::new(IndexingSplitCache::no_caching()), + ) + .await?; + let (indexing_service_mailbox, indexing_service_handle) = + universe.spawn_builder().spawn(indexing_server); + let pipeline_id = indexing_service_mailbox + .ask_for_res(SpawnPipeline { + index_id: args.index_id, + source_config: SourceConfig { + source_id: args.source_id, + num_pipelines: NonZeroUsize::MIN, + enabled: true, + source_params: 
SourceParams::Vec(VecSourceParams::default()), + transform_config: None, + input_format: SourceInputFormat::Json, + }, + pipeline_uid: PipelineUid::random(), + }) + .await?; + let pipeline_handle: ActorHandle = indexing_service_mailbox + .ask_for_res(DetachMergePipeline { + pipeline_id: pipeline_id.merge_pipeline_id(), + }) + .await?; + + let mut check_interval = tokio::time::interval(Duration::from_secs(1)); + loop { + check_interval.tick().await; + + pipeline_handle.refresh_observe(); + let observation = pipeline_handle.last_observation(); + + if observation.num_ongoing_merges == 0 { + info!("merge pipeline has no more ongoing merges, exiting"); + break; + } + + if pipeline_handle.state().is_exit() { + info!("merge pipeline has exited, exiting"); + break; + } + } + + let (pipeline_exit_status, _pipeline_statistics) = pipeline_handle.quit().await; + indexing_service_handle.quit().await; + if !matches!( + pipeline_exit_status, + ActorExitStatus::Success | ActorExitStatus::Quit + ) { + bail!(pipeline_exit_status); + } + println!("{} Merge successful.", "✔".color(GREEN_COLOR)); + Ok(()) +} + pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow::Result<()> { debug!(args=?args, "garbage-collect-index"); println!("❯ Garbage collecting index..."); diff --git a/quickwit/quickwit-compaction/src/compactor_supervisor.rs b/quickwit/quickwit-compaction/src/compactor_supervisor.rs index af1f3bab115..f83d596758a 100644 --- a/quickwit/quickwit-compaction/src/compactor_supervisor.rs +++ b/quickwit/quickwit-compaction/src/compactor_supervisor.rs @@ -18,11 +18,13 @@ use std::time::Duration; use async_trait::async_trait; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, SpawnContext}; -use quickwit_common::io::Limiter; +use quickwit_common::io::{self, Limiter}; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::uri::Uri; -use quickwit_config::{IndexingSettings, RetentionPolicy, 
SearchSettings, build_doc_mapper}; +use quickwit_config::{ + CompactorConfig, IndexingSettings, RetentionPolicy, SearchSettings, build_doc_mapper, +}; use quickwit_doc_mapper::DocMapping; use quickwit_indexing::merge_policy::{MergeOperation, merge_policy_from_settings}; use quickwit_indexing::{IndexingSplitCache, IndexingSplitStore}; @@ -75,18 +77,25 @@ impl CompactorSupervisor { pub fn new( node_id: NodeId, planner_client: CompactionPlannerServiceClient, - max_concurrent_merge_executions: usize, - io_throughput_limiter: Option, + compactor_config: &CompactorConfig, metastore: MetastoreServiceClient, storage_resolver: StorageResolver, split_cache: Arc, - max_concurrent_split_uploads: usize, event_broker: EventBroker, compaction_root_directory: TempDirectory, ) -> Self { - let num_pipeline_slots = max_concurrent_merge_executions * 2; + let &CompactorConfig { + max_concurrent_merge_executions, + pipeline_slots_per_merge_execution, + max_concurrent_split_uploads, + max_merge_write_throughput, + } = compactor_config; + let num_pipeline_slots = + max_concurrent_merge_executions.get() * pipeline_slots_per_merge_execution.get(); let pipelines = (0..num_pipeline_slots).map(|_| None).collect(); - let merge_execution_semaphore = Arc::new(Semaphore::new(max_concurrent_merge_executions)); + let merge_execution_semaphore = + Arc::new(Semaphore::new(max_concurrent_merge_executions.get())); + let io_throughput_limiter = max_merge_write_throughput.map(io::limiter); CompactorSupervisor { node_id, planner_client, @@ -325,6 +334,8 @@ impl Handler for CompactorSupervisor { #[cfg(test)] mod tests { + use std::num::NonZeroUsize; + use quickwit_actors::Universe; use quickwit_common::temp_dir::TempDirectory; use quickwit_proto::compaction::{ @@ -343,15 +354,18 @@ mod tests { let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new()); let compaction_client = CompactionPlannerServiceClient::from_mock(MockCompactionPlannerService::new()); + let compactor_config = 
CompactorConfig { + max_concurrent_merge_executions: NonZeroUsize::new(max_concurrent_merge_executions) + .expect("max_concurrent_merge_executions must be non-zero"), + ..CompactorConfig::for_test() + }; CompactorSupervisor::new( NodeId::from("test-node"), compaction_client, - max_concurrent_merge_executions, - None, + &compactor_config, metastore, StorageResolver::for_test(), Arc::new(IndexingSplitCache::no_caching()), - 2, EventBroker::default(), TempDirectory::for_test(), ) @@ -557,15 +571,17 @@ mod tests { let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new()); let client = CompactionPlannerServiceClient::from_mock(mock); // 3 merge-executions → 6 pipeline slots + let compactor_config = CompactorConfig { + max_concurrent_merge_executions: NonZeroUsize::new(3).unwrap(), + ..CompactorConfig::for_test() + }; let mut supervisor = CompactorSupervisor::new( NodeId::from("test-node"), client, - 3, - None, + &compactor_config, metastore, StorageResolver::for_test(), Arc::new(IndexingSplitCache::no_caching()), - 2, EventBroker::default(), TempDirectory::for_test(), ); diff --git a/quickwit/quickwit-compaction/src/lib.rs b/quickwit/quickwit-compaction/src/lib.rs index 21997b3ec5e..ada03eb1294 100644 --- a/quickwit/quickwit-compaction/src/lib.rs +++ b/quickwit/quickwit-compaction/src/lib.rs @@ -25,7 +25,6 @@ use std::sync::Arc; pub use compactor_supervisor::CompactorSupervisor; use quickwit_actors::{Mailbox, Universe}; -use quickwit_common::io; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_config::CompactorConfig; @@ -53,16 +52,13 @@ pub async fn start_compactor_service( compaction_root_directory: TempDirectory, ) -> anyhow::Result> { info!("starting compactor service"); - let io_throughput_limiter = compactor_config.max_merge_write_throughput.map(io::limiter); let supervisor = CompactorSupervisor::new( node_id, compaction_client, - compactor_config.max_concurrent_merge_executions.get(), - 
io_throughput_limiter, + compactor_config, metastore, storage_resolver, split_cache, - compactor_config.max_concurrent_split_uploads, event_broker, compaction_root_directory, ); diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json index 996d682b6e8..7269b37ae22 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json @@ -52,7 +52,9 @@ "enable_otlp_endpoint": true, "split_store_max_num_bytes": "1T", "split_store_max_num_splits": 10000, - "max_concurrent_split_uploads": 8 + "max_concurrent_split_uploads": 8, + "max_merge_write_throughput": "100mb", + "merge_concurrency": 2 }, "ingest_api": { "replication_factor": 2 diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml index 6a63ff31883..ea715dcffe0 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml @@ -43,6 +43,8 @@ enable_otlp_endpoint = true split_store_max_num_bytes = "1T" split_store_max_num_splits = 10_000 max_concurrent_split_uploads = 8 +max_merge_write_throughput = "100mb" +merge_concurrency = 2 [ingest_api] replication_factor = 2 diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml index 49e418ed924..face0852972 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml @@ -47,6 +47,8 @@ indexer: split_store_max_num_bytes: 1T split_store_max_num_splits: 10000 max_concurrent_split_uploads: 8 + max_merge_write_throughput: 100mb + merge_concurrency: 2 ingest_api: replication_factor: 2 diff --git 
a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index cfc7387a5d7..0d1658f1a4c 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -146,6 +146,15 @@ pub struct IndexerConfig { pub split_store_max_num_splits: usize, #[serde(default = "IndexerConfig::default_max_concurrent_split_uploads")] pub max_concurrent_split_uploads: usize, + /// Limits the IO throughput of the `SplitDownloader` and the `MergeExecutor`. + /// On hardware where IO is constrained, it makes sure that Merges (a batch operation) + /// does not starve indexing itself (as it is a latency sensitive operation). + #[serde(default)] + pub max_merge_write_throughput: Option, + /// Maximum number of merge or delete operation that can be executed concurrently. + /// (defaults to num_cpu / 2). + #[serde(default = "IndexerConfig::default_merge_concurrency")] + pub merge_concurrency: NonZeroUsize, /// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry /// Protocol (OTLP). 
#[serde(default = "IndexerConfig::default_enable_otlp_endpoint")] @@ -166,10 +175,6 @@ impl IndexerConfig { false } - fn default_enable_standalone_compactors() -> bool { - false - } - fn default_enable_otlp_endpoint() -> bool { #[cfg(any(test, feature = "testsuite"))] { @@ -193,10 +198,18 @@ impl IndexerConfig { 1_000 } + pub fn default_merge_concurrency() -> NonZeroUsize { + NonZeroUsize::new(quickwit_common::num_cpus() * 2 / 3).unwrap_or(NonZeroUsize::MIN) + } + fn default_cpu_capacity() -> CpuCapacity { CpuCapacity::one_cpu_thread() * (quickwit_common::num_cpus() as u32) } + fn default_enable_standalone_compactors() -> bool { + false + } + #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> anyhow::Result { use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY; @@ -207,6 +220,8 @@ impl IndexerConfig { split_store_max_num_splits: 3, max_concurrent_split_uploads: 4, cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32, + max_merge_write_throughput: None, + merge_concurrency: NonZeroUsize::new(3).unwrap(), enable_standalone_compactors: false, }; Ok(indexer_config) @@ -222,6 +237,8 @@ impl Default for IndexerConfig { split_store_max_num_splits: Self::default_split_store_max_num_splits(), max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(), cpu_capacity: Self::default_cpu_capacity(), + merge_concurrency: Self::default_merge_concurrency(), + max_merge_write_throughput: None, enable_standalone_compactors: Self::default_enable_standalone_compactors(), } } @@ -234,6 +251,10 @@ pub struct CompactorConfig { /// a long time. Defaults to `num_cpus - 1`. #[serde(default = "CompactorConfig::default_max_concurrent_merge_executions")] pub max_concurrent_merge_executions: NonZeroUsize, + /// Number of pipelines to run per merge executions. Scalar. Since merges perform a lot + /// of IO, multiple concurrent merges can be interleaved. 
+ #[serde(default = "CompactorConfig::default_pipeline_slots_per_merge_execution")] + pub pipeline_slots_per_merge_execution: NonZeroUsize, /// Maximum number of concurrent split uploads across all pipelines. #[serde(default = "CompactorConfig::default_max_concurrent_split_uploads")] pub max_concurrent_split_uploads: usize, @@ -248,6 +269,10 @@ impl CompactorConfig { NonZeroUsize::new(cpus).unwrap_or(NonZeroUsize::MIN) } + fn default_pipeline_slots_per_merge_execution() -> NonZeroUsize { + NonZeroUsize::new(2).unwrap() + } + fn default_max_concurrent_split_uploads() -> usize { 12 } @@ -256,6 +281,7 @@ impl CompactorConfig { pub fn for_test() -> Self { CompactorConfig { max_concurrent_merge_executions: NonZeroUsize::new(2).unwrap(), + pipeline_slots_per_merge_execution: Self::default_pipeline_slots_per_merge_execution(), max_concurrent_split_uploads: 4, max_merge_write_throughput: None, } @@ -266,6 +292,7 @@ impl Default for CompactorConfig { fn default() -> Self { Self { max_concurrent_merge_executions: Self::default_max_concurrent_merge_executions(), + pipeline_slots_per_merge_execution: Self::default_pipeline_slots_per_merge_execution(), max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(), max_merge_write_throughput: None, } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 1f9dc30d6d3..9ce1c357f3f 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -87,9 +87,12 @@ impl FromStr for List { } fn default_enabled_services() -> ConfigValue { + // The compactor is excluded by default — it only runs in standalone mode, + // which an operator must explicitly opt into via `enable_standalone_compactors`. 
ConfigValue::with_default(List( QuickwitService::supported_services() .into_iter() + .filter(|service| *service != QuickwitService::Compactor) .map(|service| service.to_string()) .collect(), )) @@ -234,7 +237,7 @@ impl NodeConfigBuilder { self.indexer_config.enable_standalone_compactors = self.enable_standalone_compactors.resolve(env_vars)?; - let mut enabled_services: HashSet = self + let enabled_services: HashSet = self .enabled_services .resolve(env_vars)? .0 @@ -242,14 +245,6 @@ impl NodeConfigBuilder { .map(|service| service.parse()) .collect::>()?; - // Indexers implicitly run the compactor unless standalone compactors - // are enabled. - if enabled_services.contains(&QuickwitService::Indexer) - && !self.indexer_config.enable_standalone_compactors - { - enabled_services.insert(QuickwitService::Compactor); - } - let listen_address = self.listen_address.resolve(env_vars)?; let listen_host = listen_address.parse::()?; let listen_ip = listen_host.resolve().await?; @@ -365,6 +360,15 @@ fn validate(node_config: &NodeConfig) -> anyhow::Result<()> { if node_config.peer_seeds.is_empty() { warn!("peer seeds are empty"); } + if node_config.is_service_enabled(QuickwitService::Compactor) + && !node_config.indexer_config.enable_standalone_compactors + { + bail!( + "the `compactor` service can only be enabled when `enable_standalone_compactors` is \ + true (or `QW_ENABLE_STANDALONE_COMPACTORS=true`). With the default \ + indexer-local merge pipeline, the compactor service must not be enabled." 
+ ); + } validate_disk_usage(node_config); Ok(()) } @@ -673,8 +677,10 @@ mod tests { split_store_max_num_bytes: ByteSize::tb(1), split_store_max_num_splits: 10_000, max_concurrent_split_uploads: 8, + merge_concurrency: NonZeroUsize::new(2).unwrap(), cpu_capacity: IndexerConfig::default_cpu_capacity(), enable_cooperative_indexing: false, + max_merge_write_throughput: Some(ByteSize::mb(100)), enable_standalone_compactors: false, } ); @@ -786,6 +792,9 @@ mod tests { assert_eq!( config.enabled_services, QuickwitService::supported_services() + .into_iter() + .filter(|service| *service != QuickwitService::Compactor) + .collect::>(), ); assert_eq!( config.rest_config.listen_addr, @@ -1378,24 +1387,6 @@ mod tests { assert!(error_message.contains("replication factor")); } - #[tokio::test] - async fn test_indexer_implicitly_enables_compactor() { - let config_yaml = r#" - version: 0.8 - enabled_services: [indexer] - "#; - let config = - load_node_config_with_env(ConfigFormat::Yaml, config_yaml.as_bytes(), &HashMap::new()) - .await - .unwrap(); - assert!(config.enabled_services.contains(&QuickwitService::Indexer)); - assert!( - config - .enabled_services - .contains(&QuickwitService::Compactor) - ); - } - #[tokio::test] async fn test_standalone_compactors_prevents_implicit_compactor() { let config_yaml = r#" diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 0acd20706cb..13560cb7e49 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -27,7 +27,7 @@ use quickwit_common::metrics::OwnedGaugeGuard; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::{KillSwitch, is_metrics_index}; -use quickwit_config::{IndexingSettings, SourceConfig}; +use quickwit_config::{IndexingSettings, SourceConfig, RetentionPolicy}; use quickwit_doc_mapper::DocMapper; use 
quickwit_ingest::IngesterPool; use quickwit_proto::indexing::IndexingPipelineId; @@ -37,6 +37,7 @@ use quickwit_storage::{Storage, StorageResolver}; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; +use super::MergePlanner; use crate::SplitsUpdateMailbox; use crate::actors::doc_processor::DocProcessor; use crate::actors::index_serializer::IndexSerializer; @@ -430,7 +431,7 @@ impl IndexingPipeline { let publisher = Publisher::new( PublisherType::MainPublisher, self.params.metastore.clone(), - None, + self.params.merge_planner_mailbox_opt.clone(), Some(source_mailbox.clone()), ); let (publisher_mailbox, publisher_handle) = ctx @@ -459,10 +460,10 @@ impl IndexingPipeline { UploaderType::IndexUploader, self.params.metastore.clone(), self.params.merge_policy.clone(), - None, + self.params.retention_policy.clone(), self.params.split_store.clone(), SplitsUpdateMailbox::Sequencer(sequencer_mailbox), - self.params.max_concurrent_split_uploads, + self.params.max_concurrent_split_uploads_index, self.params.event_broker.clone(), ); let (uploader_mailbox, uploader_handle) = ctx @@ -628,7 +629,7 @@ impl IndexingPipeline { self.params.metastore.clone(), self.params.storage.clone(), SplitsUpdateMailbox::Sequencer(parquet_sequencer_mailbox), - self.params.max_concurrent_split_uploads, + self.params.max_concurrent_split_uploads_index, ); let (parquet_uploader_mailbox, parquet_uploader_handle) = ctx .spawn_actor() @@ -847,11 +848,14 @@ pub struct IndexingPipelineParams { pub indexing_directory: TempDirectory, pub indexing_settings: IndexingSettings, pub split_store: IndexingSplitStore, - pub max_concurrent_split_uploads: usize, + pub max_concurrent_split_uploads_index: usize, pub cooperative_indexing_permits: Option>, // Merge-related parameters pub merge_policy: Arc, + pub retention_policy: Option, + pub merge_planner_mailbox_opt: Option>, + pub max_concurrent_split_uploads_merge: usize, // Source-related parameters pub source_config: SourceConfig, @@ 
-869,7 +873,8 @@ mod tests { use std::path::PathBuf; use std::sync::Arc; - use quickwit_actors::Universe; + use quickwit_actors::{Command, Universe}; + use quickwit_common::ServiceStream; use quickwit_config::{IndexingSettings, SourceInputFormat, SourceParams}; use quickwit_doc_mapper::{DocMapper, default_doc_mapper_for_test}; use quickwit_metastore::checkpoint::IndexCheckpointDelta; @@ -881,6 +886,7 @@ mod tests { use quickwit_proto::types::{IndexUid, NodeId, PipelineUid}; use quickwit_storage::RamStorage; + use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; use super::{IndexingPipeline, *}; use crate::merge_policy::default_merge_policy; @@ -965,6 +971,7 @@ mod tests { .returning(|_| Ok(EmptyResponse {})); let universe = Universe::new(); + let (merge_planner_mailbox, _) = universe.create_test_mailbox(); let storage = Arc::new(RamStorage::default()); let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); let pipeline_params = IndexingPipelineParams { @@ -979,9 +986,12 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), + retention_policy: None, queues_dir_path: PathBuf::from("./queues"), - max_concurrent_split_uploads: 4, + max_concurrent_split_uploads_index: 4, + max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, + merge_planner_mailbox_opt: Some(merge_planner_mailbox), event_broker: EventBroker::default(), params_fingerprint: 42u64, }; @@ -1086,6 +1096,7 @@ mod tests { let universe = Universe::new(); let storage = Arc::new(RamStorage::default()); + let (merge_planner_mailbox, _) = universe.create_test_mailbox(); let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); let pipeline_params = IndexingPipelineParams { pipeline_id, @@ -1100,8 +1111,11 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), - max_concurrent_split_uploads: 4, + retention_policy: None, + max_concurrent_split_uploads_index: 4, + 
max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, + merge_planner_mailbox_opt: Some(merge_planner_mailbox), event_broker: Default::default(), params_fingerprint: 42u64, }; @@ -1125,6 +1139,111 @@ mod tests { async fn test_indexing_pipeline_simple_gz() -> anyhow::Result<()> { indexing_pipeline_simple("data/test_corpus.json.gz").await } + #[tokio::test] + async fn test_merge_pipeline_does_not_stop_on_indexing_pipeline_failure() { + let node_id = NodeId::from("test-node"); + let pipeline_id = IndexingPipelineId { + node_id, + index_uid: IndexUid::new_with_random_ulid("test-index"), + source_id: "test-source".to_string(), + pipeline_uid: PipelineUid::for_test(0u128), + }; + let source_config = SourceConfig { + source_id: "test-source".to_string(), + num_pipelines: NonZeroUsize::MIN, + enabled: true, + source_params: SourceParams::void(), + transform_config: None, + input_format: SourceInputFormat::Json, + }; + let source_config_clone = source_config.clone(); + + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_index_metadata() + .withf(|index_metadata_request| { + index_metadata_request.index_uid.as_ref().unwrap() == &("test-index", 2) + }) + .returning(move |_| { + let mut index_metadata = + IndexMetadata::for_test("test-index", "ram:///indexes/test-index"); + index_metadata + .add_source(source_config_clone.clone()) + .unwrap(); + Ok(IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap()) + }); + mock_metastore + .expect_list_splits() + .returning(|_| Ok(ServiceStream::empty())); + let metastore = MetastoreServiceClient::from_mock(mock_metastore); + + let universe = Universe::with_accelerated_time(); + let doc_mapper = Arc::new(default_doc_mapper_for_test()); + let storage = Arc::new(RamStorage::default()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); + let merge_pipeline_params = MergePipelineParams { + pipeline_id: 
pipeline_id.merge_pipeline_id(), + doc_mapper: doc_mapper.clone(), + indexing_directory: TempDirectory::for_test(), + metastore: metastore.clone(), + split_store: split_store.clone(), + merge_policy: default_merge_policy(), + retention_policy: None, + max_concurrent_split_uploads: 2, + merge_io_throughput_limiter_opt: None, + merge_scheduler_service: universe.get_or_spawn_one(), + event_broker: Default::default(), + }; + let merge_pipeline = MergePipeline::new(merge_pipeline_params, None, universe.spawn_ctx()); + let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); + let (_merge_pipeline_mailbox, merge_pipeline_handler) = + universe.spawn_builder().spawn(merge_pipeline); + let indexing_pipeline_params = IndexingPipelineParams { + pipeline_id, + doc_mapper, + source_config, + source_storage_resolver: StorageResolver::for_test(), + indexing_directory: TempDirectory::for_test(), + indexing_settings: IndexingSettings::for_test(), + ingester_pool: IngesterPool::default(), + metastore, + queues_dir_path: PathBuf::from("./queues"), + storage, + split_store, + merge_policy: default_merge_policy(), + retention_policy: None, + max_concurrent_split_uploads_index: 4, + max_concurrent_split_uploads_merge: 5, + cooperative_indexing_permits: None, + merge_planner_mailbox_opt: Some(merge_planner_mailbox.clone()), + event_broker: Default::default(), + params_fingerprint: 42u64, + }; + let indexing_pipeline = IndexingPipeline::new(indexing_pipeline_params); + let (_indexing_pipeline_mailbox, indexing_pipeline_handler) = + universe.spawn_builder().spawn(indexing_pipeline); + let obs = indexing_pipeline_handler + .process_pending_and_observe() + .await; + assert_eq!(obs.generation, 1); + // Let's shutdown the indexer, this will trigger the indexing pipeline failure and the + // restart. 
+ let indexer = universe.get::().into_iter().next().unwrap(); + let _ = indexer.ask(Command::Quit).await; + for _ in 0..10 { + universe.sleep(*quickwit_actors::HEARTBEAT).await; + // Check indexing pipeline has restarted. + let obs = indexing_pipeline_handler + .process_pending_and_observe() + .await; + if obs.generation == 2 { + assert_eq!(merge_pipeline_handler.check_health(true), Health::Healthy); + universe.quit().await; + return; + } + } + panic!("Pipeline was apparently not restarted."); + } async fn indexing_pipeline_all_failures_handling(test_file: &str) -> anyhow::Result<()> { let node_id = NodeId::from("test-node"); @@ -1188,6 +1307,7 @@ mod tests { let universe = Universe::new(); let storage = Arc::new(RamStorage::default()); let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); + let (merge_planner_mailbox, _) = universe.create_test_mailbox(); // Create a minimal mapper with wrong date format to ensure that all documents will fail let broken_mapper = serde_json::from_str::( r#" @@ -1219,8 +1339,11 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), - max_concurrent_split_uploads: 4, + retention_policy: None, + max_concurrent_split_uploads_index: 4, + max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, + merge_planner_mailbox_opt: Some(merge_planner_mailbox), params_fingerprint: 42u64, event_broker: Default::default(), }; diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 7ae957937aa..52bd36f1e9a 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::models::DetachMergePipeline; +use quickwit_common::io; +use futures::TryStreamExt; use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::path::PathBuf; @@ -37,15 +40,15 @@ use quickwit_ingest::{ }; use quickwit_metastore::{ IndexMetadata, IndexMetadataResponseExt, IndexesMetadataResponseExt, - ListIndexesMetadataResponseExt, + ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, SplitMetadata, SplitState, }; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId, - IndexingTask, PipelineMetrics, + IndexingTask, MergePipelineId, PipelineMetrics, }; use quickwit_proto::metastore::{ IndexMetadataRequest, IndexMetadataSubrequest, IndexesMetadataRequest, - ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, + ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, ListSplitsRequest, MetastoreResult, }; use quickwit_proto::types::{IndexId, IndexUid, NodeId, PipelineUid, ShardId}; use quickwit_storage::StorageResolver; @@ -55,8 +58,12 @@ use tracing::{debug, error, info, warn}; use crate::models::{DetachIndexingPipeline, ObservePipeline, SpawnPipeline}; use crate::source::{AssignShards, Assignment}; +use time::OffsetDateTime; use crate::split_store::IndexingSplitCache; use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics}; +use super::merge_pipeline::{MergePipeline, MergePipelineParams}; +use super::{MergePlanner, MergeSchedulerService}; +use crate::actors::merge_pipeline::FinishPendingMergesAndShutdownPipeline; /// Name of the indexing directory, usually located at `/indexing`. 
pub const INDEXING_DIR_NAME: &str = "indexing"; @@ -66,10 +73,16 @@ pub struct IndexingServiceCounters { pub num_running_pipelines: usize, pub num_successful_pipelines: usize, pub num_failed_pipelines: usize, + pub num_running_merge_pipelines: usize, pub num_deleted_queues: usize, pub num_delete_queue_failures: usize, } +struct MergePipelineHandle { + mailbox: Mailbox, + handle: ActorHandle, +} + struct PipelineHandle { mailbox: Mailbox, handle: ActorHandle, @@ -93,9 +106,12 @@ pub struct IndexingService { storage_resolver: StorageResolver, indexing_pipelines: HashMap, counters: IndexingServiceCounters, + merge_scheduler_service_opt: Option>, split_cache: Arc, max_concurrent_split_uploads: usize, + merge_pipeline_handles: HashMap, cooperative_indexing_permits: Option>, + merge_io_throughput_limiter_opt: Option, event_broker: EventBroker, } @@ -120,11 +136,14 @@ impl IndexingService { cluster: Cluster, metastore: MetastoreServiceClient, ingest_api_service_opt: Option>, + merge_scheduler_service_opt: Option>, ingester_pool: IngesterPool, storage_resolver: StorageResolver, event_broker: EventBroker, split_cache: Arc, ) -> anyhow::Result { + let merge_io_throughput_limiter_opt = + indexer_config.max_merge_write_throughput.map(io::limiter); let indexing_root_directory = temp_dir::create_or_purge_directory(&data_dir_path.join(INDEXING_DIR_NAME)).await?; let queue_dir_path = data_dir_path.join(QUEUES_DIR_NAME); @@ -140,12 +159,15 @@ impl IndexingService { cluster, metastore, ingest_api_service_opt, + merge_scheduler_service_opt, ingester_pool, storage_resolver, split_cache, indexing_pipelines: Default::default(), counters: Default::default(), max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads, + merge_pipeline_handles: HashMap::new(), + merge_io_throughput_limiter_opt, cooperative_indexing_permits, event_broker, }) @@ -166,6 +188,21 @@ impl IndexingService { Ok(pipeline_handle.handle) } + async fn detach_merge_pipeline( + &mut self, + pipeline_id: 
&MergePipelineId, + ) -> Result, IndexingError> { + let pipeline_handle = self + .merge_pipeline_handles + .remove(pipeline_id) + .ok_or_else(|| { + let message = format!("could not find merge pipeline `{pipeline_id}`"); + IndexingError::Internal(message) + })?; + self.counters.num_running_merge_pipelines -= 1; + Ok(pipeline_handle.handle) + } + async fn observe_pipeline( &mut self, pipeline_uid: &PipelineUid, @@ -197,7 +234,14 @@ impl IndexingService { pipeline_uid, }; let index_config = index_metadata.into_index_config(); - self.spawn_pipeline_inner(ctx, pipeline_id.clone(), index_config, source_config, None) + self.spawn_pipeline_inner( + ctx, + pipeline_id.clone(), + index_config, + source_config, + None, + None, + ) .await?; Ok(pipeline_id) } @@ -208,6 +252,7 @@ impl IndexingService { indexing_pipeline_id: IndexingPipelineId, index_config: IndexConfig, source_config: SourceConfig, + immature_splits_opt: Option>, expected_params_fingerprint: Option, ) -> Result<(), IndexingError> { if self @@ -238,11 +283,42 @@ impl IndexingService { })?; let merge_policy = crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings); + let retention_policy = index_config.retention_policy_opt.clone(); let split_store = IndexingSplitStore::new(storage.clone(), self.split_cache.clone()); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) .map_err(|error| IndexingError::Internal(error.to_string()))?; + let merge_planner_mailbox_opt = if let Some(merge_scheduler_service) = + self.merge_scheduler_service_opt.clone() + { + let merge_pipeline_id = indexing_pipeline_id.merge_pipeline_id(); + let merge_pipeline_params = MergePipelineParams { + pipeline_id: merge_pipeline_id, + doc_mapper: doc_mapper.clone(), + indexing_directory: indexing_directory.clone(), + metastore: self.metastore.clone(), + split_store: split_store.clone(), + merge_scheduler_service, + merge_policy: merge_policy.clone(), + retention_policy: 
retention_policy.clone(), + merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), + max_concurrent_split_uploads: self.max_concurrent_split_uploads, + event_broker: self.event_broker.clone(), + }; + Some(self.get_or_create_merge_pipeline(merge_pipeline_params, immature_splits_opt, ctx)) + } else { + None + }; + + let max_concurrent_split_uploads_index = if self.merge_scheduler_service_opt.is_some() { + (self.max_concurrent_split_uploads / 2).max(1) + } else { + self.max_concurrent_split_uploads + }; + let max_concurrent_split_uploads_merge = + self.max_concurrent_split_uploads - max_concurrent_split_uploads_index; + let params_fingerprint = indexing_pipeline_params_fingerprint(&index_config, &source_config); if let Some(expected_params_fingerprint) = expected_params_fingerprint { @@ -271,10 +347,13 @@ impl IndexingService { indexing_directory, indexing_settings: index_config.indexing_settings.clone(), split_store, - max_concurrent_split_uploads: self.max_concurrent_split_uploads, + max_concurrent_split_uploads_index, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), - // The merge policy is needed in the uploader for determining split maturity + // Merge-related parameters merge_policy, + retention_policy, + max_concurrent_split_uploads_merge, + merge_planner_mailbox_opt, // Source-related parameters source_config, @@ -341,6 +420,70 @@ impl IndexingService { Ok(indexes_metadata) } + /// Fetches the immature splits candidates for merge for all the indexing pipelines for which a + /// merge pipeline is not running.
+ async fn fetch_immature_splits_for_new_merge_pipelines( + &mut self, + indexing_pipeline_ids: &[IndexingPipelineId], + ctx: &ActorContext, + ) -> MetastoreResult>> { + if self.merge_scheduler_service_opt.is_none() { + return Ok(Default::default()); + } + let mut index_uids = Vec::new(); + + for indexing_pipeline_id in indexing_pipeline_ids { + let merge_pipeline_id = indexing_pipeline_id.merge_pipeline_id(); + + if !self.merge_pipeline_handles.contains_key(&merge_pipeline_id) { + index_uids.push(merge_pipeline_id.index_uid); + } + } + if index_uids.is_empty() { + return Ok(Default::default()); + } + index_uids.sort_unstable(); + index_uids.dedup(); + + let list_splits_query = ListSplitsQuery::try_from_index_uids(index_uids) + .expect("`index_uids` should not be empty") + .with_node_id(self.node_id.clone()) + .with_split_state(SplitState::Published) + .retain_immature(OffsetDateTime::now_utc()); + let list_splits_request = + ListSplitsRequest::try_from_list_splits_query(&list_splits_query)?; + + let mut immature_splits_stream = ctx + .protect_future(self.metastore.list_splits(list_splits_request)) + .await?; + + let mut per_merge_pipeline_immature_splits: HashMap> = + indexing_pipeline_ids + .iter() + .map(|indexing_pipeline_id| (indexing_pipeline_id.merge_pipeline_id(), Vec::new())) + .collect(); + + let mut num_immature_splits = 0usize; + + while let Some(list_splits_response) = immature_splits_stream.try_next().await? { + for split_metadata in list_splits_response.deserialize_splits_metadata().await? 
{ + num_immature_splits += 1; + + let merge_pipeline_id = MergePipelineId { + node_id: self.node_id.clone(), + index_uid: split_metadata.index_uid.clone(), + source_id: split_metadata.source_id.clone(), + }; + per_merge_pipeline_immature_splits + .entry(merge_pipeline_id) + .or_default() + .push(split_metadata); + } + } + info!("fetched {num_immature_splits} splits candidates for merge"); + Ok(per_merge_pipeline_immature_splits) + } + async fn handle_supervise(&mut self) -> Result<(), ActorExitStatus> { self.indexing_pipelines .retain(|pipeline_uid, pipeline_handle| { @@ -368,6 +511,48 @@ impl IndexingService { } } }); + let merge_pipelines_to_retain: HashSet = self + .indexing_pipelines + .values() + .map(|pipeline_handle| pipeline_handle.indexing_pipeline_id.merge_pipeline_id()) + .collect(); + + let merge_pipelines_to_shutdown: Vec = self + .merge_pipeline_handles + .keys() + .filter(|running_merge_pipeline_id| { + !merge_pipelines_to_retain.contains(running_merge_pipeline_id) + }) + .cloned() + .collect(); + + for merge_pipeline_to_shutdown in merge_pipelines_to_shutdown { + if let Some((_, merge_pipeline_handle)) = self + .merge_pipeline_handles + .remove_entry(&merge_pipeline_to_shutdown) + { + // We gracefully shutdown the merge pipeline, so we can complete the in-flight + // merges. + info!( + index_uid=%merge_pipeline_to_shutdown.index_uid, + source_id=%merge_pipeline_to_shutdown.source_id, + "shutting down orphan merge pipeline" + ); + // The queue capacity of the merge pipeline is unbounded, so `.send_message(...)` + // should not block. + // We avoid using `.quit()` here because it waits for the actor to exit. + merge_pipeline_handle + .handle + .mailbox() + .send_message(FinishPendingMergesAndShutdownPipeline) + .await + .expect("merge pipeline mailbox should not be full"); + } + } + // Finally, we remove the completed or failed merge pipelines. 
+ self.merge_pipeline_handles + .retain(|_, merge_pipeline_handle| merge_pipeline_handle.handle.state().is_running()); + self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len(); self.update_chitchat_running_plan().await; let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self @@ -385,6 +570,33 @@ impl IndexingService { Ok(()) } + fn get_or_create_merge_pipeline( + &mut self, + merge_pipeline_params: MergePipelineParams, + immature_splits_opt: Option>, + ctx: &ActorContext, + ) -> Mailbox { + if let Some(merge_pipeline_handle) = self + .merge_pipeline_handles + .get(&merge_pipeline_params.pipeline_id) + { + return merge_pipeline_handle.mailbox.clone(); + } + let merge_pipeline_id = merge_pipeline_params.pipeline_id.clone(); + let merge_pipeline = + MergePipeline::new(merge_pipeline_params, immature_splits_opt, ctx.spawn_ctx()); + let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); + let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(merge_pipeline); + let merge_pipeline_handle = MergePipelineHandle { + mailbox: merge_planner_mailbox.clone(), + handle: pipeline_handle, + }; + self.merge_pipeline_handles + .insert(merge_pipeline_id, merge_pipeline_handle); + self.counters.num_running_merge_pipelines += 1; + merge_planner_mailbox + } + /// For all Ingest V2 pipelines, assigns the set of shards they should be working on. /// This is done regardless of whether there has been a change in their shard list /// or not. 
@@ -491,6 +703,10 @@ impl IndexingService { .map(|index_metadata| (index_metadata.index_uid.clone(), index_metadata)) .collect(); + let mut per_merge_pipeline_immature_splits: HashMap> = + self.fetch_immature_splits_for_new_merge_pipelines(&pipelines_to_spawn_ids, ctx) + .await?; + let mut spawn_pipeline_failures: Vec = Vec::new(); for (task_to_spawn, id_to_spawn) in pipelines_to_spawn.iter().zip(pipelines_to_spawn_ids) { @@ -498,12 +714,17 @@ impl IndexingService { per_index_uid_indexes_metadata.get(task_to_spawn.index_uid()) { if let Some(source_config) = index_metadata.sources.get(&task_to_spawn.source_id) { + let merge_pipeline_id = id_to_spawn.merge_pipeline_id(); + let immature_splits_opt = + per_merge_pipeline_immature_splits.remove(&merge_pipeline_id); + if let Err(error) = self .spawn_pipeline_inner( ctx, id_to_spawn.clone(), index_metadata.index_config.clone(), source_config.clone(), + immature_splits_opt, Some(task_to_spawn.params_fingerprint), ) .await @@ -682,6 +903,19 @@ impl Handler for IndexingService { } } +#[async_trait] +impl Handler for IndexingService { + type Reply = Result, IndexingError>; + + async fn handle( + &mut self, + msg: DetachMergePipeline, + _ctx: &ActorContext, + ) -> Result { + Ok(self.detach_merge_pipeline(&msg.pipeline_id).await) + } +} + #[derive(Debug)] struct SuperviseLoop; @@ -785,16 +1019,17 @@ mod tests { }; use quickwit_ingest::{CreateQueueIfNotExistsRequest, init_ingest_api}; use quickwit_metastore::{ - AddSourceRequestExt, CreateIndexRequestExt, ListIndexesMetadataResponseExt, + AddSourceRequestExt, CreateIndexRequestExt, ListIndexesMetadataResponseExt, Split, metastore_for_test, }; use quickwit_proto::indexing::IndexingTask; use quickwit_proto::metastore::{ AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, IndexMetadataResponse, - IndexesMetadataResponse, ListIndexesMetadataResponse, MockMetastoreService, - }; + IndexesMetadataResponse, ListIndexesMetadataResponse, ListSplitsResponse, + MockMetastoreService, 
}; use super::*; + use crate::actors::merge_pipeline::SUPERVISE_LOOP_INTERVAL; async fn spawn_indexing_service_for_test( data_dir_path: &Path, @@ -810,6 +1045,7 @@ mod tests { init_ingest_api(universe, &queues_dir_path, &IngestApiConfig::default()) .await .unwrap(); + let merge_scheduler_mailbox: Mailbox = universe.get_or_spawn_one(); let indexing_server = IndexingService::new( NodeId::from("test-node"), data_dir_path.to_path_buf(), @@ -818,6 +1054,7 @@ mod tests { cluster, metastore, Some(ingest_api_service), + Some(merge_scheduler_mailbox), IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), @@ -918,8 +1155,15 @@ mod tests { .await .unwrap(); pipeline_handle.kill().await; + let _merge_pipeline = indexing_service + .ask_for_res(DetachMergePipeline { + pipeline_id: pipeline_id.merge_pipeline_id(), + }) + .await + .unwrap(); let observation = indexing_service_handle.process_pending_and_observe().await; assert_eq!(observation.num_running_pipelines, 0); + assert_eq!(observation.num_running_merge_pipelines, 0); universe.assert_quit().await; } @@ -1270,6 +1514,266 @@ mod tests { universe.assert_quit().await; } + #[tokio::test] + async fn test_indexing_service_shutdown_merge_pipeline_when_no_indexing_pipeline() { + quickwit_common::setup_logging_for_tests(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let metastore = metastore_for_test(); + + let index_id = append_random_suffix("test-indexing-service"); + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(&index_id, &index_uri); + + let source_config = SourceConfig { + source_id: "test-indexing-service--source".to_string(), + num_pipelines: NonZeroUsize::MIN, + enabled: true, + source_params: SourceParams::void(), + transform_config: None, + input_format: SourceInputFormat::Json, + }; + let create_index_request = + 
CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + let add_source_request = + AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap(); + metastore.add_source(add_source_request).await.unwrap(); + + // Test `IndexingService::new`. + let temp_dir = tempfile::tempdir().unwrap(); + let data_dir_path = temp_dir.path().to_path_buf(); + let indexer_config = IndexerConfig::for_test().unwrap(); + let num_blocking_threads = 1; + let storage_resolver = StorageResolver::unconfigured(); + let universe = Universe::with_accelerated_time(); + let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME); + let ingest_api_service = + init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()) + .await + .unwrap(); + let merge_scheduler_service = universe.get_or_spawn_one(); + let indexing_server = IndexingService::new( + NodeId::from("test-node"), + data_dir_path, + indexer_config, + num_blocking_threads, + cluster.clone(), + metastore.clone(), + Some(ingest_api_service), + Some(merge_scheduler_service), + IngesterPool::default(), + storage_resolver.clone(), + EventBroker::default(), + Arc::new(IndexingSplitCache::no_caching()), + ) + .await + .unwrap(); + let (indexing_server_mailbox, indexing_server_handle) = + universe.spawn_builder().spawn(indexing_server); + let pipeline_id = indexing_server_mailbox + .ask_for_res(SpawnPipeline { + index_id: index_id.clone(), + source_config, + pipeline_uid: PipelineUid::default(), + }) + .await + .unwrap(); + let observation = indexing_server_handle.observe().await; + assert_eq!(observation.num_running_pipelines, 1); + assert_eq!(observation.num_failed_pipelines, 0); + assert_eq!(observation.num_successful_pipelines, 0); + assert_eq!(observation.num_running_merge_pipelines, 1); + + // Test `shutdown_pipeline` + let pipeline = indexing_server_mailbox + 
.ask_for_res(DetachIndexingPipeline { pipeline_id }) + .await + .unwrap(); + pipeline.quit().await; + + // Let the service cleanup the merge pipelines. + universe.sleep(*HEARTBEAT).await; + + let observation = indexing_server_handle.process_pending_and_observe().await; + assert_eq!(observation.num_running_pipelines, 0); + assert_eq!(observation.num_running_merge_pipelines, 0); + universe.sleep(SUPERVISE_LOOP_INTERVAL).await; + // Check that the merge pipeline is also shut down as there is no more indexing pipeline on + // the index. + assert!(universe.get_one::().is_none()); + // It may or may not panic + universe.quit().await; + } + + #[tokio::test] + async fn test_indexing_service_no_merge_pipeline_when_no_merge_scheduler() { + quickwit_common::setup_logging_for_tests(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let metastore = metastore_for_test(); + + let index_id = append_random_suffix("test-indexing-service-no-merge"); + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(&index_id, &index_uri); + + let source_config = SourceConfig { + source_id: "test-source".to_string(), + num_pipelines: NonZeroUsize::MIN, + enabled: true, + source_params: SourceParams::void(), + transform_config: None, + input_format: SourceInputFormat::Json, + }; + let create_index_request = + CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + let add_source_request = + AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap(); + metastore.add_source(add_source_request).await.unwrap(); + + let temp_dir = tempfile::tempdir().unwrap(); + let data_dir_path = temp_dir.path().to_path_buf(); + let indexer_config = IndexerConfig::for_test().unwrap(); + let
num_blocking_threads = 1; + let storage_resolver = StorageResolver::unconfigured(); + let universe = Universe::with_accelerated_time(); + let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME); + let ingest_api_service = + init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()) + .await + .unwrap(); + let indexing_server = IndexingService::new( + NodeId::from("test-node"), + data_dir_path, + indexer_config, + num_blocking_threads, + cluster.clone(), + metastore.clone(), + Some(ingest_api_service), + None, // No merge scheduler — external merge service handles compaction. + IngesterPool::default(), + storage_resolver.clone(), + EventBroker::default(), + Arc::new(IndexingSplitCache::no_caching()), + ) + .await + .unwrap(); + let (indexing_server_mailbox, indexing_server_handle) = + universe.spawn_builder().spawn(indexing_server); + + indexing_server_mailbox + .ask_for_res(SpawnPipeline { + index_id: index_id.clone(), + source_config, + pipeline_uid: PipelineUid::default(), + }) + .await + .unwrap(); + + let observation = indexing_server_handle.observe().await; + assert_eq!(observation.num_running_pipelines, 1); + assert_eq!(observation.num_running_merge_pipelines, 0); + assert!(universe.get_one::().is_none()); + + universe.quit().await; + } + + #[tokio::test] + async fn test_indexing_service_spawns_merge_pipeline_with_merge_scheduler() { + quickwit_common::setup_logging_for_tests(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let metastore = metastore_for_test(); + + let index_id = append_random_suffix("test-indexing-service-with-merge"); + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(&index_id, &index_uri); + + let source_config = SourceConfig { + source_id: "test-source".to_string(), + num_pipelines: NonZeroUsize::MIN, + enabled: true, + source_params: SourceParams::void(), + 
transform_config: None, + input_format: SourceInputFormat::Json, + }; + let create_index_request = + CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + let add_source_request = + AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap(); + metastore.add_source(add_source_request).await.unwrap(); + + let temp_dir = tempfile::tempdir().unwrap(); + let data_dir_path = temp_dir.path().to_path_buf(); + let indexer_config = IndexerConfig::for_test().unwrap(); + let num_blocking_threads = 1; + let storage_resolver = StorageResolver::unconfigured(); + let universe = Universe::with_accelerated_time(); + let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME); + let ingest_api_service = + init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()) + .await + .unwrap(); + let merge_scheduler_mailbox: Mailbox = universe.get_or_spawn_one(); + let indexing_server = IndexingService::new( + NodeId::from("test-node"), + data_dir_path, + indexer_config, + num_blocking_threads, + cluster.clone(), + metastore.clone(), + Some(ingest_api_service), + Some(merge_scheduler_mailbox), + IngesterPool::default(), + storage_resolver.clone(), + EventBroker::default(), + Arc::new(IndexingSplitCache::no_caching()), + ) + .await + .unwrap(); + let (indexing_server_mailbox, indexing_server_handle) = + universe.spawn_builder().spawn(indexing_server); + + indexing_server_mailbox + .ask_for_res(SpawnPipeline { + index_id: index_id.clone(), + source_config, + pipeline_uid: PipelineUid::default(), + }) + .await + .unwrap(); + + let observation = indexing_server_handle.observe().await; + assert_eq!(observation.num_running_pipelines, 1); + assert_eq!(observation.num_running_merge_pipelines, 1); + assert!(universe.get_one::().is_some()); + + universe.quit().await; + } + #[derive(Debug)] struct FreezePipeline; #[async_trait] 
@@ -1374,6 +1878,7 @@ mod tests { let observation = indexing_service_handle.observe().await; assert_eq!(observation.num_running_pipelines, 1); assert_eq!(observation.num_failed_pipelines, 0); + assert_eq!(observation.num_running_merge_pipelines, 1); // Might generate panics universe.quit().await; } @@ -1418,6 +1923,7 @@ mod tests { let indexer_config = IndexerConfig::for_test().unwrap(); let num_blocking_threads = 1; let storage_resolver = StorageResolver::unconfigured(); + let merge_scheduler_service: Mailbox = universe.get_or_spawn_one(); let mut indexing_server = IndexingService::new( NodeId::from("test-ingest-api-gc-node"), data_dir_path, @@ -1426,6 +1932,7 @@ mod tests { cluster.clone(), metastore.clone(), Some(ingest_api_service.clone()), + Some(merge_scheduler_service), IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), @@ -1495,6 +2002,31 @@ mod tests { Ok(response) }); + mock_metastore + .expect_list_splits() + .withf(|request| { + let list_splits_query = request.deserialize_list_splits_query().unwrap(); + list_splits_query.index_uids.unwrap() == [("test-index-0", 0)] + }) + .return_once(|_request| Ok(ServiceStream::empty())); + mock_metastore + .expect_list_splits() + .withf(|request| { + let list_splits_query = request.deserialize_list_splits_query().unwrap(); + list_splits_query.index_uids.unwrap() == [("test-index-1", 0), ("test-index-2", 0)] + }) + .return_once(|_request| { + let splits = vec![Split { + split_metadata: SplitMetadata::for_test("test-split".to_string()), + split_state: SplitState::Published, + update_timestamp: 0, + publish_timestamp: Some(0), + }]; + let list_splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + let response = ServiceStream::from(vec![Ok(list_splits_response)]); + Ok(response) + }); + let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) .await diff --git a/quickwit/quickwit-indexing/src/lib.rs 
b/quickwit/quickwit-indexing/src/lib.rs index 379f038484f..cff5452b89f 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -26,6 +26,7 @@ use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_storage::StorageResolver; use tracing::info; +use crate::actors::MergeSchedulerService; pub use crate::actors::{ FinishPendingMergesAndShutdownPipeline, IndexingError, IndexingPipeline, IndexingPipelineParams, IndexingService, PublisherType, Sequencer, SplitsUpdateMailbox, @@ -73,6 +74,7 @@ pub async fn start_indexing_service( ingester_pool: IngesterPool, storage_resolver: StorageResolver, event_broker: EventBroker, + merge_scheduler_mailbox: Option>, indexing_split_cache: Arc, ) -> anyhow::Result> { info!("starting indexer service"); @@ -85,6 +87,7 @@ pub async fn start_indexing_service( cluster, metastore.clone(), ingest_api_service_mailbox, + merge_scheduler_mailbox, ingester_pool, storage_resolver, event_broker, diff --git a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs index 868f8029be7..67548b9c09d 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs @@ -13,7 +13,7 @@ // limitations under the License. use quickwit_config::SourceConfig; -use quickwit_proto::indexing::IndexingPipelineId; +use quickwit_proto::indexing::{IndexingPipelineId, MergePipelineId}; use quickwit_proto::types::{IndexId, PipelineUid}; #[derive(Clone, Debug)] @@ -31,6 +31,14 @@ pub struct DetachIndexingPipeline { pub pipeline_id: IndexingPipelineId, } +/// Detaches a merge pipeline from the indexing service. The pipeline is no longer managed by the +/// server. This is mostly useful for preventing the server killing an existing merge pipeline +/// if an indexing pipeline is detached.
+#[derive(Debug)] +pub struct DetachMergePipeline { + pub pipeline_id: MergePipelineId, +} + #[derive(Debug)] pub struct ObservePipeline { pub pipeline_id: IndexingPipelineId, diff --git a/quickwit/quickwit-indexing/src/models/mod.rs b/quickwit/quickwit-indexing/src/models/mod.rs index 7f84bbab0af..fd3188e2104 100644 --- a/quickwit/quickwit-indexing/src/models/mod.rs +++ b/quickwit/quickwit-indexing/src/models/mod.rs @@ -34,7 +34,9 @@ pub use indexed_split::{ CommitTrigger, EmptySplit, IndexedSplit, IndexedSplitBatch, IndexedSplitBatchBuilder, IndexedSplitBuilder, }; -pub use indexing_service_message::{DetachIndexingPipeline, ObservePipeline, SpawnPipeline}; +pub use indexing_service_message::{ + DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline, +}; pub use indexing_statistics::IndexingStatistics; pub use merge_planner_message::NewSplits; pub use merge_scratch::MergeScratch; diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 62634d136cc..e6c824ce788 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -112,6 +112,7 @@ impl TestSandbox { let num_blocking_threads = 1; let storage = storage_resolver.resolve(&index_uri).await?; let universe = Universe::with_accelerated_time(); + let merge_scheduler_mailbox = universe.get_or_spawn_one(); let queues_dir_path = temp_dir.path().join(QUEUES_DIR_NAME); let ingest_api_service = init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()).await?; @@ -123,6 +124,7 @@ impl TestSandbox { cluster, metastore.clone(), Some(ingest_api_service), + Some(merge_scheduler_mailbox), IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), diff --git a/quickwit/quickwit-janitor/src/janitor_service.rs b/quickwit/quickwit-janitor/src/janitor_service.rs index 712458ad805..1e843f57b9b 100644 --- a/quickwit/quickwit-janitor/src/janitor_service.rs +++ 
b/quickwit/quickwit-janitor/src/janitor_service.rs @@ -25,7 +25,7 @@ pub struct JanitorService { delete_task_service_handle: Option>, garbage_collector_handle: ActorHandle, retention_policy_executor_handle: ActorHandle, - compaction_planner_handle: ActorHandle, + compaction_planner_handle: Option>, } impl JanitorService { @@ -33,7 +33,7 @@ impl JanitorService { delete_task_service_handle: Option>, garbage_collector_handle: ActorHandle, retention_policy_executor_handle: ActorHandle, - compaction_planner_handle: ActorHandle, + compaction_planner_handle: Option>, ) -> Self { Self { delete_task_service_handle, @@ -50,10 +50,16 @@ impl JanitorService { } else { true }; + let compaction_planner_is_not_failure: bool = + if let Some(compaction_planner_handle) = &self.compaction_planner_handle { + compaction_planner_handle.state() != ActorState::Failure + } else { + true + }; delete_task_is_not_failure && self.garbage_collector_handle.state() != ActorState::Failure && self.retention_policy_executor_handle.state() != ActorState::Failure - && self.compaction_planner_handle.state() != ActorState::Failure + && compaction_planner_is_not_failure } } diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs index 8bac8c46c21..7e6734d9016 100644 --- a/quickwit/quickwit-janitor/src/lib.rs +++ b/quickwit/quickwit-janitor/src/lib.rs @@ -49,7 +49,7 @@ pub async fn start_janitor_service( storage_resolver: StorageResolver, event_broker: EventBroker, run_delete_task_service: bool, - compaction_planner_handle: ActorHandle, + compaction_planner_handle: Option>, ) -> anyhow::Result> { info!("starting janitor service"); let garbage_collector = GarbageCollector::new(metastore.clone(), storage_resolver.clone()); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 1fe1b69069c..ede9d3aaf0c 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -79,7 +79,7 @@ use 
quickwit_config::{ClusterConfig, IngestApiConfig, NodeConfig}; use quickwit_control_plane::control_plane::{ControlPlane, ControlPlaneEventSubscriber}; use quickwit_control_plane::{IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; -use quickwit_indexing::actors::IndexingService; +use quickwit_indexing::actors::{IndexingService, MergeSchedulerService}; use quickwit_indexing::models::ShardPositionsService; use quickwit_indexing::{IndexingSplitCache, start_indexing_service}; use quickwit_ingest::{ @@ -271,7 +271,8 @@ async fn balance_channel_for_service( BalanceChannel::from_stream(service_change_stream) } -/// Builds a `CompactionPlannerServiceClient` if the node runs the janitor or compactor. +/// Builds a `CompactionPlannerServiceClient` if standalone compactors are enabled +/// and the node runs the janitor or compactor. /// /// On janitor nodes, spawns a `CompactionPlanner` actor and builds the client from /// its mailbox. On compactor-only nodes, connects to a remote janitor via gRPC. 
@@ -287,6 +288,9 @@ async fn get_compaction_planner_client_if_needed( Option, Option>, )> { + if !node_config.indexer_config.enable_standalone_compactors { + return Ok((None, None)); + } let is_janitor = node_config.is_service_enabled(QuickwitService::Janitor); let is_compactor = node_config.is_service_enabled(QuickwitService::Compactor); if !is_janitor && !is_compactor { @@ -322,6 +326,16 @@ async fn get_compaction_planner_client_if_needed( )) } +fn spawn_merge_scheduler_service( + universe: &Universe, + node_config: &NodeConfig, +) -> Mailbox { + let (mailbox, _) = universe.spawn_builder().spawn(MergeSchedulerService::new( + node_config.indexer_config.merge_concurrency.get(), + )); + mailbox +} + async fn start_ingest_client_if_needed( node_config: &NodeConfig, universe: &Universe, @@ -610,9 +624,7 @@ pub async fn serve_quickwit( .await .context("failed to initialize compaction service client")?; - // Build the indexing split cache once and share it between the indexing - // service and the compactor supervisor (when both run on the same node). - // A zero quota in `IndexerConfig` produces a no-op cache. + // In the case where a compactor and indexer run on the same node (in a single node deployment, for example), they should share the split cache so that downloads/new splits end up in predictable locations. If there's only one service enabled, no harm, no foul. let indexing_split_cache_opt: Option> = if node_config .is_service_enabled(QuickwitService::Indexer) || node_config.is_service_enabled(QuickwitService::Compactor) @@ -628,6 +640,15 @@ pub async fn serve_quickwit( }; let indexing_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer) { + // if standalone compactors is enabled, indexing pipelines don't perform any merges. + // if standalone compactors is disabled, indexing pipelines perform all merges as before. 
+ let merge_scheduler_mailbox_opt = + if !node_config.indexer_config.enable_standalone_compactors { + Some(spawn_merge_scheduler_service(&universe, &node_config)) + } else { + None + }; + let split_cache = indexing_split_cache_opt .clone() .expect("indexing split cache must exist on indexer nodes"); @@ -640,6 +661,7 @@ pub async fn serve_quickwit( ingester_pool.clone(), storage_resolver.clone(), event_broker.clone(), + merge_scheduler_mailbox_opt, split_cache, ) .await @@ -712,6 +734,7 @@ pub async fn serve_quickwit( } } + // this is the search SplitCache, not to be confused with the IndexingSplitCache. let split_cache_opt: Option> = if let Some(split_cache_limits) = node_config.searcher_config.split_cache { let split_cache = SplitCache::with_root_path( @@ -790,8 +813,6 @@ pub async fn serve_quickwit( }; let janitor_service_opt = if node_config.is_service_enabled(QuickwitService::Janitor) { - let compaction_planner_handle = compaction_planner_handle_opt - .expect("compaction planner handle must exist on janitor nodes"); let janitor_service = start_janitor_service( &universe, &node_config, @@ -800,7 +821,7 @@ pub async fn serve_quickwit( storage_resolver.clone(), event_broker.clone(), !get_bool_from_env(DISABLE_DELETE_TASK_SERVICE_ENV_KEY, false), - compaction_planner_handle, + compaction_planner_handle_opt, ) .await .context("failed to start janitor service")?; @@ -809,7 +830,9 @@ pub async fn serve_quickwit( None }; - let compactor_supervisor_opt = if node_config.is_service_enabled(QuickwitService::Compactor) { + let compactor_supervisor_opt = if node_config.is_service_enabled(QuickwitService::Compactor) + && node_config.indexer_config.enable_standalone_compactors + { let compaction_dir = node_config.data_dir_path.join("compaction"); fs::create_dir_all(&compaction_dir)?; let compaction_root_directory = quickwit_common::temp_dir::Builder::default() @@ -1948,8 +1971,9 @@ mod tests { let universe = Universe::new(); let metastore = 
MetastoreServiceClient::from_mock(MockMetastoreService::new()); - // Janitor without compactor: planner client is returned (for gRPC registration). + // Janitor + indexer with standalone compactors enabled: planner client is returned. let mut node_config = NodeConfig::for_test(); + node_config.indexer_config.enable_standalone_compactors = true; node_config.enabled_services = HashSet::from([QuickwitService::Janitor, QuickwitService::Indexer]); let (client_opt, handle_opt) = @@ -1981,6 +2005,20 @@ mod tests { assert!(client_opt.is_none()); assert!(handle_opt.is_none()); + // Standalone compactors disabled: short-circuit returns (None, None) regardless of + // which services are enabled. + node_config.indexer_config.enable_standalone_compactors = false; + node_config.enabled_services = HashSet::from([ + QuickwitService::Janitor, + QuickwitService::Indexer, + ]); + let (client_opt, handle_opt) = + get_compaction_planner_client_if_needed(&node_config, &cluster, &universe, &metastore) + .await + .unwrap(); + assert!(client_opt.is_none()); + assert!(handle_opt.is_none()); + universe.assert_quit().await; } @@ -1994,6 +2032,7 @@ mod tests { let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new()); let mut node_config = NodeConfig::for_test(); + node_config.indexer_config.enable_standalone_compactors = true; node_config.enabled_services = HashSet::from([QuickwitService::Indexer, QuickwitService::Compactor]); let result =