Spark: option to set predicate for filtering files in compaction #13327


Open: bryanck wants to merge 1 commit into main from compaction-file-filter

Conversation

@bryanck (Contributor) commented Jun 17, 2025

This PR adds an option to RewriteDataFiles for setting a predicate that filters data files by their attributes. You can already specify an expression to filter data files, but there is currently no way to filter them by file attributes such as the data file location.

We have tables with data landing in multiple S3 regions. When new data is committed, we trigger various processes, such as moving data in remote regions to the table location and running rewrite data files to compact the data. The new file filter option lets us exclude data files in remote regions (based on their location) so we only compact data local to the table location. This prevents concurrency issues (moving data while compacting) and lets the server-side file move take precedence over loading files from remote regions.

Another use case is optimizing sort compaction: based on certain data file attributes, we can filter out files we know are already sorted, to avoid re-sorting them.
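For illustration, usage could look something like this (a minimal sketch, assuming a Spark session `spark`, a loaded `table`, and the fileFilter method added in this PR; the bucket name is hypothetical):

    // Compact only files already local to the table's region; files still
    // sitting in a remote-region bucket are skipped (hypothetical bucket name).
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .fileFilter(file -> file.location().startsWith("s3://local-region-bucket/"))
        .execute();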

@bryanck added this to the Iceberg 1.10.0 milestone, Jun 17, 2025
@bryanck force-pushed the compaction-file-filter branch 2 times, most recently from 3bb1f05 to 593bba7, Jun 17, 2025 11:30
@singhpk234 (Contributor) left a comment:

[doubt] Does this mean the table contains both files in the remote region and files in the region the table is in? Do you create a replace operation when the files from the remote region are copied to local?

What about the delete files these data files refer to? Is it possible for the delete files to be remote while the data files are local?

Diff under review:

    -    CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles();
    +    CloseableIterable<FileScanTask> fileScanTasks =
    +        CloseableIterable.filter(
    +            scan.planFiles(), fileScanTask -> fileFilter.test(fileScanTask.file()));
Contributor:

Can we check delete files as well? Maybe then we'd call it a path-based filter rather than it being opinionated on data files only?

@bryanck (author):

Would we want to check for delete files here?

Contributor:

Mostly coming from the case where the data file is local and the delete files are remote; we could check the delete files here if we agree on path-based filters rather than just a data file filter. WDYT?

@bryanck (author):

I don't think you would ever want to filter out delete files from compaction, unless I'm not following what you mean.

Contributor:

I agree :) What I meant is: reject the whole FileScanTask if it has deletes that refer to remote delete files. Currently we reject the whole FileScanTask if the data file is remote, right?
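For illustration, the path-based variant being discussed might look like this (a minimal sketch, not part of the PR; pathFilter is a hypothetical predicate on file locations):

    // Hypothetical path-based filter: reject the whole FileScanTask if the
    // data file, or any delete file it references, lives in a remote region.
    Predicate<String> pathFilter = loc -> loc.startsWith("s3://local-region-bucket/");

    CloseableIterable<FileScanTask> fileScanTasks =
        CloseableIterable.filter(
            scan.planFiles(),
            task ->
                pathFilter.test(task.file().location())
                    && task.deletes().stream()
                        .allMatch(delete -> pathFilter.test(delete.location())));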

@bryanck (author) commented Jun 23, 2025:

I feel that adds unnecessary complexity. This PR is meant as an additional way to filter the data files. We wouldn't use the functionality you described in our system, as factoring in delete files is more complex; e.g., we would also need to account for the data file path inside the delete files. (We currently don't allow row-level deletes for multi-region tables.)

Contributor:

My understanding was that we have already figured out the delete files applicable to each data file, so if we exclude the delete files consistently, all the data files (and hence the FileScanTasks that refer to them) would be removed as well?

Sure thing! Maybe a future consideration. Thank you so much for brainstorming with me, very interesting use case!

@bryanck (author):

What I meant was, we would need to do additional work to support delete files in our particular case, i.e. update the data file paths recorded in the delete files. It might be useful for others; perhaps that can be added in a separate PR.

@bryanck (author) commented Jun 17, 2025:

> [doubt] Does this mean the table contains both files in the remote region and files in the region the table is in? Do you create a replace operation when the files from the remote region are copied to local?

That's right, data files are written in multiple regions. We move the files and commit a replace operation.

> What about the delete files these data files refer to? Is it possible for the delete files to be remote while the data files are local?

Currently the multi-region tables are v1 tables, so it isn't an issue, but it's something we'd need to consider if we wanted to support row-level deletes.

The new method in RewriteDataFiles:

     * @param predicate A predicate that returns true to include a file, false otherwise
     * @return this for chaining
     */
    default RewriteDataFiles fileFilter(Predicate<DataFile> predicate) {
Member:

In general I think we are very cautious about allowing functional arguments in public APIs. I think it makes sense to add some additional filtering options, but I'm wondering if they can be exposed in a different way?

For example, couldn't we use an "expression" which touches a "file_location" metadata column to cover the use case specified in the description?

I just want to make sure we really do have a large enough number of use cases to validate opening up the interface here.

@bryanck (author):

Let me give this some thought; I'll move this to target the 1.11 release. For now we can rely on our internal fork.

Contributor:

This is a very interesting idea and worth exploring.

ManifestGroup supports two filter expressions:

  ManifestGroup filterData(Expression newDataFilter)
  ManifestGroup filterFiles(Expression newFileFilter) 

FindFiles uses the file filter:

    Iterable<DataFile> files =
        FindFiles.in(table)
            .withMetadataMatching(Expressions.startsWith("file_path", "/path/to/data-a"))
            .collect();

@bryanck (author):

OK great, I will make this change and follow this existing approach.
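Following that precedent, the option might end up expression-based rather than predicate-based. A hypothetical sketch of that direction (the fileFilter(Expression) overload shown here does not exist; it only illustrates the approach agreed above):

    // Hypothetical expression-based variant, mirroring the FindFiles
    // metadata-matching style shown above (bucket name is illustrative).
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .fileFilter(Expressions.startsWith("file_path", "s3://local-region-bucket/"))
        .execute();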

@bryanck modified the milestones: Iceberg 1.10.0 → Iceberg 1.11.0, Jul 2, 2025
@bryanck force-pushed the compaction-file-filter branch from 593bba7 to 6426e9c, Jul 2, 2025 17:51