Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ mod tests {
use quickwit_cli::split::{DescribeSplitArgs, SplitCliCommand};
use quickwit_cli::tool::{
ExtractSplitArgs, GarbageCollectIndexArgs, LocalIngestDocsArgs, LocalSearchArgs,
ToolCliCommand,
ToolCliCommand, MergeArgs
};
use quickwit_common::uri::Uri;
use quickwit_config::SourceInputFormat;
Expand Down Expand Up @@ -740,6 +740,31 @@ mod tests {
Ok(())
}

#[test]
fn test_parse_merge_args() -> anyhow::Result<()> {
let app = build_cli().no_binary_name(true);
let matches = app.try_get_matches_from([
"tool",
"merge",
"--index",
"wikipedia",
"--source",
"ingest-source",
"--config",
"/config.yaml",
])?;
let command = CliCommand::parse_cli_args(matches)?;
assert!(matches!(
command,
CliCommand::Tool(ToolCliCommand::Merge(MergeArgs {
index_id,
source_id,
..
})) if &index_id == "wikipedia" && source_id == "ingest-source"
));
Ok(())
}

#[test]
fn test_parse_no_color() {
// SAFETY: this test may not be entirely sound if not run with nextest or --test-threads=1
Expand Down
137 changes: 137 additions & 0 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::info;
use quickwit_config::VecSourceParams;
use quickwit_actors::Mailbox;
use quickwit_actors::ActorExitStatus;
use quickwit_indexing::actors::MergePipeline;
use quickwit_indexing::actors::MergeSchedulerService;
use quickwit_indexing::models::DetachMergePipeline;
use quickwit_proto::types::SourceId;
use std::collections::{HashSet, VecDeque};
use std::io::{IsTerminal, Stdout, Write, stdout};
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -200,6 +208,13 @@ pub struct GarbageCollectIndexArgs {
pub dry_run: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct MergeArgs {
pub config_uri: Uri,
pub index_id: IndexId,
pub source_id: SourceId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ExtractSplitArgs {
pub config_uri: Uri,
Expand All @@ -214,6 +229,7 @@ pub enum ToolCliCommand {
LocalIngest(LocalIngestDocsArgs),
LocalSearch(LocalSearchArgs),
ExtractSplit(ExtractSplitArgs),
Merge(MergeArgs),
}

impl ToolCliCommand {
Expand All @@ -225,6 +241,7 @@ impl ToolCliCommand {
"gc" => Self::parse_garbage_collect_args(submatches),
"local-ingest" => Self::parse_local_ingest_args(submatches),
"local-search" => Self::parse_local_search_args(submatches),
"merge" => Self::parse_merge_args(submatches),
"extract-split" => Self::parse_extract_split_args(submatches),
_ => bail!("unknown tool subcommand `{subcommand}`"),
}
Expand Down Expand Up @@ -314,6 +331,24 @@ impl ToolCliCommand {
}))
}

fn parse_merge_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
let config_uri = matches
.remove_one::<String>("config")
.map(|uri_str| Uri::from_str(&uri_str))
.expect("`config` should be a required arg.")?;
let index_id = matches
.remove_one::<String>("index")
.expect("'index-id' should be a required arg.");
let source_id = matches
.remove_one::<String>("source")
.expect("'source-id' should be a required arg.");
Ok(Self::Merge(MergeArgs {
index_id,
source_id,
config_uri,
}))
}

fn parse_garbage_collect_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
let config_uri = matches
.get_one("config")
Expand Down Expand Up @@ -363,6 +398,7 @@ impl ToolCliCommand {
Self::GarbageCollect(args) => garbage_collect_index_cli(args).await,
Self::LocalIngest(args) => local_ingest_docs_cli(args).await,
Self::LocalSearch(args) => local_search_cli(args).await,
Self::Merge(args) => merge_cli(args).await,
Self::ExtractSplit(args) => extract_split_cli(args).await,
}
}
Expand Down Expand Up @@ -418,6 +454,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
let universe = Universe::new();
let merge_scheduler_service_mailbox = universe.get_or_spawn_one();
let split_cache =
Arc::new(IndexingSplitCache::from_config(&indexer_config, &config.data_dir_path).await?);
let indexing_server = IndexingService::new(
Expand All @@ -428,6 +465,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
cluster,
metastore,
None,
Some(merge_scheduler_service_mailbox),
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
Expand All @@ -443,6 +481,11 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
pipeline_uid: PipelineUid::random(),
})
.await?;
let merge_pipeline_handle = indexing_server_mailbox
.ask_for_res(DetachMergePipeline {
pipeline_id: pipeline_id.merge_pipeline_id(),
})
.await?;
let indexing_pipeline_handle = indexing_server_mailbox
.ask_for_res(DetachIndexingPipeline { pipeline_id })
.await?;
Expand All @@ -460,6 +503,13 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
let statistics =
start_statistics_reporting_loop(indexing_pipeline_handle, args.input_path_opt.is_none())
.await?;

