Flink: Supports delete orphan files in TableMaintenance #13302
Conversation
tableLoader.open();
Table table = tableLoader.loadTable();
Preconditions.checkArgument(
    table.io() instanceof SupportsPrefixOperations,
Currently, in Flink I only support SupportsPrefixOperations IO. I believe we can add support for all IO types in a separate PR. If necessary, we can address it in this PR as well, but that would make things more complex.
This could be problematic if we have really big tables. In that case all of the files are read into a big List, which could cause OOM.
I don't really have a nice solution for this, but I think it is important to check whether we can find a better one.
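For context, a minimal sketch of what the prefix-based listing and the resulting in-memory List look like; the helper name and shape are illustrative, not the PR code:

import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;

public class PrefixListingSketch {
  // Assumes the FileIO was already checked with `instanceof SupportsPrefixOperations`.
  static List<String> listAllFiles(FileIO io, String tableLocation) {
    SupportsPrefixOperations prefixIO = (SupportsPrefixOperations) io;
    List<String> locations = new ArrayList<>();
    for (FileInfo fileInfo : prefixIO.listPrefix(tableLocation)) {
      // Every location under the table location is materialized here; this is the
      // List that can grow very large for big tables.
      locations.add(fileInfo.location());
    }
    return locations;
  }
}

Streaming the locations out (for example through a Consumer, or by emitting them directly from the listing operator) instead of collecting them into a List would avoid holding the full listing in memory.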
Table tableOri = tableLoader.loadTable();
this.table =
    MetadataTableUtils.createMetadataTableInstance(tableOri, MetadataTableType.ALL_FILES);
Maybe extract this from the operator and provide it in the constructor.
It would be nice if this planner could remain independent of which table we read, and the query/table could be provided as a parameter.
 * @param pathFilter Filter to identify hidden paths
 * @return List to collect matching file locations
 */
public static List<String> listDirRecursivelyWithFileIO(
Do we need these non-consumer based public APIs?
public FileURI() {}

public String getScheme() {
We usually avoid get, set. See: https://iceberg.apache.org/contribute/#method-naming
Hmm... so we use Flink Java serialization?
Any better suggestion @mxm?
Line 686 in e86a6b9:
public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);
In order to follow the previous Spark approach, get and set methods are used here, and Java serialization is applied.
Or should we implement our own FileURI encoder instead of using Encoders.bean?
That's an interesting question to decide on.
We have metadata location Strings and file system location Strings emitted from the (FSList, MetadataList) operators. We need to serialize and shuffle them so they are matched by the path component, but we need the scheme and the authority for matching, and uriAsString for deleting.
It is enough for the uriAsString to travel on the wire. For the key, we need to get the path both on the emitter side and on the AntiJoin side. It could be a fun exercise to check which performs better. OTOH the maintenance tasks are not performance sensitive, so we can wait with the optimization until it is needed.
I would definitely opt for a Flink serializer for the FileURI class, and we can change it when needed. Also, Java serialization is one of the worst solutions, so I would try to avoid it whenever it is not strictly needed. Especially since in this case FileURI must not change, or the state will be corrupted.
How is this using Java serialization when the class here is not Serializable? If nothing is specified, Flink will try to use its POJO serializer, but there are certain requirements like a no-arg constructor (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos). Otherwise, we will fall back to the Kryo serialization framework, which is not the worst, but of course a dedicated serializer is always going to be better.
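For reference, a minimal sketch of what the POJO route would require from FileURI so that Flink's PojoSerializer (rather than Kryo or Java serialization) kicks in: a public class, a public no-arg constructor, and getter/setter pairs for the non-public fields. The field set follows the getters discussed in this thread; everything else is an assumption:

public class FileURI {
  private String scheme;
  private String authority;
  private String path;
  private String uriAsString;

  public FileURI() {} // required by Flink's POJO serializer

  public String getScheme() { return scheme; }
  public void setScheme(String scheme) { this.scheme = scheme; }
  public String getAuthority() { return authority; }
  public void setAuthority(String authority) { this.authority = authority; }
  public String getPath() { return path; }
  public void setPath(String path) { this.path = path; }
  public String getUriAsString() { return uriAsString; }
  public void setUriAsString(String uriAsString) { this.uriAsString = uriAsString; }
}

A dedicated TypeSerializer would still give the most control over the wire format, but the POJO form already avoids the Kryo fallback with very little code.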
My bad... so Kryo, or our own serializer then.
 * @return for chained calls
 */
public Builder prefixMismatchMode(
    org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode newPrefixMismatchMode) {
Using this will make it awkward for the users, as they always have to use the fully qualified name because of the name conflict. No better idea yet, so just writing it down for now.
Why do we need to use fully qualified name here? Which other class clashes with PrefixMismatchMode?
If the package qualifier is dropped, it becomes DeleteOrphanFiles.PrefixMismatchMode, and the name DeleteOrphanFiles conflicts with a class in Flink.
This compiles for me:
import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
...
private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
...
public Builder prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
  this.prefixMismatchMode = newPrefixMismatchMode;
  return this;
}
Then, from another class, this works as well:
import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
...
DeleteOrphanFiles.builder().prefixMismatchMode(PrefixMismatchMode.IGNORE);
Thank you very much, this is indeed feasible.
 * @param newCaseSensitive case-sensitive or not
 * @return for chained calls
 */
public Builder caseSensitive(boolean newCaseSensitive) {
Could you please check when we would need this? It seems strange, and I don't understand why the user would need to set this.
Line 87 in 9c0c33d:
this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive);
To use IcebergSourceSplitSerializer, the constructor requires this flag, so I have to expose it to the user.
Could you please dig a bit deeper so we understand why the IcebergSourceSplitSerializer needs the caseSensitive flag, and how it is used?
In IcebergSourceSplit deserialization, caseSensitive needs to be passed down to the underlying layer. I’m not sure if this is to maintain compatibility with different source JSON systems or to support historical data formats. In toJson, caseSensitive is not required, but in fromJson, it needs to be passed.
IcebergSourceSplit.deserializeV3(serialized, caseSensitive);
FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
You can follow the usage of caseSensitive in the fromJson method, and you will find that it is used when deserializing the FileScanTask, specifically when creating the filter (ResidualEvaluator residualEvaluator = ResidualEvaluator.of(spec, filter, caseSensitive);).
ScanContext already contains caseSensitive. We can reuse it instead of requesting it again.
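A one-line sketch of the reuse, assuming ScanContext exposes a caseSensitive() accessor:

this.splitSerializer = new IcebergSourceSplitSerializer(scanContext.caseSensitive());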
You are right, I will do it!
FileURI valid = foundInTable.value();
FileURI actual = foundInFileSystem.value();
How do we get here if the 2 authorities are not exactly the same?
Ok... so the keyBy is done on the getPath. What happens if we have multiple versions in the metadata for the given path? Shall we check that too?
I understand that the scenario where multiple versions exist for a given path should only occur when the authority changes.
In HDFS, authority changes would happen when foundInFileSystem detects a change (like a namenode-host change), and in that case, those files should not be deleted.
In S3, if the authority changes, I understand it happens when switching between different S3 endpoints or regions, and those files should also not be deleted.
I am mainly following Spark’s logic here:
Lines 367 to 373 in 9c0c33d:
Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));
List<String> orphanFiles =
    actualFileIdentDS
        .joinWith(validFileIdentDS, joinCond, "leftouter")
        .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
        .collectAsList();
Please let me know if there are any gaps or misunderstandings in my reasoning. Thank you!
Very edge case, but let's say that the authority changes, so we have multiple versions of the same path in the table metadata. Also let's say that the equalAuthorities is configured incorrectly. In this case the result will be different based on the order in which the files are received by the AntiJoin.
Maybe when we receive the 2nd element from the metadata, we could immediately check if the 2 values are matching or not.
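A speculative sketch of that eager check on the table side of the AntiJoin; the FileURI usage follows the snippets in this thread, everything else (field names, logging) is illustrative:

@Override
public void processElement1(String tableFileUri, Context ctx, Collector<String> out) throws Exception {
  FileURI incoming = new FileURI(tableFileUri, equalSchemes, equalAuthorities);
  for (String stored : foundInTable.keys()) {
    FileURI existing = new FileURI(stored, equalSchemes, equalAuthorities);
    if (!existing.getUriAsString().equals(incoming.getUriAsString())) {
      // Two metadata entries share the same path key but differ in scheme/authority:
      // surface the conflict immediately instead of depending on arrival order later.
      LOG.warn("Conflicting metadata locations for the same path: {} and {}", existing, incoming);
    }
  }

  foundInTable.put(tableFileUri, true);
}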
@Override
public void processElement2(StreamRecord<Exception> element) throws Exception {
  hasError.add(true);
Would it be worth the complexity to start dropping filesToDelete immediately after we get an exception?
The two streams are unordered. If an exception occurs in processElement2 and filesToDelete is cleared immediately, there is no guarantee that filesToDelete will be empty when processWatermark is called.
What I meant:
- If there is an error, then empty filesToDelete and set a boolean flag so that new values are also dropped.
- We still keep the exception in state, and handling the watermark stays the same, or we just emit all filesToDelete, since they are empty anyway when there is an error.
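A sketch of what that could look like in SkipOnError; the state field names follow the snippet above, the rest is assumed:

@Override
public void processElement2(StreamRecord<Exception> element) throws Exception {
  hasError.add(true);
  // Drop everything buffered so far; nothing will be emitted once an error was seen.
  filesToDelete.clear();
}

@Override
public void processElement1(StreamRecord<String> element) throws Exception {
  if (hasError.get().iterator().hasNext()) {
    // An error was already recorded, so new file names are dropped instead of buffered.
    return;
  }

  filesToDelete.add(element.getValue());
}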
iterator.forEachRemaining(
    rowData -> {
      if (rowData != null && rowData.getString(0) != null) {
        out.collect(rowData.getString(0).toString());
      }
    });
This basically removes the generality of the TableReader. We extract the value of the first column of the row and emit it as a string.
Do we want to create a parent class TableReader<R> which gets a (rowData, Collector<R>)->{} method as a parameter to extract the values, or call this class differently, like FileNameReader for now, and do this if we reuse it in the ManifestRewrite flow?
I think we can first implement an initial version according to Plan One, and then make adjustments based on the specific situation when rewriting the manifest.
What is Plan One in this case? 😄
Create a parent class TableReader<R> which gets a (rowData, Collector<R>)->{} method as a parameter to extract the values.
Ok. Sounds good. Go ahead and do this.
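A sketch of the agreed direction: a generic reader that takes an extractor to turn each RowData into output records. Class and method names are illustrative, not the final PR code:

import java.io.Serializable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

public abstract class TableReader<R> {
  /** Turns a single row into zero or more output values. */
  public interface RowExtractor<R> extends Serializable {
    void extract(RowData rowData, Collector<R> out);
  }

  private final RowExtractor<R> extractor;

  protected TableReader(RowExtractor<R> extractor) {
    this.extractor = extractor;
  }

  protected void emit(RowData rowData, Collector<R> out) {
    extractor.extract(rowData, out);
  }
}

The current behavior then becomes one concrete extractor, for example:
(rowData, out) -> {
  if (rowData != null && !rowData.isNullAt(0)) {
    out.collect(rowData.getString(0).toString());
  }
}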
Checked IntelliJ checkstyle warnings.
@pvary Peter, thank you for pointing out these issues. I have made the corresponding changes, and thanks again for your patience.
public class AntiJoin extends KeyedCoProcessFunction<String, String, String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(AntiJoin.class);

  private transient MapState<String, Boolean> foundInTable;
@mxm: We use MapState here to dedupe the strings found in the table. Is this the best way to do it in Flink?
@Guosmilesmile: If we decide to use this, we should still leave a comment about the reason we are using MapState here.
The MapState principally is fine. Alternatively, we could have used a time window here, where Flink would also internally use state. But this is a custom join operator, so directly using state is fine - as long as the state eviction policy works. We register a timer for every element's timestamp. The timer ensures the state will be cleared on watermark arrival.
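A sketch of that eviction pattern in the AntiJoin function; the state names follow the snippets in this thread, and the per-key result emission is omitted:

@Override
public void processElement1(String fromTable, Context ctx, Collector<String> out) throws Exception {
  if (!foundInTable.contains(fromTable)) {
    foundInTable.put(fromTable, true);
  }

  // Fires in onTimer once the watermark passes this element's timestamp.
  ctx.timerService().registerEventTimeTimer(ctx.timestamp());
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  // Emit/compare the results for this key here, then drop the state so it cannot grow unbounded.
  foundInTable.clear();
  foundInFileSystem.clear();
}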
Configuration conf,
int maxDepth,
int maxDirectSubDirs,
List<String> remainingSubDirs,
Just an idea, not sure it is a good one, could this be:
Consumer<String> remainingSubDirs,
Consumer<String> files) {
Maybe this is a bit more consistent API
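A sketch of the Consumer-based variant; it mirrors the recursive Hadoop listing but emits results through callbacks instead of filling shared Lists. Names and the exact parameter set are illustrative (maxDirectSubDirs handling is omitted for brevity):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class ConsumerListingSketch {
  public static void listDirRecursively(
      String dir,
      Predicate<FileStatus> predicate,
      Configuration conf,
      int maxDepth,
      Consumer<String> remainingSubDirs,
      PathFilter pathFilter,
      Consumer<String> files) {
    if (maxDepth <= 0) {
      // Too deep: hand the directory back to the caller for later processing.
      remainingSubDirs.accept(dir);
      return;
    }

    try {
      Path path = new Path(dir);
      FileSystem fs = path.getFileSystem(conf);
      for (FileStatus status : fs.listStatus(path, pathFilter)) {
        if (status.isDirectory()) {
          listDirRecursively(
              status.getPath().toString(), predicate, conf, maxDepth - 1, remainingSubDirs, pathFilter, files);
        } else if (status.isFile() && predicate.test(status)) {
          // Emit instead of collecting into a List.
          files.accept(status.getPath().toString());
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}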
@mxm: Could you please review the PR?
@RussellSpitzer, @szehon-ho, @liziyan-lzy: There are some Spark related changes here where I would like to hear your voice. The main goal is to refactor the commonly used code out of the Spark module into core, so Flink can reuse it. This is only 2 files in this case; these will be reused in the Flink delete orphan file handler. I have pinged you here so you are aware of the context, but I have also asked @Guosmilesmile to separate the Spark -> core changes into a different PR, so you can focus on the Spark related changes. Thanks,
}

if (!foundInTable.contains(value)) {
  foundInTable.put(value, true);
Do we actually need to save the value here? Wouldn't a ValueState suffice? The state is always scoped by key. The current key is always available in processElement(..) or in onTimer(..).
The reason for initially using ValueState and later switching to MapState is as follows:
- The key is the URL with the scheme and authority removed, so the key alone cannot be used directly; the original URL needs to be stored. See Lines 50 to 53 in 07126c5:
public String getKey(String value) throws Exception {
  try {
    FileURI fileUri = new FileURI(value, equalSchemes, equalAuthorities);
    return fileUri.getPath();
- We think that in certain scenarios, such as when multiple authorities exist, there may be multiple entries in foundInTable. Therefore, MapState is used to avoid accidental data deletion and to deduplicate data.
Thanks! That makes sense.
if (foundInFileSystem.value() != null && foundInTablesList.isEmpty()) {
  FileURI fileURI = new FileURI(foundInFileSystem.value(), equalSchemes, equalAuthorities);
  out.collect(fileURI.getUriAsString());
} else if (foundInFileSystem.value() != null && !foundInTablesList.isEmpty()) {
  FileURI actual = new FileURI(foundInFileSystem.value(), equalSchemes, equalAuthorities);
  if (hasMismatch(actual, foundInTablesList)) {
    if (prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.DELETE) {
      out.collect(foundInFileSystem.value());
    } else if (prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.ERROR) {
      ValidationException validationException =
          new ValidationException(
              "Unable to determine whether certain files are orphan. "
                  + "Metadata references files that match listed/provided files except for authority/scheme. "
                  + "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
                  + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
                  + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
                  + "authorities/schemes or to 'DELETE' if you are ABSOLUTELY confident that remaining conflicting "
                  + "authorities/schemes are different. It will be impossible to recover deleted files. "
                  + "Conflicting authorities/schemes");
      LOG.warn(
          "Unable to determine whether certain files are orphan. Found in filesystem: {} and in table: {}",
          actual,
          StringUtils.join(foundInTablesList, ","),
          validationException);
      ctx.output(
          org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles.ERROR_STREAM,
          validationException);
    }
  }
}
Can we change the structure to the following, to make it more readable:
if (foundInFileSystem.value() != null) {
  if (foundInTablesList.isEmpty()) {
    ..
  } else {
    ..
  }
}
Good idea, I have changed it.
DELETE_FILE_SUCCEEDED_COUNTER),
2L)
.build());
}
Should we also check that the physical files have been deleted?
Yes, we have checked it in Lines 154 to 155 in e44328b:
assertThat(inMetadata).doesNotExist();
assertThat(inData).doesNotExist();
LGTM! We still need a review from the Iceberg core and Spark side.
This PR aims to add support for deleting orphan files in TableMaintenance for Flink.
The relevant design document can be found at https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?tab=t.0#heading=h.qirf2hu9bhwb