Iceberg: Use table schema corresponding to snapshot in snapshot queries #12786
dccbfe2 to 3035458
private Schema getSchema(ConnectorSession session, IcebergTableHandle table)
{
    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
    if (table.getSnapshotId().isEmpty() || table.getSnapshotId().get() == icebergTable.currentSnapshot().snapshotId()) {
table.getSnapshotId() is always filled, even when not doing snapshot queries, because it is how IcebergSplitManager or TableStatisticsMaker see whether the table has any data.
With this value always filled, there is currently no way to know whether a specific snapshot of the table is being queried.
This is desired behavior (explained here: #12786 (comment)).
What you discovered is that the snapshot-id doesn't fully describe the table's state in case there was an ADD COLUMN (and no further INSERT yet).
We just need to capture what identifies the table state. I assume this is the (snapshot-id, schema-id) pair. The table handle should therefore carry both.
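To illustrate the point about identity, here is a minimal sketch of a handle that carries both values. TableStateHandle and TableStateDemo are hypothetical, simplified stand-ins and not Trino's IcebergTableHandle; the table name and IDs are made up.

```java
import java.util.Objects;
import java.util.OptionalLong;

// Hypothetical, simplified stand-in for a table handle; not Trino's IcebergTableHandle.
// The table state is identified by the (snapshotId, schemaId) pair, not by snapshotId alone.
record TableStateHandle(String tableName, OptionalLong snapshotId, int schemaId)
{
    TableStateHandle
    {
        Objects.requireNonNull(tableName, "tableName is null");
        Objects.requireNonNull(snapshotId, "snapshotId is null");
    }
}

class TableStateDemo
{
    public static void main(String[] args)
    {
        // Same snapshot before and after ADD COLUMN (no INSERT yet), but a different schema id.
        TableStateHandle beforeAddColumn = new TableStateHandle("orders", OptionalLong.of(42L), 0);
        TableStateHandle afterAddColumn = new TableStateHandle("orders", OptionalLong.of(42L), 1);

        // The snapshot ids match, yet the handles describe different table states.
        System.out.println(beforeAddColumn.snapshotId().equals(afterAddColumn.snapshotId()));
        System.out.println(beforeAddColumn.equals(afterAddColumn));
    }
}
```

With only the snapshot ID in the handle, the two states in this sketch would be indistinguishable.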
I've modified the logic of the getTableHandle() method so that the correct tableSchemaJson and partitionSpecJson are transported over the IcebergTableHandle.
Thank you @findepi for pointing me towards this path.
4327df3 to e28fab9
-        return new ConnectorTableMetadata(table, columns.build(), getIcebergTableProperties(icebergTable), getTableComment(icebergTable));
+        return new ConnectorTableMetadata(table, columns, getIcebergTableProperties(icebergTable), getTableComment(icebergTable));
Do table comments / properties also have a history we should be looking through during time travel (TT)?
If we are to be very purist about time travel, then yes.
The table metadata corresponding to a snapshot contains the properties as well.
The table properties would probably need to be transported along with the IcebergTableHandle (to avoid obtaining them on the fly by going through the table metadata files and causing unnecessary I/O operations).
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
columns.addAll(getColumnMetadatas(icebergTable));
columns.add(pathColumnMetadata());
Nit: personally, I think it makes sense to add the path column to the list here. The other method is responsible for columns that come from the Iceberg schema.
I did this refactor to avoid code duplication in the methods that build the column metadata.
Should I try a better name for the method?
489a3d5 to ee3a5b0
ee3a5b0 to cd5782a
9e778d2 to 741f4d1
741f4d1 to e362c8d
Left initial comments.
0b9e6ce to 279413c
279413c to ae09364
"Retrieve schema and partition spec depending on the table snapshot"
{
    return snapshotIds.computeIfAbsent(
            table.name() + "@" + id,
            ignored -> IcebergUtil.resolveSnapshotId(table, id, allowLegacySnapshotSyntax));
allowLegacySnapshotSyntax should be part of the cache key (pre-existing, but still something to fix).
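As a rough sketch of what that could look like, the cache below uses a composite key that includes the flag. SnapshotCacheKey, SnapshotIdCache, and the resolve stand-in are hypothetical names for illustration only; the stand-in just mimics the "reject unless allowed" check and is not the real IcebergUtil.resolveSnapshotId.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical composite key: every input that affects the resolution result is part of it.
record SnapshotCacheKey(String tableName, long snapshotId, boolean allowLegacySnapshotSyntax) {}

class SnapshotIdCache
{
    private final Map<SnapshotCacheKey, Long> snapshotIds = new ConcurrentHashMap<>();

    // Stand-in for the real resolution logic (an assumption for this sketch):
    // rejects the legacy syntax unless it is explicitly allowed.
    private static long resolve(long snapshotId, boolean allowLegacySnapshotSyntax)
    {
        if (!allowLegacySnapshotSyntax) {
            throw new UnsupportedOperationException("Legacy snapshot syntax is not allowed");
        }
        return snapshotId;
    }

    long getSnapshotId(String tableName, long snapshotId, boolean allowLegacySnapshotSyntax)
    {
        // A value cached with the flag enabled is never served to a caller with the flag disabled,
        // because the two calls use different keys.
        return snapshotIds.computeIfAbsent(
                new SnapshotCacheKey(tableName, snapshotId, allowLegacySnapshotSyntax),
                key -> resolve(key.snapshotId(), key.allowLegacySnapshotSyntax()));
    }
}
```

With a key of only table name and snapshot ID, a lookup with the flag disabled could return a value that was cached earlier with the flag enabled, skipping the check entirely.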
trino/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
Lines 221 to 232 in 9b7c1b9
public static long resolveSnapshotId(Table table, long snapshotId, boolean allowLegacySnapshotSyntax)
{
    if (!allowLegacySnapshotSyntax) {
        throw new TrinoException(
                NOT_SUPPORTED,
                format(
                        "Failed to access snapshot %s for table %s. This syntax for accessing Iceberg tables is not "
                                + "supported. Use the AS OF syntax OR set the catalog session property "
                                + "allow_legacy_snapshot_syntax=true for temporarily restoring previous behavior.",
                        snapshotId,
                        table.name()));
    }
In case allowLegacySnapshotSyntax is false, we'd get a Trino exception.
I don't see the purpose behind this request.
"In case that allowLegacySnapshotSyntax is false we'd get a Trino exception."
Only if the snapshotIds entry doesn't exist yet.
cc @phd3
ae09364 to 3270605
ACK the small change (https://github.com/trinodb/trino/compare/ae09364cc7eae9e7dc13a4b8e9f84e5fcbc4a6de..3270605c060f415edcdf0a2579a312323e296a18). The CI didn't run due to a merge conflict.
daa59bb to 641cc85
Followed up on your hint @alexjo2144 #12786 (comment) and made the |
Looks reasonable to me. Just take a look at the conflicts
@@ -338,7 +351,7 @@ public IcebergTableHandle getTableHandle(
                 Optional.empty());
     }

-    private static long getSnapshotIdFromVersion(Table table, ConnectorTableVersion version)
+    private long getSnapshotIdFromVersion(Table table, ConnectorTableVersion version)
Is this necessary?
824b41b to 9c4ade2
"Retrieve table schema depending on the table snapshot"
verify(table.getPartitionSpecJson().isPresent(), "The table handle must contain the partion spec definition");
return new IcebergPageSink(
        tableSchema,
        PartitionSpecParser.fromJson(tableSchema, table.getPartitionSpecJson().get()),
table.getPartitionSpecJson().orElseThrow(() -> new VerifyException("Partition spec missing in the table handle"))
and revert other changes here (addition of { ... })
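A minimal, dependency-free sketch of the suggested pattern follows. PartitionSpecAccess and requirePartitionSpecJson are hypothetical names, and IllegalStateException stands in for Guava's VerifyException so the example needs no external library.

```java
import java.util.Optional;

class PartitionSpecAccess
{
    // Unwraps the optional in a single expression instead of a separate
    // presence check followed by get().
    // IllegalStateException is a stand-in for Guava's VerifyException (an assumption of this sketch).
    static String requirePartitionSpecJson(Optional<String> partitionSpecJson)
    {
        return partitionSpecJson
                .orElseThrow(() -> new IllegalStateException("Partition spec missing in the table handle"));
    }
}
```

The check and the access happen in one place, so there is no window in which get() could be called on an unchecked Optional.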
4d3fd06 to 4bb99a3
For time travel queries, the partition spec is intentionally not retrieved, because doing so would involve going through all the metadata files of the table to find the initial metadata file (containing the partition spec) that corresponds to the specified table snapshot.
When an Iceberg table's structure evolves over time (columns are added or dropped), the output schema of a snapshot/time travel query should match the schema of the queried table snapshot.
4bb99a3 to c30d109
Description
When performing a snapshot/time travel query, use the table schema corresponding to the snapshot.
Bugfix
When an Iceberg table's structure evolves over time (columns are added or dropped), the output schema of a snapshot/time travel query should match the schema of the queried table snapshot.
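The intended behavior can be sketched with a simplified model, assuming each snapshot records the ID of the schema that was current when it was written. SnapshotInfo and SnapshotSchemaResolver are hypothetical types for illustration; they are not the Iceberg or Trino API, and a schema is reduced to a list of column names.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of table metadata; not the Iceberg API.
record SnapshotInfo(long snapshotId, int schemaId) {}

class SnapshotSchemaResolver
{
    private final Map<Integer, List<String>> schemasById; // schema id -> column names
    private final Map<Long, SnapshotInfo> snapshotsById;

    SnapshotSchemaResolver(Map<Integer, List<String>> schemasById, Map<Long, SnapshotInfo> snapshotsById)
    {
        this.schemasById = Map.copyOf(schemasById);
        this.snapshotsById = Map.copyOf(snapshotsById);
    }

    // Returns the columns of the schema recorded for the snapshot,
    // rather than the columns of the table's current schema.
    List<String> columnsFor(long snapshotId)
    {
        SnapshotInfo snapshot = snapshotsById.get(snapshotId);
        if (snapshot == null) {
            throw new IllegalArgumentException("Unknown snapshot: " + snapshotId);
        }
        return schemasById.get(snapshot.schemaId());
    }
}
```

In this model, a query against a snapshot written before a column was added would not see that column, while a query against a later snapshot would.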
Related issues, pull requests, and links
Fixes #12743
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: