Core: Fix field ids of partition stats file #13329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
```diff
-            record.get(PARTITION_FIELD_ID, StructLike.class),
-            record.get(SPEC_ID.fieldId(), Integer.class));
-    stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class));
+            record.get(PARTITION_POSITION, StructLike.class),
+            record.get(SPEC_ID_POSITION, Integer.class));
+    stats.set(DATA_RECORD_COUNT_POSITION, record.get(DATA_RECORD_COUNT_POSITION, Long.class));
+    stats.set(DATA_FILE_COUNT_POSITION, record.get(DATA_FILE_COUNT_POSITION, Integer.class));
     stats.set(
-        TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(),
-        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class));
+        TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, Long.class));
     stats.set(
-        POSITION_DELETE_RECORD_COUNT.fieldId(),
-        record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class));
+        POSITION_DELETE_RECORD_COUNT_POSITION,
+        record.get(POSITION_DELETE_RECORD_COUNT_POSITION, Long.class));
     stats.set(
-        POSITION_DELETE_FILE_COUNT.fieldId(),
-        record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class));
+        POSITION_DELETE_FILE_COUNT_POSITION,
+        record.get(POSITION_DELETE_FILE_COUNT_POSITION, Integer.class));
     stats.set(
-        EQUALITY_DELETE_RECORD_COUNT.fieldId(),
-        record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class));
+        EQUALITY_DELETE_RECORD_COUNT_POSITION,
+        record.get(EQUALITY_DELETE_RECORD_COUNT_POSITION, Long.class));
     stats.set(
-        EQUALITY_DELETE_FILE_COUNT.fieldId(),
-        record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class));
-    stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class));
+        EQUALITY_DELETE_FILE_COUNT_POSITION,
+        record.get(EQUALITY_DELETE_FILE_COUNT_POSITION, Integer.class));
+    stats.set(TOTAL_RECORD_COUNT_POSITION, record.get(TOTAL_RECORD_COUNT_POSITION, Long.class));
+    stats.set(LAST_UPDATED_AT_POSITION, record.get(LAST_UPDATED_AT_POSITION, Long.class));
     stats.set(
-        LAST_UPDATED_SNAPSHOT_ID.fieldId(),
-        record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class));
+        LAST_UPDATED_SNAPSHOT_ID_POSITION,
+        record.get(LAST_UPDATED_SNAPSHOT_ID_POSITION, Long.class));
```
Could we just do this?

```java
PartitionStats stats =
    new PartitionStats(
        record.get(PARTITION_POSITION, StructLike.class),
        record.get(SPEC_ID_POSITION, Integer.class));
for (int i = 0; i < record.size(); ++i) {
  stats.set(i, record.get(i, Object.class));
}
```
Since both are the same datatype, it should be possible. Let me check.
`i` should start from 2?
Yes, for sure. But I got what he meant and updated it locally.
`SPEC_ID_POSITION + 1`
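Putting these review comments together, the corrected copy loop can be sketched in a self-contained way as follows. `FakeRecord` and `FakeStats` are hypothetical stand-ins invented here to make the sketch runnable; Iceberg's real `Record` and `PartitionStats` classes differ.

```java
// Hypothetical stand-ins for Iceberg's Record and PartitionStats, only to
// demonstrate the loop bounds discussed above; not the real classes.
public class CopyLoopSketch {
  static final int PARTITION_POSITION = 0;
  static final int SPEC_ID_POSITION = 1;

  // Minimal positional record: get(pos, Class) mirrors the positional accessor style.
  static class FakeRecord {
    private final Object[] values;
    FakeRecord(Object... values) { this.values = values; }
    int size() { return values.length; }
    <T> T get(int pos, Class<T> cls) { return cls.cast(values[pos]); }
  }

  // Minimal positional stats holder; partition and specId are set by the constructor.
  static class FakeStats {
    private final Object[] values;
    FakeStats(Object partition, Integer specId, int size) {
      values = new Object[size];
      values[PARTITION_POSITION] = partition;
      values[SPEC_ID_POSITION] = specId;
    }
    void set(int pos, Object value) { values[pos] = value; }
    Object get(int pos) { return values[pos]; }
  }

  static FakeStats copy(FakeRecord record) {
    FakeStats stats =
        new FakeStats(
            record.get(PARTITION_POSITION, Object.class),
            record.get(SPEC_ID_POSITION, Integer.class),
            record.size());
    // Start after the constructor-initialized fields, i.e. at SPEC_ID_POSITION + 1.
    for (int i = SPEC_ID_POSITION + 1; i < record.size(); ++i) {
      stats.set(i, record.get(i, Object.class));
    }
    return stats;
  }

  public static void main(String[] args) {
    FakeStats stats = copy(new FakeRecord("p0", 7, 100L, 5));
    System.out.println(stats.get(2) + "," + stats.get(3));
  }
}
```

Starting at `SPEC_ID_POSITION + 1` rather than 0 avoids redundantly re-setting the two fields the constructor already initialized.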
```java
static final int PARTITION_POSITION = 0;
static final int SPEC_ID_POSITION = 1;
static final int DATA_RECORD_COUNT_POSITION = 2;
static final int DATA_FILE_COUNT_POSITION = 3;
static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
static final int TOTAL_RECORD_COUNT_POSITION = 9;
static final int LAST_UPDATED_AT_POSITION = 10;
static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
```
Do we really need these?
Are you suggesting to use hardcoded values like 0, 1, 2 instead?
```diff
@@ -65,29 +65,44 @@ private PartitionStatsHandler() {}

   private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsHandler.class);

-  public static final int PARTITION_FIELD_ID = 0;
+  // schema of the partition stats file as per spec
+  public static final int PARTITION_FIELD_ID = 1;
```
These are binary breaking changes in 1.9.2 compared to 1.9.0.
What is our policy here?
Do we announce that the 1.9.0 partition stats implementation is broken, and we don't support it?
In this case we might be ok to do incompatible changes (making it compatible with the spec)
@RussellSpitzer: thoughts? Should be part of only 1.10.0?
```java
String invalidSchema =
    getClass()
        .getClassLoader()
        .getResource("org/apache/iceberg/PartitionStatsInvalidSchema.parquet")
```
Could we just generate the file on the fly instead of checking in the binary?
We don't have the old schema in code anymore, so creating a parquet file as per the old schema, adding rows, etc. would be a big chunk of code.
I observed that we do maintain puffin files, delete files, and metadata files in the resource folder:
https://github.com/apache/iceberg/tree/main/core/src/test/resources/org/apache/iceberg
Hence, I kept it to reduce the PR code size.
Found a way to generate stats with the old schema in a few lines of code and added that instead of the binaries.
```java
String invalidSchema =
    getClass()
        .getClassLoader()
        .getResource("org/apache/iceberg/PartitionStatsInvalidSchema.avro")
```
Could we just generate the file on the fly instead of checking in the binary?
```diff
@@ -280,6 +304,8 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental(
       oldStats.forEach(
           partitionStats ->
               statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
+    } catch (IllegalArgumentException | IllegalStateException exception) {
```
Call stack for parquet:

```text
java.lang.IllegalArgumentException: Not a primitive type: struct<1000: c2: optional string, 1001: c3: optional string>
    at org.apache.iceberg.types.Type.asPrimitiveType(Type.java:73)
    at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:53)
    at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitField(TypeWithSchemaVisitor.java:192)
    at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitFields(TypeWithSchemaVisitor.java:207)
    at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:49)
    at org.apache.iceberg.parquet.ParquetSchemaUtil.pruneColumns(ParquetSchemaUtil.java:134)
    at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:82)
    at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:74)
    at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:94)
    at org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
    at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
    at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:196)
    at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:139)
    at org.apache.iceberg.TestParquetPartitionStatsHandler.testStatsWithInvalidSchema(TestParquetPartitionStatsHandler.java:45)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
```
Call stack for avro:

```text
java.lang.IllegalStateException: Not an instance of org.apache.iceberg.StructLike: 0
    at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:138)
    at org.apache.iceberg.PartitionStatsHandler.recordToPartitionStats(PartitionStatsHandler.java:266)
    at org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:219)
    ...(99 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)
    at org.apache.iceberg.avro.TestAvroPartitionStatsHandler.testReadingStatsWithInvalidSchema(TestAvroPartitionStatsHandler.java:57)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
```

Or should I just catch RuntimeException here?
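For context, the fallback being discussed can be sketched as follows. The helper names (`readOldStats`, `fullRecompute`) are hypothetical, not Iceberg's actual API; the point is catching the two specific exception types from the call stacks above rather than a blanket `RuntimeException`.

```java
import java.util.List;

// Illustrative sketch of "fall back to full compute when old stats are unreadable".
public class FallbackSketch {
  static List<String> readOldStats(boolean oldSchemaFile) {
    if (oldSchemaFile) {
      // The avro reader fails with IllegalStateException and the parquet
      // reader with IllegalArgumentException, as in the call stacks above.
      throw new IllegalStateException("Not an instance of StructLike: 0");
    }
    return List.of("merged incrementally");
  }

  static List<String> fullRecompute() {
    return List.of("recomputed from scratch");
  }

  public static List<String> computeStats(boolean oldSchemaFile) {
    try {
      return readOldStats(oldSchemaFile);
    } catch (IllegalArgumentException | IllegalStateException e) {
      // Narrower than catching RuntimeException: only the known failure modes
      // of reading a pre-fix stats file trigger the full recompute.
      return fullRecompute();
    }
  }

  public static void main(String[] args) {
    System.out.println(computeStats(true).get(0));
    System.out.println(computeStats(false).get(0));
  }
}
```

The multi-catch keeps unrelated runtime failures (e.g. I/O wrapper exceptions) from being silently swallowed into a recompute.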
@lirui-apache, @deniskuzZ: Just a heads up, as you are using this feature as far as I know. And sorry for this oversight during refactoring!
Thanks @ajantha-bhat, we are also looking at the issue internally. Guess I can incorporate your fix when it's done.
Thanks @ajantha-bhat for the heads up, we've just upgraded Hive to Iceberg 1.9.1.
off-topic: partition stats file format is highly coupled with table
Yes. In the initial proposal document jack-ye brought this up. The spec originally hardcoded that it should be in the table default format; we then updated it so it can be any of the formats (https://iceberg.apache.org/spec/#partition-statistics-file). So we can have a table property to configure a different stats format than the data format. But rather than changing the format of partition stats here, I recommend supporting InternalData for ORC, so that ORC can also be used for writing table metadata in v4. I can help on it too if needed (I did it for parquet and avro).
Flink new flaky test: #13338
@ajantha-bhat, that would be awesome if we add an InternalData reader/writer for ORC. However, I am not sure how big that effort would be. If I understand correctly, we'll need to implement support for a few missing features, such as default values, timestamp(9), and variant.
```diff
-    stats.set(
-        LAST_UPDATED_SNAPSHOT_ID_POSITION,
-        record.get(LAST_UPDATED_SNAPSHOT_ID_POSITION, Long.class));
+            record.get(0, StructLike.class), // partition
```
Using raw numbers directly in code is considered bad practice. Static analysis tools flag them as magic numbers and recommend replacing them with named constants. This enhances code readability and makes maintenance easier.
Alternatively, to adapt easily if the schema changes, we can introduce an index variable.
```java
int pos = 0;
PartitionStats stats =
    new PartitionStats(
        record.get(pos++, StructLike.class),
        record.get(pos++, Integer.class));
for (; pos < record.size(); pos++) {
  stats.set(pos, record.get(pos, Object.class));
}
```
done
```diff
@@ -280,6 +273,8 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental(
       oldStats.forEach(
           partitionStats ->
               statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
+    } catch (IllegalArgumentException | IllegalStateException exception) {
```
Does this mean that being able to read an old stats file is not a goal here?
While working on DV support for partition stats, I observed that the schema field ids of the partition stats file are not as per the spec. In the spec, field ids start from 1, while in the Java implementation they start from 0.

This happened because of this refactoring PR. We missed that the field ids are tracked in the spec; we wanted to avoid the ugly `index - 1` code when we reused the field id for `StructLike`.

This PR updates the schema ids as per the spec (starting from 1) and uses separate position variables for `StructLike` (starting from 0). It also handles compatibility when reading old, corrupted stats by falling back to a full compute.
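The off-by-one at the heart of this fix can be summarized in a small sketch. Only `PARTITION_FIELD_ID = 1` appears in the PR above; the other field-id values here are my reading of the spec's numbering and the helper is illustrative, not part of the actual change.

```java
// Per the partition statistics spec, field ids in the stats file schema start
// at 1, while StructLike positions start at 0, so position == fieldId - 1.
public class FieldIdSketch {
  // Field ids written into the stats file schema (per spec, starting at 1).
  public static final int PARTITION_FIELD_ID = 1;
  public static final int SPEC_ID_FIELD_ID = 2;       // assumed from spec ordering
  public static final int DATA_RECORD_COUNT_FIELD_ID = 3; // assumed from spec ordering

  // Positions used to index into a StructLike row (starting at 0).
  public static final int PARTITION_POSITION = 0;
  public static final int SPEC_ID_POSITION = 1;
  public static final int DATA_RECORD_COUNT_POSITION = 2;

  // The old code reused field ids as positions; keeping the two numberings
  // separate avoids the "fieldId - 1" indexing mentioned in the description.
  public static int positionFor(int fieldId) {
    return fieldId - 1;
  }
}
```

Keeping distinct `*_FIELD_ID` and `*_POSITION` constants means neither the file schema nor the in-memory indexing has to be written in terms of the other.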