
feat: support bounded compaction planner#6095

Closed
zhangyue19921010 wants to merge 3 commits into
lance-format:mainfrom
zhangyue19921010:bounded_io_compaction_planner_final

@zhangyue19921010
Contributor

Closes: #6039

@github-actions github-actions Bot added enhancement New feature or request python java labels Mar 4, 2026
@github-actions
Contributor

github-actions Bot commented Mar 4, 2026

PR Review: feat: support bounded compaction planner

P0 – Sequential I/O in BoundedCompactionPlanner

The DefaultCompactionPlanner collects fragment metrics in parallel using .buffered(io_parallelism):

// DefaultCompactionPlanner (existing)
let fragment_metrics = dataset.get_fragments().into_iter()
    .map(|f| async move { ... collect_metrics ... })
    .buffered(dataset.object_store().io_parallelism());

The new BoundedCompactionPlanner collects metrics sequentially in a for loop:

// BoundedCompactionPlanner (new) – bounded_compaction_planner.rs
for (position, fragment) in dataset.manifest.fragments.iter().enumerate() {
    let metrics = collect_metrics(&file_fragment).await?;  // one-at-a-time
    ...
}

Even though the bounded planner stops early, each I/O call still blocks until complete before the next one starts. For datasets stored on remote object stores (S3/GCS/Azure), this means each fragment's deletion file read incurs a full round-trip latency serially. Consider pre-fetching metrics in parallel (e.g., with a bounded buffer) and then consuming them sequentially for budget checking.
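
One way to keep early stopping while overlapping the I/O is a sliding window of in-flight reads that are consumed in order. The sketch below uses Python's asyncio rather than the PR's Rust, purely for illustration; `collect_metrics`, the fixed 100-row fragments, and the `window` parameter are hypothetical stand-ins, not Lance APIs.

```python
import asyncio
from collections import deque


async def collect_metrics(fragment_id):
    # Hypothetical stand-in for a per-fragment metadata read on a remote store.
    await asyncio.sleep(0.01)
    return {"fragment_id": fragment_id, "num_rows": 100}


async def plan_with_prefetch(fragment_ids, max_rows, window=4):
    """Keep up to `window` reads in flight, but check the budget in order."""
    pending = deque()
    remaining = iter(fragment_ids)
    selected, used_rows = [], 0
    try:
        while True:
            # Top up the window so several reads overlap.
            while len(pending) < window:
                fragment_id = next(remaining, None)
                if fragment_id is None:
                    break
                pending.append(asyncio.create_task(collect_metrics(fragment_id)))
            if not pending:
                break
            # Consume results in submission order, like an ordered buffer.
            metrics = await pending.popleft()
            # Assumption: always accept the first fragment, then enforce the budget.
            if selected and used_rows + metrics["num_rows"] > max_rows:
                break
            selected.append(metrics["fragment_id"])
            used_rows += metrics["num_rows"]
    finally:
        # Drop any reads that were prefetched but are no longer needed.
        for task in pending:
            task.cancel()
    return selected
```

At most `window` extra reads are wasted when the budget is hit, which bounds the over-fetch while still hiding round-trip latency.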

P1 – Unrelated diff in lance-datafusion/src/planner.rs

The removal of location!() from Error::invalid_input calls in planner.rs is unrelated to the bounded compaction planner feature. This should be a separate commit/PR to keep the change set focused.

P1 – Inconsistent validation across bindings

Python's _resolve_compaction_planner raises an error when planner='default' is combined with max_compaction_rows/max_compaction_bytes:

if normalized == "default" and has_limit:
    raise ValueError("planner='default' cannot be combined with ...")

But neither the Rust resolve_compaction_planner in java/lance-jni/src/utils.rs nor the one in python/src/dataset/optimize.rs has this guard. A user passing planner="default" + max_compaction_rows=1000 through the Java API or the lower-level Python Compaction.plan() API will silently use the default planner and ignore the limit. Validation should be consistent—ideally enforced once in Rust.
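
As a rough illustration of a single shared guard, the check could look like the sketch below. It is written in Python only to mirror the snippet above; the function name, the set of valid planners, and the error wording are assumptions, and the review suggests the real check would ideally live in Rust.

```python
from typing import Optional

# Assumed planner names, taken from the options discussed in this PR.
VALID_PLANNERS = {"default", "bounded"}


def resolve_compaction_planner(
    planner: str = "default",
    max_compaction_rows: Optional[int] = None,
    max_compaction_bytes: Optional[int] = None,
) -> str:
    """Normalize the planner name and reject limits the planner would ignore."""
    normalized = planner.strip().lower()
    if normalized not in VALID_PLANNERS:
        raise ValueError(f"unknown planner: {planner!r}")
    has_limit = max_compaction_rows is not None or max_compaction_bytes is not None
    if normalized == "default" and has_limit:
        # Fail loudly instead of silently dropping the limit.
        raise ValueError(
            "planner='default' cannot be combined with "
            "max_compaction_rows or max_compaction_bytes"
        )
    return normalized
```

If every binding routed through one such function, the Java and lower-level Python paths could not diverge from the high-level Python behavior.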

Minor nits (non-blocking)

  • check_and_update_usage relies on file_size_bytes which may be 0 for older datasets (pre-v2 fragments). When using max_compaction_bytes against such datasets the byte budget is effectively infinite. A log warning when file_size_bytes is missing would help users debug unexpected behavior.
  • The "always compact at least one fragment" behavior (when the first candidate already exceeds the budget and candidate_bins is empty) is reasonable, but a doc comment explaining this guarantee would be helpful.
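
The first nit could be handled along these lines. This is a hedged Python sketch, not the PR's code: `fragment_size_bytes` and its arguments are hypothetical, and it assumes a missing size shows up as `None` or `0`.

```python
import logging

logger = logging.getLogger("compaction_sketch")


def fragment_size_bytes(fragment_id, file_sizes):
    """Sum the recorded data file sizes, warning when some are missing."""
    known = [size for size in file_sizes if size]  # drops None and 0
    missing = len(file_sizes) - len(known)
    if missing:
        logger.warning(
            "fragment %s: %d of %d data files have no recorded size; "
            "max_compaction_bytes may undercount this fragment",
            fragment_id, missing, len(file_sizes),
        )
    return sum(known)
```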

@codecov

codecov Bot commented Mar 4, 2026

Codecov Report

❌ Patch coverage is 87.50000% with 29 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| rust/lance/src/dataset/optimize.rs | 72.58% | 15 Missing and 2 partials ⚠️ |
| ...e/compaction_planner/bounded_compaction_planner.rs | 92.89% | 11 Missing and 1 partial ⚠️ |


Comment on lines +205 to +210
/// The type of compaction planner to use. Defaults to [`CompactionPlannerType::Default`].
pub compaction_planner_type: CompactionPlannerType,
/// The maximum number of bytes to compact in a single compaction operation.
pub max_compaction_bytes: Option<usize>,
/// The maximum number of rows to compact in a single compaction operation.
pub max_compaction_rows: Option<usize>,
Contributor

I'm having some difficulty with how these will be presented as options to the user. Regardless of which way I cut it, it feels like we're leaking implementation details that they should not be concerned about. For example (using spark SQL as an example):

```
OPTIMIZE <table> WITH (max_compaction_row=100000)
```

Will do nothing, because the user has to explicitly set the planner type:
```
OPTIMIZE <table> WITH (planner="bounded", max_compaction_row=100000)
```

Alternatively, if we remove the `planner` configuration option then this gets really messy as we add additional options that are disjoint. For example, if we had a "fooable" planner that can "foo" with "how_to_foo" configuration option:
```
OPTIMIZE <table> WITH (max_compaction_row=100000, how_to_foo=carefully)
```
means that we can't just create a different planner based on the passed options, because it is ambiguous whether the options above call for `FooablePlanner` or `BoundedPlanner`.

To add maximum compaction bytes / rows configuration, is there a reason we need a different planner implementation? Or can these be presented as options that are applied in the current planning scheme?

Contributor Author

Hi @hamersaw, thanks a lot for your attention! I look forward to discussing this further with you. Once we reach a consensus on the design, I will update the implementation in this PR.

I understand there are currently two main issues:

  1. How to better expose the concepts of planners and their corresponding parameters to users.
  2. Why a separate bounded planner needs to be added here.

How to better expose the concepts of planners and their corresponding parameters to users.

I believe we can refer to the design of how various indexes are exposed to users. Here, we support different types of indexes, and each index has its own tuning parameters.

    def create_scalar_index(
        self,
        column: str,
        index_type: Union[
            Literal["BTREE"],
            Literal["BITMAP"],
            Literal["LABEL_LIST"],
            Literal["INVERTED"],
            Literal["FTS"],
            Literal["NGRAM"],
            Literal["ZONEMAP"],
            Literal["BLOOMFILTER"],
            Literal["RTREE"],
            IndexConfig,
        ],
        name: Optional[str] = None,
        *,
        replace: bool = True,
        train: bool = True,
        fragment_ids: Optional[List[int]] = None,
        index_uuid: Optional[str] = None,
        **kwargs,
    ):

On the compaction side, add two new parameters, planner and **kwargs:

    def compact_files(
        self,
        *,
        target_rows_per_fragment: int = 1024 * 1024,
        max_rows_per_group: int = 1024,
        max_bytes_per_file: Optional[int] = None,
        materialize_deletions: bool = True,
        materialize_deletions_threshold: float = 0.1,
        num_threads: Optional[int] = None,
        batch_size: Optional[int] = None,
        compaction_mode: Optional[
            Literal["reencode", "try_binary_copy", "force_binary_copy"]
        ] = None,
        binary_copy_read_batch_bytes: Optional[int] = None,
        planner: Union[
            Literal["DEFAULT"],
            Literal["BOUNDED"],
        ] = "DEFAULT",
        **kwargs,
    ) -> CompactionMetrics:

During parsing, depending on the planner, we attempt to capture the parameters specific to that planner from **kwargs (ignoring irrelevant parameters) and pass them to the Rust side.

For invocation, it is similar to:

    metrics = dataset.optimize.compact_files(
        planner="bounded",
        max_compaction_rows=1000000,
    )
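
The parsing step described above might be sketched as follows. `PLANNER_OPTIONS` and `extract_planner_options` are hypothetical names used for illustration, and the option sets are assumed from the parameters mentioned in this thread.

```python
# Hypothetical mapping from planner name to the options it understands.
PLANNER_OPTIONS = {
    "default": set(),
    "bounded": {"max_compaction_rows", "max_compaction_bytes"},
}


def extract_planner_options(planner, **kwargs):
    """Keep only the kwargs that belong to the chosen planner."""
    key = planner.lower()
    if key not in PLANNER_OPTIONS:
        raise ValueError(f"unknown planner: {planner!r}")
    allowed = PLANNER_OPTIONS[key]
    # Irrelevant parameters are dropped silently, as proposed above.
    return {name: value for name, value in kwargs.items() if name in allowed}
```

Note that dropping irrelevant parameters silently means a limit passed with the wrong planner has no effect and raises no error.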

Why a separate bounded planner needs to be added here.

Generally speaking, compaction planning is roughly divided into three steps: obtaining fragments, building compaction tasks, and constructing the compaction plan.

The bounded planner is positioned here as an "incremental" compaction planner.

  1. Obtain fragments incrementally, e.g. only the fragments that differ between versions (TBD).
  2. Plan compaction with bounds: when building tasks, construct a partial compaction plan under limits such as max rows and max bytes, rather than a full one.
  3. Other "incremental" logic can be added flexibly in the future.

In contrast, the Default planner is a full compaction plan.

  1. Obtain all fragments of the current version
  2. Build all compaction tasks at once

Of course, we could modify the Default planner to limit the number of tasks it outputs. However, there is a subtle issue here.
The Default planner's logic, in pseudocode, is as follows:

let metrics = collect all fragment metrics in parallel at once

for metrics in metrics {
  build task
}

For an incremental planner, we may prefer to collect metrics on demand, especially for large tables with hundreds of thousands of fragments. This avoids the memory pressure and unnecessary computation caused by reading and processing every fragment up front, that is:

for fragment in fragments {
  let metrics = collect current fragment metrics
  if condition(metrics) {
      break;
  }
}

It might be a good choice to distinguish between incremental planners and full planners.
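
To make the difference concrete, here is a small Python sketch that counts metric reads under each strategy. All names are hypothetical, each fragment is assumed to hold 100 rows, and the "always take the first fragment" rule is an assumption carried over from the review notes above.

```python
def collect_metrics(fragment_id, reads):
    # Hypothetical metadata read; `reads` records every call that is made.
    reads.append(fragment_id)
    return {"fragment_id": fragment_id, "num_rows": 100}


def plan_full(fragment_ids, reads):
    """Full planner: read every fragment's metrics before building tasks."""
    metrics = [collect_metrics(f, reads) for f in fragment_ids]
    return [m["fragment_id"] for m in metrics]


def plan_incremental(fragment_ids, max_rows, reads):
    """Incremental planner: read on demand and stop once the budget is hit."""
    selected, used_rows = [], 0
    for f in fragment_ids:
        m = collect_metrics(f, reads)
        # Assumption: the first fragment is always accepted.
        if selected and used_rows + m["num_rows"] > max_rows:
            break
        selected.append(f)
        used_rows += m["num_rows"]
    return selected
```

With 1000 fragments and a 250-row budget, the full planner performs 1000 reads, while the incremental one performs 3 (two accepted fragments plus the one that exceeds the budget).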

Does that make sense? I am open to changing any of the above. Please let me know if there is anything you would like to discuss. Thanks in advance!

Contributor

Great points! In my experience, the cost of planning a compaction (e.g. retrieving fragment metadata and aggregating it into compaction tasks) is quite small compared to the actual compaction execution, so I do not think it is something to build around.

I think bounding compaction executions can come in two flavors: (1) limiting the number of input fragments and (2) limiting the number of compaction tasks. With an option, we can still do the former easily. The latter will need to retrieve all fragments, compile the plan, and then return / execute only a small subset.

Either way, I think that building this using compaction options gives the same functionality and is easier for users to configure.
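
A minimal Python sketch of the two flavors expressed as plain options follows. The option names `max_input_fragments` and `max_tasks`, and the pairing of adjacent fragments in `build_tasks`, are assumptions for illustration and not existing Lance options.

```python
def build_tasks(fragment_ids, fragments_per_task=2):
    # Hypothetical task builder: group adjacent fragments into fixed-size tasks.
    return [
        fragment_ids[i:i + fragments_per_task]
        for i in range(0, len(fragment_ids), fragments_per_task)
    ]


def plan(fragment_ids, max_input_fragments=None, max_tasks=None):
    """Apply both bounds as options on a single planning path."""
    fragments = list(fragment_ids)
    if max_input_fragments is not None:
        # Flavor 1: bound the input before any planning happens.
        fragments = fragments[:max_input_fragments]
    tasks = build_tasks(fragments)
    if max_tasks is not None:
        # Flavor 2: plan over everything, then keep only a subset of tasks.
        tasks = tasks[:max_tasks]
    return tasks
```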


Labels

enhancement New feature or request java python


Development

Successfully merging this pull request may close these issues.

Bounded Compaction Planner To Limit the amount of data processed during a single compaction

2 participants