From 1f6512ea3049765c0c1ff2c47903b07be66fb286 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Tue, 28 Apr 2026 18:12:50 -0400 Subject: [PATCH] feat: add ParquetMergeExecutor and full downloader implementation (Phase 3c) Phase 3 pipeline integration, third PR: - ParquetMergeSplitDownloader: downloads each input split's Parquet file from object storage to a local temp directory, forwards ParquetMergeScratch to the executor. Replaces the stub from PR 3b. - ParquetMergeExecutor: runs merge_sorted_parquet_files via run_cpu_intensive, builds output ParquetSplitMetadata via merge_parquet_split_metadata, renames output files to match generated split IDs, sends ParquetSplitBatch with replaced_split_ids to the uploader. - ParquetSplitBatch.checkpoint_delta -> checkpoint_delta_opt: now Option to support merge operations (no checkpoint delta for data reorganization). Ingest path passes Some(delta), merge path passes None. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/actors/metrics_pipeline/mod.rs | 2 + .../metrics_pipeline/parquet_indexer.rs | 7 +- .../parquet_merge_executor.rs | 216 ++++++++++++++++++ .../parquet_merge_split_downloader.rs | 121 ++++++++-- .../metrics_pipeline/parquet_packager.rs | 9 +- .../metrics_pipeline/parquet_uploader.rs | 24 +- 6 files changed, 349 insertions(+), 30 deletions(-) create mode 100644 quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs index ec73c03b1d2..f11356a72cb 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs @@ -25,6 +25,7 @@ mod indexing_service_impl; mod parquet_doc_processor; mod parquet_indexer; +mod parquet_merge_executor; pub(crate) mod parquet_merge_messages; mod parquet_merge_planner; mod parquet_merge_split_downloader; @@ -47,6 +48,7 @@ pub use 
parquet_doc_processor::{ ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc, }; pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch}; +pub use parquet_merge_executor::ParquetMergeExecutor; pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits}; pub use parquet_merge_planner::ParquetMergePlanner; pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader; diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs index bd968138142..1268583a8c7 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs @@ -95,7 +95,8 @@ pub struct ParquetSplitBatch { /// The uploader uses this to locate and upload the actual file content. pub output_dir: PathBuf, /// Checkpoint delta covering all data in these splits. - pub checkpoint_delta: IndexCheckpointDelta, + /// `None` for merge operations (data was already checkpointed at ingest). + pub checkpoint_delta_opt: Option<IndexCheckpointDelta>, /// Publish lock for coordinating with sources. pub publish_lock: PublishLock, /// Optional publish token. @@ -103,6 +104,10 @@ pub struct ParquetSplitBatch { /// Split IDs being replaced by this batch (non-empty for merges). /// Empty for the ingest path. pub replaced_split_ids: Vec<String>, + /// Holds the temp directory alive until the uploader finishes reading. + /// `None` for the ingest path (packager manages its own temp dir). + /// `Some` for the merge path (executor's scratch directory). + pub _scratch_directory_opt: Option<TempDirectory>, } /// ParquetIndexer actor that accumulates RecordBatches and forwards them to ParquetPackager.
diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs new file mode 100644 index 00000000000..a6f81223e62 --- /dev/null +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs @@ -0,0 +1,216 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Parquet merge executor actor. +//! +//! Calls the Phase 1 merge engine (`merge_sorted_parquet_files`) via +//! `run_cpu_intensive()`, builds output split metadata using +//! `merge_parquet_split_metadata()`, and sends the result to the uploader. + +use anyhow::Context; +use async_trait::async_trait; +use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::thread_pool::run_cpu_intensive; +use quickwit_parquet_engine::merge::metadata_aggregation::merge_parquet_split_metadata; +use quickwit_parquet_engine::merge::{MergeConfig, MergeOutputFile, merge_sorted_parquet_files}; +use quickwit_parquet_engine::storage::ParquetWriterConfig; +use quickwit_proto::types::IndexUid; +use tracing::{info, instrument, warn}; + +use super::ParquetUploader; +use super::parquet_indexer::ParquetSplitBatch; +use super::parquet_merge_messages::ParquetMergeScratch; +use crate::models::PublishLock; + +/// Executes Parquet merge operations using the Phase 1 k-way merge engine. 
+/// +/// Receives `ParquetMergeScratch` from the downloader, runs the merge as a +/// CPU-intensive task, builds output metadata, and sends the result to the +/// uploader for staging and upload. +/// +/// No separate Packager step is needed — the merge engine produces +/// ready-to-upload Parquet files with complete metadata. +pub struct ParquetMergeExecutor { + uploader_mailbox: Mailbox<ParquetUploader>, +} + +impl ParquetMergeExecutor { + pub fn new(uploader_mailbox: Mailbox<ParquetUploader>) -> Self { + Self { uploader_mailbox } + } +} + +#[async_trait] +impl Actor for ParquetMergeExecutor { + type ObservableState = (); + + fn observable_state(&self) {} + + fn name(&self) -> String { + "ParquetMergeExecutor".to_string() + } + + fn queue_capacity(&self) -> QueueCapacity { + QueueCapacity::Bounded(1) + } +} + +#[async_trait] +impl Handler<ParquetMergeScratch> for ParquetMergeExecutor { + type Reply = (); + + #[instrument(name = "parquet_merge_executor", skip_all, fields( + merge_split_id = %scratch.merge_operation.merge_split_id, + num_inputs = scratch.merge_operation.splits.len(), + ))] + async fn handle( + &mut self, + scratch: ParquetMergeScratch, + ctx: &ActorContext<Self>, + ) -> Result<(), ActorExitStatus> { + let merge_split_id = scratch.merge_operation.merge_split_id.to_string(); + let num_inputs = scratch.merge_operation.splits.len(); + + info!( + merge_split_id = %merge_split_id, + num_inputs, + total_bytes = scratch.merge_operation.total_size_bytes(), + "executing parquet merge" + ); + + // Separate output subdirectory so the merge engine's temp files + // don't collide with the downloaded inputs in scratch_directory. + let output_dir = scratch.scratch_directory.path().join("merged_output"); + std::fs::create_dir_all(&output_dir) + .context("failed to create merge output directory") + .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + + // Run the CPU-intensive merge on the dedicated thread pool.
+ let input_paths = scratch.downloaded_parquet_files.clone(); + let output_dir_clone = output_dir.clone(); + let merge_result = run_cpu_intensive(move || { + let config = MergeConfig { + num_outputs: 1, + writer_config: ParquetWriterConfig::default(), + }; + merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config) + }) + .await; + + // We return Ok(()) on merge failure rather than Err to keep the actor + // alive — same strategy as Tantivy's MergeExecutor. This prevents a + // single "split of death" from crash-looping the entire pipeline. + // The trade-off: failed splits aren't retried until pipeline respawn. + let outputs: Vec<MergeOutputFile> = match merge_result { + Ok(Ok(outputs)) => outputs, + Ok(Err(merge_err)) => { + warn!( + error = %merge_err, + merge_split_id = %merge_split_id, + "parquet merge failed" + ); + // Input splits were drained from the planner by operations(). + // They remain published but won't be re-planned until respawn. + return Ok(()); + } + Err(panicked) => { + warn!( + error = %panicked, + merge_split_id = %merge_split_id, + "parquet merge panicked" + ); + return Ok(()); + } + }; + + // Empty output is valid (all input rows were empty). Nothing to publish. + if outputs.is_empty() { + info!( + merge_split_id = %merge_split_id, + "merge produced no output (all inputs empty)" + ); + return Ok(()); + } + + // Build metadata for each output file and rename to match split IDs.
+ let input_splits = &scratch.merge_operation.splits; + let index_uid: IndexUid = input_splits[0] + .index_uid + .parse() + .context("invalid index_uid in merge input") + .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + + let replaced_split_ids: Vec<String> = input_splits + .iter() + .map(|s| s.split_id.as_str().to_string()) + .collect(); + + let mut merged_splits = Vec::with_capacity(outputs.len()); + for output in &outputs { + let metadata = merge_parquet_split_metadata(input_splits, output) + .context("failed to build merge output metadata") + .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + + // The merge engine writes to a temp filename (merge_output_*.parquet). + // Rename to {split_id}.parquet so the uploader can find it at the + // path derived from ParquetSplitMetadata::parquet_filename(). + let expected_path = output_dir.join(&metadata.parquet_file); + if output.path != expected_path { + std::fs::rename(&output.path, &expected_path) + .with_context(|| { + format!( + "failed to rename merge output {} to {}", + output.path.display(), + expected_path.display() + ) + }) + .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + } + + info!( + split_id = %metadata.split_id, + num_rows = metadata.num_rows, + size_bytes = metadata.size_bytes, + "merge produced output split" + ); + + merged_splits.push(metadata); + } + + // Send to uploader. Merges have no checkpoint delta, no publish lock, + // and no publish token — they're just reorganizing existing data. + // The scratch directory is passed along to keep it alive until the + // uploader finishes reading the merged files.
+ let batch = ParquetSplitBatch { + index_uid, + splits: merged_splits, + output_dir, + checkpoint_delta_opt: None, + publish_lock: PublishLock::default(), + publish_token_opt: None, + replaced_split_ids, + _scratch_directory_opt: Some(scratch.scratch_directory), + }; + + ctx.send_message(&self.uploader_mailbox, batch).await?; + + // The merge permit is dropped here when `scratch` goes out of scope, + // releasing the semaphore slot for the next merge. + info!( + merge_split_id = %merge_split_id, + "parquet merge complete, sent to uploader" + ); + Ok(()) + } +} diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs index 9376ba71fed..b412273cd49 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs @@ -12,27 +12,53 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Stub actor for downloading Parquet files prior to merge. +//! Actor that downloads Parquet files from object storage for merge. //! -//! The full implementation (PR 3c) will download each input split's Parquet -//! file from object storage to a local temp directory, then forward a -//! `ParquetMergeScratch` to the `ParquetMergeExecutor`. -//! -//! This stub exists so that `MergeSchedulerService` can reference the -//! `Mailbox<ParquetMergeSplitDownloader>` type and the `ParquetMergePlanner` -//! can be tested end-to-end through the scheduler. +//! For each `ParquetMergeTask`, downloads all input split files to a local +//! temp directory, then forwards a `ParquetMergeScratch` to the +//! `ParquetMergeExecutor`.
+ +use std::path::Path; +use std::sync::Arc; + +use anyhow::Context; use async_trait::async_trait; -use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, QueueCapacity}; -use tracing::debug; +use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::temp_dir::TempDirectory; +use quickwit_storage::Storage; +use tracing::{debug, info, warn}; -use super::ParquetMergeTask; +use super::parquet_merge_executor::ParquetMergeExecutor; +use super::parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask}; -/// Downloads Parquet split files from object storage for merge. +/// Downloads Parquet split files from object storage for merge execution. /// -/// Stub implementation — accepts `ParquetMergeTask` messages but does not -/// perform real downloads. The full implementation comes in PR 3c. -pub struct ParquetMergeSplitDownloader; +/// Downloads are isolated in a separate actor so that I/O latency doesn't +/// block the CPU-intensive merge executor. Much simpler than the Tantivy +/// `MergeSplitDownloader`: Parquet splits are single files (not bundles), +/// so downloading is a straightforward `storage.copy_to_file()` per split. +pub struct ParquetMergeSplitDownloader { + /// Parent directory for creating per-merge temp directories. + scratch_directory: TempDirectory, + /// Object storage for downloading split files. + storage: Arc<dyn Storage>, + /// Downstream executor to forward downloaded files to.
+ executor_mailbox: Mailbox<ParquetMergeExecutor>, +} + +impl ParquetMergeSplitDownloader { + pub fn new( + scratch_directory: TempDirectory, + storage: Arc<dyn Storage>, + executor_mailbox: Mailbox<ParquetMergeExecutor>, + ) -> Self { + Self { + scratch_directory, + storage, + executor_mailbox, + } + } +} #[async_trait] impl Actor for ParquetMergeSplitDownloader { @@ -56,13 +82,68 @@ impl Handler<ParquetMergeTask> for ParquetMergeSplitDownloader { async fn handle( &mut self, task: ParquetMergeTask, - _ctx: &ActorContext<Self>, + ctx: &ActorContext<Self>, ) -> Result<(), ActorExitStatus> { - debug!( - merge_split_id = %task.merge_operation.merge_split_id, - num_inputs = task.merge_operation.splits.len(), - "received parquet merge task (stub — real download in PR 3c)" + let merge_split_id = task.merge_operation.merge_split_id.to_string(); + let num_inputs = task.merge_operation.splits.len(); + + info!( + merge_split_id = %merge_split_id, + num_inputs, + "downloading parquet files for merge" + ); + + // Each merge gets its own temp directory so partial downloads from a + // failed merge don't interfere with other merges. TempDirectory's Drop + // impl cleans up automatically on error paths. + let download_dir = self + .scratch_directory + .named_temp_child("parquet-merge-") + .context("failed to create merge download directory") + .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + + // Download each input split's Parquet file.
+ let mut downloaded_paths = Vec::with_capacity(num_inputs); + for split in &task.merge_operation.splits { + let parquet_filename = split.parquet_filename(); + let local_path = download_dir.path().join(&parquet_filename); + + debug!( + split_id = %split.split_id, + parquet_file = %parquet_filename, + "downloading parquet file" + ); + + self.storage + .copy_to_file(Path::new(&parquet_filename), &local_path) + .await + .map_err(|e| { + warn!( + error = %e, + split_id = %split.split_id, + "failed to download parquet file for merge" + ); + ActorExitStatus::from(anyhow::anyhow!(e)) + })?; + + downloaded_paths.push(local_path); + ctx.record_progress(); + } + + info!( + merge_split_id = %merge_split_id, + num_files = downloaded_paths.len(), + "all parquet files downloaded for merge" ); + + let scratch = ParquetMergeScratch { + merge_operation: task.merge_operation, + downloaded_parquet_files: downloaded_paths, + scratch_directory: download_dir, + merge_permit: task.merge_permit, + }; + + ctx.send_message(&self.executor_mailbox, scratch).await?; Ok(()) } } diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs index a3ac5cbd409..3e2cf3170ba 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs @@ -233,10 +233,11 @@ impl Handler for ParquetPackager { index_uid, splits, output_dir, - checkpoint_delta, + checkpoint_delta_opt: Some(checkpoint_delta), publish_lock, publish_token_opt, replaced_split_ids: Vec::new(), + _scratch_directory_opt: None, }; ctx.send_message(&self.uploader_mailbox, split_batch) @@ -494,7 +495,11 @@ mod tests { vec![1, 3] ); assert_eq!( - split_batches[0].checkpoint_delta.source_delta, + split_batches[0] + .checkpoint_delta_opt + .as_ref() + .unwrap() + .source_delta, SourceCheckpointDelta::from_range(0..30) ); diff --git 
a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs index 3f03750230b..d948c0710c5 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs @@ -181,7 +181,7 @@ impl Handler for ParquetUploader { index_uid: index_uid.clone(), new_splits: Vec::new(), replaced_split_ids: Vec::new(), - checkpoint_delta_opt: Some(batch.checkpoint_delta), + checkpoint_delta_opt: batch.checkpoint_delta_opt, publish_lock: batch.publish_lock, publish_token_opt: batch.publish_token_opt, parent_span: tracing::Span::current(), @@ -217,11 +217,15 @@ impl Handler for ParquetUploader { let counters = self.counters.clone(); let output_dir = batch.output_dir; - let checkpoint_delta = batch.checkpoint_delta; + let checkpoint_delta_opt = batch.checkpoint_delta_opt; let publish_lock = batch.publish_lock; let publish_token_opt = batch.publish_token_opt; let splits = batch.splits; let replaced_split_ids = batch.replaced_split_ids; + // Hold the scratch directory alive until the upload task completes. + // For the merge path, this prevents the TempDirectory from being + // cleaned up before the upload task reads the merged files. + let _scratch_directory_guard = batch._scratch_directory_opt; debug!( index_uid = %index_uid, num_splits = splits.len(), @@ -323,7 +327,7 @@ impl Handler for ParquetUploader { index_uid, new_splits: splits, replaced_split_ids, - checkpoint_delta_opt: Some(checkpoint_delta), + checkpoint_delta_opt, publish_lock, publish_token_opt, parent_span: Span::current(), @@ -335,6 +339,8 @@ impl Handler for ParquetUploader { // Drop permit to allow next upload drop(permit_guard); + // Drop scratch directory guard after upload completes. 
+ drop(_scratch_directory_guard); } .instrument(Span::current()), "metrics_upload_task", @@ -425,10 +431,11 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), splits, output_dir: temp_dir.path().to_path_buf(), - checkpoint_delta, + checkpoint_delta_opt: Some(checkpoint_delta), publish_lock: PublishLock::default(), publish_token_opt: None, replaced_split_ids: Vec::new(), + _scratch_directory_opt: None, }; uploader_mailbox.send_message(batch).await.unwrap(); @@ -519,10 +526,11 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), splits, output_dir: temp_dir.path().to_path_buf(), - checkpoint_delta, + checkpoint_delta_opt: Some(checkpoint_delta), publish_lock: PublishLock::default(), publish_token_opt: None, replaced_split_ids: Vec::new(), + _scratch_directory_opt: None, }; uploader_mailbox.send_message(batch).await.unwrap(); @@ -594,10 +602,11 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), splits: Vec::new(), output_dir: temp_dir.path().to_path_buf(), - checkpoint_delta, + checkpoint_delta_opt: Some(checkpoint_delta), publish_lock: PublishLock::default(), publish_token_opt: None, replaced_split_ids: Vec::new(), + _scratch_directory_opt: None, }; uploader_mailbox.send_message(batch).await.unwrap(); @@ -665,10 +674,11 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), splits, output_dir: temp_dir.path().to_path_buf(), - checkpoint_delta, + checkpoint_delta_opt: Some(checkpoint_delta), publish_lock: PublishLock::default(), publish_token_opt: None, replaced_split_ids: Vec::new(), + _scratch_directory_opt: None, }; uploader_mailbox.send_message(batch).await.unwrap(); }