Core: Fix field ids of partition stats file #13329


Open · wants to merge 3 commits into main

Conversation

@ajantha-bhat (Member) commented Jun 17, 2025

While working on DV support for partition stats, I observed that the schema field ids of the partition stats file are not as per the spec: in the spec, field ids start from 1, while in the Java implementation they start from 0.

This happened because of this refactoring PR. We missed that the field ids are tracked in the spec; we wanted to avoid the ugly `index - 1` code when reusing the field id as the StructLike position.

Updated the schema ids as per the spec (starting from 1) and introduced separate position variables for StructLike access (starting from 0).
Also handled compatibility when reading old corrupted stats by falling back to a full recompute.
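The core of the fix can be illustrated with a small standalone sketch (hypothetical names, not Iceberg's actual constants): spec field ids are 1-based while in-memory StructLike positions are 0-based, so a single set of constants cannot serve both roles without an off-by-one.

```java
// Illustrative sketch only (not Iceberg code): field ids per the spec are
// 1-based, StructLike positions are 0-based. Reusing one constant for both
// silently shifts every field by one.
public class FieldIdVsPosition {
    // Hypothetical names mirroring the PR's distinction.
    static final int PARTITION_FIELD_ID = 1;  // per spec, ids start at 1
    static final int PARTITION_POSITION = 0;  // per Java, positions start at 0

    // For top-level fields, the struct position is simply fieldId - 1.
    public static int positionOf(int fieldId) {
        return fieldId - 1;
    }

    public static void main(String[] args) {
        System.out.println(positionOf(PARTITION_FIELD_ID)); // prints 0
    }
}
```

The PR resolves the clash by keeping both sets of constants, one for the file schema and one for struct access, rather than deriving one from the other.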

@ajantha-bhat ajantha-bhat marked this pull request as draft June 17, 2025 07:23
@ajantha-bhat changed the title from "Core: Fix schema id of partition stats file" to "Core: Fix field ids of partition stats file" Jun 17, 2025
Comment on lines 242 to 289
```diff
-            record.get(PARTITION_FIELD_ID, StructLike.class),
-            record.get(SPEC_ID.fieldId(), Integer.class));
-    stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class));
+            record.get(PARTITION_POSITION, StructLike.class),
+            record.get(SPEC_ID_POSITION, Integer.class));
+    stats.set(DATA_RECORD_COUNT_POSITION, record.get(DATA_RECORD_COUNT_POSITION, Long.class));
+    stats.set(DATA_FILE_COUNT_POSITION, record.get(DATA_FILE_COUNT_POSITION, Integer.class));
     stats.set(
-        TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(),
-        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class));
+        TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, Long.class));
     stats.set(
-        POSITION_DELETE_RECORD_COUNT.fieldId(),
-        record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class));
+        POSITION_DELETE_RECORD_COUNT_POSITION,
+        record.get(POSITION_DELETE_RECORD_COUNT_POSITION, Long.class));
     stats.set(
-        POSITION_DELETE_FILE_COUNT.fieldId(),
-        record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class));
+        POSITION_DELETE_FILE_COUNT_POSITION,
+        record.get(POSITION_DELETE_FILE_COUNT_POSITION, Integer.class));
     stats.set(
-        EQUALITY_DELETE_RECORD_COUNT.fieldId(),
-        record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class));
+        EQUALITY_DELETE_RECORD_COUNT_POSITION,
+        record.get(EQUALITY_DELETE_RECORD_COUNT_POSITION, Long.class));
     stats.set(
-        EQUALITY_DELETE_FILE_COUNT.fieldId(),
-        record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class));
-    stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class));
+        EQUALITY_DELETE_FILE_COUNT_POSITION,
+        record.get(EQUALITY_DELETE_FILE_COUNT_POSITION, Integer.class));
+    stats.set(TOTAL_RECORD_COUNT_POSITION, record.get(TOTAL_RECORD_COUNT_POSITION, Long.class));
+    stats.set(LAST_UPDATED_AT_POSITION, record.get(LAST_UPDATED_AT_POSITION, Long.class));
     stats.set(
-        LAST_UPDATED_SNAPSHOT_ID.fieldId(),
-        record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class));
+        LAST_UPDATED_SNAPSHOT_ID_POSITION,
+        record.get(LAST_UPDATED_SNAPSHOT_ID_POSITION, Long.class));
```
Contributor

Could we just do this?

```java
PartitionStats stats =
    new PartitionStats(
        record.get(PARTITION_POSITION, StructLike.class),
        record.get(SPEC_ID_POSITION, Integer.class));
for (int i = 0; i < record.size(); ++i) {
  stats.set(i, record.get(i, Object.class));
}
```

Member Author

Since both use the same data types, it should be possible. Let me check.

Member

`i` should start from 2?

Member Author

Yes, for sure. But I got what he meant and updated it locally.

Member

`SPEC_ID_POSITION + 1`
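Putting this thread together: the copy loop must start after the two positions already consumed by the constructor, i.e. at `SPEC_ID_POSITION + 1`. A self-contained sketch of the corrected pattern, using a plain `List` and array as mock stand-ins for Iceberg's `Record` and `PartitionStats`:

```java
import java.util.Arrays;
import java.util.List;

// Mock-typed sketch of the corrected copy loop (not Iceberg's classes).
// Positions 0 and 1 (partition struct and spec id) go to the "constructor";
// the remaining positions are copied generically.
public class CopyLoopSketch {
    static final int PARTITION_POSITION = 0;
    static final int SPEC_ID_POSITION = 1;

    public static Object[] copyStats(List<Object> record) {
        Object[] stats = new Object[record.size()];
        stats[PARTITION_POSITION] = record.get(PARTITION_POSITION); // partition
        stats[SPEC_ID_POSITION] = record.get(SPEC_ID_POSITION);     // spec id
        // Start after the constructor arguments, not at 0.
        for (int i = SPEC_ID_POSITION + 1; i < record.size(); i++) {
            stats[i] = record.get(i);
        }
        return stats;
    }

    public static void main(String[] args) {
        List<Object> record = Arrays.asList((Object) "partitionStruct", 7, 100L, 3);
        System.out.println(Arrays.toString(copyStats(record)));
        // prints [partitionStruct, 7, 100, 3]
    }
}
```

In the mock, starting at 0 would merely re-assign the first two slots; in the real code it would call `record.get(0, Object.class)` on a struct value, which is why the offset matters.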

Comment on lines 94 to 105
```java
static final int PARTITION_POSITION = 0;
static final int SPEC_ID_POSITION = 1;
static final int DATA_RECORD_COUNT_POSITION = 2;
static final int DATA_FILE_COUNT_POSITION = 3;
static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
static final int TOTAL_RECORD_COUNT_POSITION = 9;
static final int LAST_UPDATED_AT_POSITION = 10;
static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
```
Contributor

Do we really need these?

Member Author

Are you suggesting to use hardcoded values like 0, 1, 2 instead?

@@ -65,29 +65,44 @@ private PartitionStatsHandler() {}

```diff
   private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsHandler.class);

-  public static final int PARTITION_FIELD_ID = 0;
+  // schema of the partition stats file as per spec
+  public static final int PARTITION_FIELD_ID = 1;
```
Contributor

These are binary breaking changes in 1.9.2 compared to 1.9.0. What is our policy here?

Do we announce that the 1.9.0 partition stats implementation is broken and that we don't support it?
In that case we might be OK with making incompatible changes (making it compatible with the spec).

@ajantha-bhat (Member Author) Jun 17, 2025

@RussellSpitzer: thoughts? Should this be part of 1.10.0 only?

```java
String invalidSchema =
    getClass()
        .getClassLoader()
        .getResource("org/apache/iceberg/PartitionStatsInvalidSchema.parquet")
```
Contributor

Could we just generate the file on the fly instead of checking in the binary?

@ajantha-bhat (Member Author) Jun 17, 2025

We don't have the old schema in the code anymore, so creating a Parquet file as per the old schema, adding rows, etc. would be a big chunk of code.

I observed that we do maintain Puffin files, delete files, and metadata files in the resource folder:
https://github.com/apache/iceberg/tree/main/core/src/test/resources/org/apache/iceberg

Hence, I kept it to reduce the PR code size.

Member Author

Found a way to generate stats with the old schema in a few lines of code and added that instead of checking in binaries.

```java
String invalidSchema =
    getClass()
        .getClassLoader()
        .getResource("org/apache/iceberg/PartitionStatsInvalidSchema.avro")
```
Contributor

Could we just generate the file on the fly instead of checking in the binary?

@@ -280,6 +304,8 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental(

```diff
       oldStats.forEach(
           partitionStats ->
               statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
+    } catch (IllegalArgumentException | IllegalStateException exception) {
```
Member Author

Call stack for Parquet:

```text
java.lang.IllegalArgumentException: Not a primitive type: struct<1000: c2: optional string, 1001: c3: optional string>
	at org.apache.iceberg.types.Type.asPrimitiveType(Type.java:73)
	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:53)
	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitField(TypeWithSchemaVisitor.java:192)
	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitFields(TypeWithSchemaVisitor.java:207)
	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:49)
	at org.apache.iceberg.parquet.ParquetSchemaUtil.pruneColumns(ParquetSchemaUtil.java:134)
	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:82)
	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:74)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:94)
	at org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
	at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
	at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:196)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:139)
	at org.apache.iceberg.TestParquetPartitionStatsHandler.testStatsWithInvalidSchema(TestParquetPartitionStatsHandler.java:45)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
```

Call stack for Avro:

```text
java.lang.IllegalStateException: Not an instance of org.apache.iceberg.StructLike: 0
	at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:138)
	at org.apache.iceberg.PartitionStatsHandler.recordToPartitionStats(PartitionStatsHandler.java:266)
	at org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:219)
	...(99 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)
	at org.apache.iceberg.avro.TestAvroPartitionStatsHandler.testReadingStatsWithInvalidSchema(TestAvroPartitionStatsHandler.java:57)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
```

Or should I just catch RuntimeException here?
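The fallback under discussion can be sketched independently of Iceberg's classes (all names below are hypothetical): catch exactly the two exception types seen in the call stacks and recompute from scratch, rather than catching all of `RuntimeException`, which would also swallow unrelated bugs.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hedged sketch of the catch-and-fallback pattern, not Iceberg's actual code.
public class FallbackSketch {
    static List<String> readIncremental(Supplier<List<String>> oldStatsReader,
                                        Supplier<List<String>> fullCompute) {
        try {
            // Attempt the incremental path using the previously written stats.
            return oldStatsReader.get();
        } catch (IllegalArgumentException | IllegalStateException e) {
            // Old stats were written with out-of-spec field ids; recompute fully.
            return fullCompute.get();
        }
    }

    public static void main(String[] args) {
        List<String> result = readIncremental(
            () -> { throw new IllegalStateException("Not an instance of StructLike"); },
            () -> Collections.singletonList("full-compute"));
        System.out.println(result); // prints [full-compute]
    }
}
```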

@ajantha-bhat (Member Author) commented Jun 17, 2025

@lirui-apache, @deniskuzZ: Just a heads up, as you both are using this feature as far as I know. And sorry for this oversight during refactoring!

@lirui-apache (Contributor)

> @lirui-apache, @deniskuzZ: Just a heads up as you guys are using this feature as per my knowledge.

Thanks @ajantha-bhat, we are also looking at the issue internally. I guess I can incorporate your fix when it's done.

@deniskuzZ (Member)

Thanks @ajantha-bhat for the heads up; we've just upgraded Hive to Iceberg 1.9.1.

@deniskuzZ (Member)

Off-topic: the partition stats file format is tightly coupled with the table's write.format. If a customer is using ORC, they automatically lose the partition stats.
Do you think we can decouple the file formats? If yes, I can raise a PR for that.

@ajantha-bhat (Member Author)

> off-topic: partition stats file format is highly coupled with table write.format. If CU is using ORC, he automatically loses the partition stats.

Yes. In the initial proposal document, jack-ye brought this up. The spec originally hardcoded that it should be in the table's default format; we then updated it so that it can be any of the formats (https://iceberg.apache.org/spec/#partition-statistics-file).

So, we could have a table property to configure a different format for stats than the data format. But I'd recommend supporting InternalData for ORC instead (I can help with it too if needed; I did it for Parquet and Avro), so that ORC can also be used for writing table metadata in v4, rather than changing the format of the partition stats here.

@github-actions github-actions bot added the ORC label Jun 17, 2025
@ajantha-bhat ajantha-bhat marked this pull request as ready for review June 17, 2025 17:48
@ajantha-bhat (Member Author)

New flaky Flink test: #13338

@deniskuzZ (Member) commented Jun 18, 2025

> off-topic: partition stats file format is highly coupled with table write.format. If CU is using ORC, he automatically loses the partition stats.
>
> Yes. In the initial proposal document jack-ye has brought this up. Spec was hardcoded that it should be in table default format. Then we updated that it can be any of the format (https://iceberg.apache.org/spec/#partition-statistics-file).
>
> So, we can have a table property to configure a different format stats than data format. But I recommend supporting InternalData for ORC, I can help on it too if needed (I did it for parquet and avro). So, that ORC can be used for writing table metadata in v4.

@ajantha-bhat, it would be awesome if we add an InternalData reader/writer for ORC. However, I'm not sure how big that effort would be. If I understand correctly, we'd need to implement support for a few missing features, such as default values, timestamp(9), and variant.

```diff
     stats.set(
         LAST_UPDATED_SNAPSHOT_ID_POSITION,
         record.get(LAST_UPDATED_SNAPSHOT_ID_POSITION, Long.class));
+        record.get(0, StructLike.class), // partition
```
@deniskuzZ (Member) Jun 18, 2025

Using raw numbers directly in code is considered bad practice: static analysis tools flag them as magic numbers and recommend replacing them with named constants, which enhances readability and makes maintenance easier.
Alternatively, to adapt easily if the schema changes, we can introduce an index variable:

```java
int pos = 0;
PartitionStats stats =
    new PartitionStats(
        record.get(pos++, StructLike.class),
        record.get(pos++, Integer.class));
for (; pos < record.size(); pos++) {
  stats.set(pos, record.get(pos, Object.class));
}
```

Member Author

done

@@ -280,6 +273,8 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental(

```diff
       oldStats.forEach(
           partitionStats ->
               statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
+    } catch (IllegalArgumentException | IllegalStateException exception) {
```
Contributor

Does this mean that being able to read an old stats file is not a goal here?
