Flink: Supports delete orphan files in TableMaintenance #13302

Open · wants to merge 18 commits into base: main

Conversation

Guosmilesmile
Contributor

This PR aims to add support for deleting orphan files in TableMaintenance for Flink.

The relevant design document can be found at https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?tab=t.0#heading=h.qirf2hu9bhwb

tableLoader.open();
Table table = tableLoader.loadTable();
Preconditions.checkArgument(
table.io() instanceof SupportsPrefixOperations,

Contributor Author

Currently, in Flink I only support SupportsPrefixOperations IO. I believe we can add support for all IO types in a separate PR. If necessary, we can address it in this PR as well, but that would make things more complex.

Contributor

This could be problematic if we have really big tables. In this case all of the files are read into a big List, which could cause OOM.

I don't really have a nice solution for this, but I think it is important to check if we have a better solution for this.

Comment on lines 80 to 82
Table tableOri = tableLoader.loadTable();
this.table =
MetadataTableUtils.createMetadataTableInstance(tableOri, MetadataTableType.ALL_FILES);

Contributor

Maybe extract this from the operator, and provide this in the constructor.
It would be nice if this planner could remain independent of which table we read, and the query/table could be provided as a parameter

@Guosmilesmile Guosmilesmile force-pushed the deleteOrphanFiles branch 2 times, most recently from 800da96 to 9c0c33d on June 18, 2025 08:55
* @param pathFilter Filter to identify hidden paths
* @return List to collect matching file locations
*/
public static List<String> listDirRecursivelyWithFileIO(

Contributor

Do we need these non-consumer based public APIs?


public FileURI() {}

public String getScheme() {

Contributor

Hmm... so we use Flink Java serialization?
Any better suggestion @mxm?

Contributor Author

public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);

In order to follow the previous Spark approach, get and set methods are used here, and Java serialization is applied.

Contributor Author

Or should we implement our own FileURI encoder instead of using Encoders.bean?

Contributor

That's an interesting question to decide on.
We have metadata location Strings and file system location Strings emitted from the (FSList, MetadataList) operators. We need to serialize and shuffle them so they can be matched by the path component, but we also need the scheme and the authority for the mismatch check, and uriAsString for deleting.

It is enough to have the uriAsString travel on the wire. For the key, we need to compute the path both on the emitter side and on the AntiJoin side. It could be a fun exercise to check which performs better. OTOH the maintenance tasks are not performance sensitive, so we can postpone that optimization until it is needed.

I would definitely opt for a Flink serializer for the FileURI class, and we can change it when needed. Also, Java serialization is one of the worst solutions, so I would try to avoid it whenever it is not strictly needed, especially since in this case FileURI must not change or the state will be corrupted.

Contributor

How's this using Java serialization when the class here is not Serializable? If nothing is specified, Flink will try to use its PoJo serializer, but there are certain requirements like a no-arg constructor (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos). Otherwise, we will fall back to the Kryo serialization framework, which is not the worst but of course a dedicated serializer is always going to be better.

Contributor

My bad... so Kryo, or our own serializer then.
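
For illustration, a minimal sketch of the POJO route mentioned above; the field names are assumptions based on this thread, not the actual core FileURI class. Meeting these requirements lets Flink use its PojoSerializer instead of falling back to Kryo, while a dedicated TypeSerializer would still be the tighter option:

// Sketch only: public class, public no-arg constructor, public getters/setters for every
// field - the conditions Flink's PojoSerializer needs to avoid the Kryo fallback.
public class FileUriRecord {
  private String scheme;
  private String authority;
  private String path;
  private String uriAsString;

  public FileUriRecord() {}

  public String getScheme() { return scheme; }
  public void setScheme(String scheme) { this.scheme = scheme; }

  public String getAuthority() { return authority; }
  public void setAuthority(String authority) { this.authority = authority; }

  public String getPath() { return path; }
  public void setPath(String path) { this.path = path; }

  public String getUriAsString() { return uriAsString; }
  public void setUriAsString(String uriAsString) { this.uriAsString = uriAsString; }
}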

* @return for chained calls
*/
public Builder prefixMismatchMode(
org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode newPrefixMismatchMode) {

Contributor

Using this will make it awkward for users, as they always have to use the fully qualified name because of the name conflict. No better idea yet, so just writing it down for now.

Contributor

Why do we need to use the fully qualified name here? Which other class clashes with PrefixMismatchMode?

Contributor Author

If the full qualification is removed, it becomes DeleteOrphanFiles.PrefixMismatchMode, and the name DeleteOrphanFiles conflicts with a class in Flink.

Contributor

This compiles for me:

import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
...
    private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
...
    public Builder prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
      this.prefixMismatchMode = newPrefixMismatchMode;
      return this;
    }

Then, from another class, this works as well:

import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
...
      DeleteOrphanFiles.builder().prefixMismatchMode(PrefixMismatchMode.IGNORE);

Contributor Author

Thank you very much, this is indeed feasible.

* @param newCaseSensitive case-sensitive or not
* @return for chained calls
*/
public Builder caseSensitive(boolean newCaseSensitive) {

Contributor

Could you please check when we would need this? It seems strange, and I don't understand why the user would need to set this.

Contributor Author
@Guosmilesmile Guosmilesmile Jun 21, 2025

this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive);

To use IcebergSourceSplitSerializer, the constructor requires caseSensitive, so I have to expose it to the user.

Contributor

Could you please dig a bit deeper so we understand why the IcebergSourceSplitSerializer needs the caseSensitive flag, and how it is used?

Contributor Author

In IcebergSourceSplit deserialization, caseSensitive needs to be passed down to the underlying layer. I’m not sure if this is to maintain compatibility with different source JSON systems or to support historical data formats. In toJson, caseSensitive is not required, but in fromJson, it needs to be passed.

IcebergSourceSplit.deserializeV3(serialized, caseSensitive);
FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);

Contributor

You can follow the usage of caseSensitive in the fromJson method and find that it is used when deserializing the FileScanTask, specifically when creating the filter (ResidualEvaluator residualEvaluator = ResidualEvaluator.of(spec, filter, caseSensitive);).

ScanContext already contains caseSensitive. We can reuse it instead of requesting it again.

Contributor Author

You are right, I will do it!

Comment on lines 78 to 79
FileURI valid = foundInTable.value();
FileURI actual = foundInFileSystem.value();

Contributor

How do we get here if the 2 authorities are not exactly the same?

Contributor

Ok... so the keyBy is done on getPath(). What happens if we have multiple versions in the metadata for the given path? Shall we check that too?

Contributor Author

I understand that the scenario where multiple versions exist for a given path should only occur when the authority changes.

In HDFS, authority changes would happen when foundInFileSystem detects a change (like a namenode-host change), and in that case, those files should not be deleted.
In S3, if the authority changes, I understand it happens when switching between different S3 endpoints or regions, and those files should also not be deleted.
I am mainly following Spark’s logic here:

Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));
List<String> orphanFiles =
    actualFileIdentDS
        .joinWith(validFileIdentDS, joinCond, "leftouter")
        .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
        .collectAsList();

Please let me know if there are any gaps or misunderstandings in my reasoning. Thank you!

Contributor

A very edge case, but let's say that the authority changes, so we have multiple versions of the same path in the table metadata. Also let's say that equalAuthorities is configured incorrectly. In this case the result will differ based on the order in which the files are received by the AntiJoin.

Maybe when we receive the 2nd element from the metadata, we could immediately check whether the 2 values match or not.
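
For illustration, a minimal sketch of that immediate check, assuming the table-metadata stream is the first input of the AntiJoin and that the state, helper, and side-output names (foundInTable, hasMismatch, ERROR_STREAM) behave as discussed in this thread:

// Sketch only: when a table-metadata entry arrives for a path that already has entries in
// state, compare scheme/authority right away instead of relying on arrival order later.
@Override
public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
  FileURI incoming = new FileURI(value, equalSchemes, equalAuthorities);
  for (String stored : foundInTable.keys()) {
    FileURI existing = new FileURI(stored, equalSchemes, equalAuthorities);
    // hasMismatch is reused here hypothetically; prefixMismatchMode handling could apply as well
    if (hasMismatch(existing, java.util.Collections.singletonList(incoming))) {
      ctx.output(
          org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles.ERROR_STREAM,
          new ValidationException(
              "Conflicting scheme/authority in table metadata for path %s", incoming.getPath()));
    }
  }

  foundInTable.put(value, true);
}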


@Override
public void processElement2(StreamRecord<Exception> element) throws Exception {
hasError.add(true);

Contributor

Would it be worth the complexity to start dropping filesToDelete immediately after we get an exception?

Contributor Author

The two streams are unordered. If an exception occurs in processElement2 and filesToDelete is cleared immediately, there is no guarantee that filesToDelete will be empty when processWatermark is called.

Contributor

What I meant:

  • If there is an error, then clear filesToDelete, and set a boolean flag so that new values are also dropped.
  • We still keep the exception in state, and watermark handling stays the same, or we just emit all filesToDelete, since it is empty anyway when there is an error (see the sketch below).
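
A minimal sketch of that idea, assuming filesToDelete and hasError are ListState and that this operator receives the file paths on its first input and the exceptions on its second:

// Sketch only: after the first error, clear the buffer and drop everything that follows,
// so processWatermark can stay unchanged.
@Override
public void processElement2(StreamRecord<Exception> element) throws Exception {
  hasError.add(true);
  filesToDelete.clear();   // nothing buffered so far will be deleted
}

@Override
public void processElement1(StreamRecord<String> element) throws Exception {
  if (hasError.get().iterator().hasNext()) {
    return;                // an error already arrived, drop new paths as well
  }

  filesToDelete.add(element.getValue());
}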

Comment on lines 99 to 105
iterator.forEachRemaining(
    rowData -> {
      if (rowData != null && rowData.getString(0) != null) {
        out.collect(rowData.getString(0).toString());
      }
    });

Contributor

This basically removes the generality of the TableReader. We extract the value of the first column of the row and emit it as a string.
Do we want to create a parent class TableReader<R> which gets a (rowData, Collector<R>) -> {} method as a parameter to extract the values, or call this class differently, like FileNameReader, for now and do this only if we reuse it in the ManifestRewrite flow?

Contributor Author

I think we can first implement an initial version according to Plan One, and then make adjustments based on the specific situation when rewriting the manifest.

Contributor

What is Plan One in this case? 😄

Contributor Author

Create a parent class TableReader which gets a (rowData, Collector) -> {} method as a parameter to extract the values.

Contributor

Ok. Sounds good. Go ahead and do this.
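
For illustration, a rough sketch of that Plan One shape; class and method names here are assumptions, not the PR's final API:

import java.io.Serializable;
import java.util.Iterator;

import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

// Sketch only: the reader stays independent of which metadata table/column it reads by
// delegating row extraction to a serializable function supplied by the caller.
public class TableReader<R> implements Serializable {

  public interface RowExtractor<R> extends Serializable {
    void extract(RowData rowData, Collector<R> out);
  }

  private final RowExtractor<R> extractor;

  public TableReader(RowExtractor<R> extractor) {
    this.extractor = extractor;
  }

  // Called with the rows of one scanned split; emits whatever the extractor produces.
  public void emit(Iterator<RowData> rows, Collector<R> out) {
    rows.forEachRemaining(row -> extractor.extract(row, out));
  }
}

The current orphan-file case would then pass (rowData, out) -> { if (rowData != null && !rowData.isNullAt(0)) { out.collect(rowData.getString(0).toString()); } } as the extractor.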

@pvary
Contributor

pvary commented Jun 27, 2025

Checked IntelliJ checkstyle warnings

@Guosmilesmile
Contributor Author

Guosmilesmile commented Jun 28, 2025

@pvary Peter, thank you for pointing out these issues. I have made the corresponding changes, and thank you for your patience.

public class AntiJoin extends KeyedCoProcessFunction<String, String, String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(AntiJoin.class);

  private transient MapState<String, Boolean> foundInTable;

Contributor

@mxm: We use MapState here to dedupe the strings found in the table. Is this the best way to do it in Flink?

@Guosmilesmile: If we decide to use this, we should still leave a comment about the reason we are using MapState here.

Contributor

The MapState is fine in principle. Alternatively, we could have used a time window here, where Flink would also internally use state. But this is a custom join operator, so directly using state is fine, as long as the state eviction policy works. We register a timer for every element's timestamp, and the timer ensures the state will be cleared on watermark arrival.
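
For illustration, a minimal sketch of that eviction pattern, assuming the state names from this thread and that the elements carry event-time timestamps:

// Sketch only: register a timer at the element timestamp; once the watermark passes it,
// onTimer fires for this key and the keyed state is cleared.
@Override
public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
  foundInTable.put(value, true);
  ctx.timerService().registerEventTimeTimer(ctx.timestamp());
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  // emit the anti-join result for this key first, then release the state
  foundInTable.clear();
  foundInFileSystem.clear();
}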

Configuration conf,
int maxDepth,
int maxDirectSubDirs,
List<String> remainingSubDirs,

Contributor

Just an idea, not sure it is a good one, but could this be:

Consumer<String> remainingSubDirs,
Consumer<String> files) {

Maybe this would be a bit more consistent API.
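
For illustration, a sketch of what a consumer-based listing could look like for the SupportsPrefixOperations path; the signature is an assumption, not the PR's actual method, but pushing matches to a Consumer also addresses the earlier concern about collecting every path into one large List:

import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;

// Sketch only: matches are streamed to the consumer instead of materialized in memory.
public static void listDirRecursivelyWithFileIO(
    SupportsPrefixOperations io,
    String dir,
    Predicate<FileInfo> shouldList,
    Consumer<String> fileConsumer) {
  for (FileInfo info : io.listPrefix(dir)) {
    if (shouldList.test(info)) {
      fileConsumer.accept(info.location());
    }
  }
}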

@pvary
Contributor

pvary commented Jun 30, 2025

@mxm: Could you please review the PR?

@RussellSpitzer, @szehon-ho, @liziyan-lzy: There are some Spark related changes here, where I would like to hear your voice. The main goal is to refactor out the commonly used code from the Spark code to the core, so Flink could reuse them. This is only 2 files in this case:

  • core/src/main/java/org/apache/iceberg/actions/FileURI.java - File URI parsing, matching with equal authorities and schemes
  • core/src/main/java/org/apache/iceberg/util/FileSystemWalker.java - File System file listing.

These will be reused in the Flink delete orphan file handler.

I have pinged you here, so you are aware of the context, but also asked @Guosmilesmile to separate the Spark -> core changes in a different PR, so you can focus on the Spark related changes.

Thanks,
Peter

}

if (!foundInTable.contains(value)) {
foundInTable.put(value, true);

Contributor

Do we actually need to save the value here? Wouldn't a ValueState suffice? The state is always scoped by key. The current key is always available in processElement(..) or in onTimer(..)

Contributor Author

The reason for initially using ValueState and later switching to MapState is as follows:

  1. The key is the URI with the scheme and authority removed, so the original URI cannot be recovered from the key alone. Instead, the original URI needs to be stored.

public String getKey(String value) throws Exception {
  try {
    FileURI fileUri = new FileURI(value, equalSchemes, equalAuthorities);
    return fileUri.getPath();

  2. We think that in certain scenarios, such as when multiple authorities exist, there may be multiple entries in foundInTable. Therefore, MapState is used to avoid accidental data deletion and to deduplicate the data.

Contributor

Thanks! That makes sense.

Comment on lines 119 to 148
if (foundInFileSystem.value() != null && foundInTablesList.isEmpty()) {
  FileURI fileURI = new FileURI(foundInFileSystem.value(), equalSchemes, equalAuthorities);
  out.collect(fileURI.getUriAsString());
} else if (foundInFileSystem.value() != null && !foundInTablesList.isEmpty()) {
  FileURI actual = new FileURI(foundInFileSystem.value(), equalSchemes, equalAuthorities);
  if (hasMismatch(actual, foundInTablesList)) {
    if (prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.DELETE) {
      out.collect(foundInFileSystem.value());
    } else if (prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.ERROR) {
      ValidationException validationException =
          new ValidationException(
              "Unable to determine whether certain files are orphan. "
                  + "Metadata references files that match listed/provided files except for authority/scheme. "
                  + "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
                  + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
                  + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
                  + "authorities/schemes or to 'DELETE' if you are ABSOLUTELY confident that remaining conflicting "
                  + "authorities/schemes are different. It will be impossible to recover deleted files. "
                  + "Conflicting authorities/schemes");
      LOG.warn(
          "Unable to determine whether certain files are orphan. Found in filesystem: {} and in table: {}",
          actual,
          StringUtils.join(foundInTablesList, ","),
          validationException);
      ctx.output(
          org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles.ERROR_STREAM,
          validationException);
    }
  }
}

Contributor

Can we change the structure to the following, to make it more readable:

  if (foundInFileSystem.value() != null) {
    if (foundInTablesList.isEmpty()) {
      ..
    } else {
      ..
    }
  }

Contributor Author

Good idea, I have changed it.
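
For reference, the restructured branch could look roughly like this (same logic as the quoted code above, with the long error message elided):

if (foundInFileSystem.value() != null) {
  FileURI actual = new FileURI(foundInFileSystem.value(), equalSchemes, equalAuthorities);
  if (foundInTablesList.isEmpty()) {
    out.collect(actual.getUriAsString());
  } else if (hasMismatch(actual, foundInTablesList)) {
    if (prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.DELETE) {
      out.collect(foundInFileSystem.value());
    } else if (prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.ERROR) {
      // build the ValidationException, log it, and send it to ERROR_STREAM as before
    }
  }
}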

DELETE_FILE_SUCCEEDED_COUNTER),
2L)
.build());
}

Contributor

Should we also check that the physical files have been deleted?

Contributor Author
@Guosmilesmile Guosmilesmile Jul 4, 2025

Yes, we have checked it in:

assertThat(inMetadata).doesNotExist();
assertThat(inData).doesNotExist();

Contributor
@mxm mxm left a comment

LGTM! We still need a review from the Iceberg core and Spark side.

@Guosmilesmile
Contributor Author

@mxm Max, Thank you very much for your review.

Regarding the core and Spark parts, I have already separated them into a standalone PR (#13429) for easier review, and I am still waiting for further information.