merge_pipeline_handle
.mailbox()
.ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
.await?;
merge_pipeline_handle.join().await;

// Shutdown the indexing server.
universe
.send_exit_with_success(&indexing_server_mailbox)
Expand Down Expand Up @@ -527,6 +577,93 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
Ok(())
}

pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
debug!(args=?args, "run-merge-operations");
println!("❯ Merging splits locally...");
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
run_index_checklist(&mut metastore, &storage_resolver, &args.index_id, None).await?;
// The indexing service needs to update its cluster chitchat state so that the control plane is
// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
// and avoid impacting potential control plane running on the cluster.
let cluster = create_empty_cluster(&config).await?;
let runtimes_config = RuntimesConfig::default();
start_actor_runtimes(
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
let indexer_config = IndexerConfig::default();
let universe = Universe::new();
let merge_scheduler_service: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id,
config.data_dir_path,
indexer_config,
runtimes_config.num_threads_blocking,
cluster,
metastore,
None,
Some(merge_scheduler_service),
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
Arc::new(IndexingSplitCache::no_caching()),
)
.await?;
let (indexing_service_mailbox, indexing_service_handle) =
universe.spawn_builder().spawn(indexing_server);
let pipeline_id = indexing_service_mailbox
.ask_for_res(SpawnPipeline {
index_id: args.index_id,
source_config: SourceConfig {
source_id: args.source_id,
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::Vec(VecSourceParams::default()),
transform_config: None,
input_format: SourceInputFormat::Json,
},
pipeline_uid: PipelineUid::random(),
})
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
.ask_for_res(DetachMergePipeline {
pipeline_id: pipeline_id.merge_pipeline_id(),
})
.await?;

let mut check_interval = tokio::time::interval(Duration::from_secs(1));
loop {
check_interval.tick().await;

pipeline_handle.refresh_observe();
let observation = pipeline_handle.last_observation();

if observation.num_ongoing_merges == 0 {
info!("merge pipeline has no more ongoing merges, exiting");
break;
}

if pipeline_handle.state().is_exit() {
info!("merge pipeline has exited, exiting");
break;
}
}

let (pipeline_exit_status, _pipeline_statistics) = pipeline_handle.quit().await;
indexing_service_handle.quit().await;
if !matches!(
pipeline_exit_status,
ActorExitStatus::Success | ActorExitStatus::Quit
) {
bail!(pipeline_exit_status);
}
println!("{} Merge successful.", "✔".color(GREEN_COLOR));
Ok(())
}

pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow::Result<()> {
debug!(args=?args, "garbage-collect-index");
println!("❯ Garbage collecting index...");
Expand Down
42 changes: 29 additions & 13 deletions quickwit/quickwit-compaction/src/compactor_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use std::time::Duration;

use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, SpawnContext};
use quickwit_common::io::Limiter;
use quickwit_common::io::{self, Limiter};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::uri::Uri;
use quickwit_config::{IndexingSettings, RetentionPolicy, SearchSettings, build_doc_mapper};
use quickwit_config::{
CompactorConfig, IndexingSettings, RetentionPolicy, SearchSettings, build_doc_mapper,
};
use quickwit_doc_mapper::DocMapping;
use quickwit_indexing::merge_policy::{MergeOperation, merge_policy_from_settings};
use quickwit_indexing::{IndexingSplitCache, IndexingSplitStore};
Expand Down Expand Up @@ -75,18 +77,25 @@ impl CompactorSupervisor {
pub fn new(
node_id: NodeId,
planner_client: CompactionPlannerServiceClient,
max_concurrent_merge_executions: usize,
io_throughput_limiter: Option<Limiter>,
compactor_config: &CompactorConfig,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
split_cache: Arc<IndexingSplitCache>,
max_concurrent_split_uploads: usize,
event_broker: EventBroker,
compaction_root_directory: TempDirectory,
) -> Self {
let num_pipeline_slots = max_concurrent_merge_executions * 2;
let &CompactorConfig {
max_concurrent_merge_executions,
pipeline_slots_per_merge_execution,
max_concurrent_split_uploads,
max_merge_write_throughput,
} = compactor_config;
let num_pipeline_slots =
max_concurrent_merge_executions.get() * pipeline_slots_per_merge_execution.get();
let pipelines = (0..num_pipeline_slots).map(|_| None).collect();
let merge_execution_semaphore = Arc::new(Semaphore::new(max_concurrent_merge_executions));
let merge_execution_semaphore =
Arc::new(Semaphore::new(max_concurrent_merge_executions.get()));
let io_throughput_limiter = max_merge_write_throughput.map(io::limiter);
CompactorSupervisor {
node_id,
planner_client,
Expand Down Expand Up @@ -325,6 +334,8 @@ impl Handler<CheckPipelineStatuses> for CompactorSupervisor {

#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;

use quickwit_actors::Universe;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_proto::compaction::{
Expand All @@ -343,15 +354,18 @@ mod tests {
let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());
let compaction_client =
CompactionPlannerServiceClient::from_mock(MockCompactionPlannerService::new());
let compactor_config = CompactorConfig {
max_concurrent_merge_executions: NonZeroUsize::new(max_concurrent_merge_executions)
.expect("max_concurrent_merge_executions must be non-zero"),
..CompactorConfig::for_test()
};
CompactorSupervisor::new(
NodeId::from("test-node"),
compaction_client,
max_concurrent_merge_executions,
None,
&compactor_config,
metastore,
StorageResolver::for_test(),
Arc::new(IndexingSplitCache::no_caching()),
2,
EventBroker::default(),
TempDirectory::for_test(),
)
Expand Down Expand Up @@ -557,15 +571,17 @@ mod tests {
let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());
let client = CompactionPlannerServiceClient::from_mock(mock);
// 3 merge-executions → 6 pipeline slots
let compactor_config = CompactorConfig {
max_concurrent_merge_executions: NonZeroUsize::new(3).unwrap(),
..CompactorConfig::for_test()
};
let mut supervisor = CompactorSupervisor::new(
NodeId::from("test-node"),
client,
3,
None,
&compactor_config,
metastore,
StorageResolver::for_test(),
Arc::new(IndexingSplitCache::no_caching()),
2,
EventBroker::default(),
TempDirectory::for_test(),
);
Expand Down
6 changes: 1 addition & 5 deletions quickwit/quickwit-compaction/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::sync::Arc;

pub use compactor_supervisor::CompactorSupervisor;
use quickwit_actors::{Mailbox, Universe};
use quickwit_common::io;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_config::CompactorConfig;
Expand Down Expand Up @@ -53,16 +52,13 @@ pub async fn start_compactor_service(
compaction_root_directory: TempDirectory,
) -> anyhow::Result<Mailbox<CompactorSupervisor>> {
info!("starting compactor service");
let io_throughput_limiter = compactor_config.max_merge_write_throughput.map(io::limiter);
let supervisor = CompactorSupervisor::new(
node_id,
compaction_client,
compactor_config.max_concurrent_merge_executions.get(),
io_throughput_limiter,
compactor_config,
metastore,
storage_resolver,
split_cache,
compactor_config.max_concurrent_split_uploads,
event_broker,
compaction_root_directory,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
"enable_otlp_endpoint": true,
"split_store_max_num_bytes": "1T",
"split_store_max_num_splits": 10000,
"max_concurrent_split_uploads": 8
"max_concurrent_split_uploads": 8,
"max_merge_write_throughput": "100mb",
"merge_concurrency": 2
},
"ingest_api": {
"replication_factor": 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ enable_otlp_endpoint = true
split_store_max_num_bytes = "1T"
split_store_max_num_splits = 10_000
max_concurrent_split_uploads = 8
max_merge_write_throughput = "100mb"
merge_concurrency = 2

[ingest_api]
replication_factor = 2
Expand Down
Loading
Loading