Spark: option to set predicate for filtering files in compaction #13327
Conversation
[doubt] Does this mean the table contains both the files in the remote region and the files in the region the table is in? Do you create a replace operation when these files are copied from the remote region to the local one?
What about the delete files these data files refer to? Is it possible for the delete files to be in the remote region while the data files are local?
```diff
-    CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles();
+    CloseableIterable<FileScanTask> fileScanTasks =
+        CloseableIterable.filter(
+            scan.planFiles(), fileScanTask -> fileFilter.test(fileScanTask.file()));
```
Can we check for delete files as well? Maybe then we just call it a path-based filter rather than it being opinionated on data files only?
Would we want to check for delete files here?
This mostly comes from the case where the data file is local and the delete files are remote. We could check the delete files here if we agree on path-based filters rather than just a data file filter. WDYT?
I don't think you would ever want to filter out delete files from compaction, unless I'm not following what you mean.
I agree :). What I meant is: reject the whole FileScanTask if it has deletes that refer to remote delete files. Currently we are rejecting the whole FileScanTask if the data file is remote, right?
I feel that adds unnecessary complexity. This PR is meant as an additional way to filter the data files. We wouldn't use the functionality you described in our system, as factoring in delete files is more complex, e.g. we also need to account for the data file path in the delete files. (We currently don't allow row level deletes for multiregion tables).
My understanding was that we have already figured out the delete files applicable to each data file, so if we exclude the delete files consistently, the data files they apply to, and hence the FileScanTasks that refer to them, would also be removed?
Sure thing! Maybe a future consideration. Thank you so much for brainstorming with me, very interesting use case!
What I meant was, we need to do additional work to support delete files in our particular case, i.e. update the data file path in the delete files. It might be useful for others; perhaps that can be added in a separate PR.
That's right, data files are written in multiple regions. We move the files and commit a replace operation.
Currently the multi-region tables are v1 tables, so it isn't an issue, but it's something we'd need to consider if we did want to support row-level deletes.
```java
   * @param predicate A predicate that returns true to include a file, false otherwise
   * @return this for chaining
   */
  default RewriteDataFiles fileFilter(Predicate<DataFile> predicate) {
```
In general I think we are very cautious about allowing functional arguments to public APIs. I think it makes sense to add some additional filtering options but I'm wondering if they can be exposed in a different way?
For example couldn't we use an "expression" which touches a "file_location" as a metadata column to cover the use case specified in the description?
I just want to make sure we really do have large enough number of use cases to validate opening up the interface here.
Let me give this some thought, I'll move this to target the 1.11 release. For now we can rely on our internal fork.
This is a very interesting idea and worth exploring.
ManifestGroup supports two filter expressions:
```java
ManifestGroup filterData(Expression newDataFilter)
ManifestGroup filterFiles(Expression newFileFilter)
```
FindFiles uses the file filter:
```java
Iterable<DataFile> files =
    FindFiles.in(table)
        .withMetadataMatching(Expressions.startsWith("file_path", "/path/to/data-a"))
        .collect();
```
OK great, I will make this change and follow this existing approach.
This PR adds an option to RewriteDataFiles to set a predicate for filtering out data files by their attributes. You can already specify an expression to filter data files by their content, but there is currently no way to filter them by attributes such as the data file location.
We have tables with data landing in multiple regions in S3. When new data is committed, we trigger various processes, such as moving data in remote regions to the table location and running rewrite data files to compact the data. This new file filter option allows us to filter out data files in remote regions (based on their location) so we only compact data local to the table location. This prevents concurrency issues (moving data while compacting) and allows the server-side file move to take precedence over loading files from remote regions.
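A minimal sketch of this location-based use case, using a hypothetical FileLike stand-in rather than Iceberg's actual DataFile interface (bucket names and paths are invented for illustration):

```java
import java.util.function.Predicate;

// Hypothetical stand-in for the part of Iceberg's DataFile we care about here.
interface FileLike {
  String location();
}

public class LocalFileFilter {
  // Build a predicate that keeps only files under the table's local prefix,
  // skipping data still sitting in a remote region pending the server-side move.
  static Predicate<FileLike> localOnly(String localPrefix) {
    return file -> file.location().startsWith(localPrefix);
  }

  public static void main(String[] args) {
    Predicate<FileLike> filter = localOnly("s3://table-bucket-us-east-1/db/tbl/");
    FileLike local = () -> "s3://table-bucket-us-east-1/db/tbl/data/00000-a.parquet";
    FileLike remote = () -> "s3://landing-eu-west-1/db/tbl/data/00001-b.parquet";
    System.out.println(filter.test(local));  // true: local, safe to compact
    System.out.println(filter.test(remote)); // false: remote, skip for now
  }
}
```

The same predicate shape would be passed to the proposed fileFilter option, so the planner drops remote files before compaction tasks are created.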
Another use case we have is optimizing sort compaction: we can filter out files we know are already sorted, based on data file attributes, to avoid re-sorting them.
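One way to sketch that sort-compaction filter: Iceberg data files carry a sort order id (ContentFile exposes sortOrderId()), so a predicate could skip files already written with the current order. The SortableFile interface below is a hypothetical stand-in, not the real API:

```java
import java.util.function.Predicate;

// Hypothetical stand-in exposing just the sort order id of a data file.
interface SortableFile {
  Integer sortOrderId(); // null when unknown
}

public class SortedFileFilter {
  // Keep (i.e. re-sort) only files not already written with the
  // table's current sort order; files with an unknown order are kept too.
  static Predicate<SortableFile> needsSorting(int currentSortOrderId) {
    return file ->
        file.sortOrderId() == null || file.sortOrderId() != currentSortOrderId;
  }

  public static void main(String[] args) {
    Predicate<SortableFile> filter = needsSorting(2);
    System.out.println(filter.test(() -> 2));    // false: already sorted
    System.out.println(filter.test(() -> 1));    // true: older sort order
    System.out.println(filter.test(() -> null)); // true: unknown, re-sort
  }
}
```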