Implement table changes function for delta lake #16205
This shouldn’t need that other table functions PR. That PR is only needed for table functions that take subqueries or descriptors as input. It should be possible to model this function the same way as the query pass-through functions that already exist in some connectors.
Yes, I know it should be possible to use query pass-through. We considered that, but concluded that going that way could have left us with more complex code. Using #15575 lets us encapsulate the function nicely.
5af6d0f to 63e654b
...main/java/io/trino/plugin/deltalake/function/tablechanges/DeltaLakeTableChangesFunction.java (outdated, resolved)
if (processedVersion <= endVersion) {
    Path transactionLogDir = getTransactionLogDir(new Path(tableLocation));
    try {
        Optional<List<DeltaLakeTransactionLogEntry>> entriesFromJson = getEntriesFromJson(processedVersion, transactionLogDir, fileSystem);
What happens if the transaction log entry file is missing? This may very well happen when delta.logRetentionDuration kicks in and the transaction log files before the last checkpoint get removed. So if we request the changes from 100 to 200, but we only have transaction log entries starting from 180 (the last checkpoint of the table), should we get the 180-200 range?
See #16192, which deals with this kind of situation when retrieving the $history of the table.
So if we request the changes from 100 to 200, but we have transaction log entries starting from 180 (which is the last checkpoint of the table), should we get the 180-200 range?
No, it should fail.
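To make the "it should fail" option concrete, here is a minimal sketch, assuming the caller already knows the oldest version that still has a JSON commit file in `_delta_log`. `ChangeRangeCheck`, `commitFileName`, `checkChangesAvailable`, and `oldestRetainedVersion` are hypothetical names, not the connector's code. The one protocol fact it relies on is that Delta Lake names each commit file after its version, zero-padded to 20 digits.

```java
// Hypothetical helper, not the connector's actual code. It sketches the position
// that table_changes should fail instead of silently returning a shorter range
// when the commit files for the requested versions were removed by log cleanup.
final class ChangeRangeCheck
{
    private ChangeRangeCheck() {}

    // Delta Lake commit files live in _delta_log and are named after the
    // table version, zero-padded to 20 digits (e.g. 00000000000000000180.json).
    static String commitFileName(long version)
    {
        return String.format("%020d.json", version);
    }

    // oldestRetainedVersion is assumed to be the smallest version that still has
    // a JSON commit file; a checkpoint stores table state, not per-commit changes.
    static void checkChangesAvailable(long startVersion, long oldestRetainedVersion)
    {
        if (startVersion < oldestRetainedVersion) {
            throw new IllegalStateException(
                    "Cannot read changes from version " + startVersion
                            + ": " + commitFileName(startVersion) + " is no longer in the transaction log;"
                            + " oldest retained version is " + oldestRetainedVersion);
        }
    }
}
```

For the 100 to 200 example with commits retained only from 180, `checkChangesAvailable(100, 180)` throws instead of returning the 180-200 range. The follow-up reply below notes that some of those entries are still needed, so treat this only as an illustration of the "fail" option.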
...in/java/io/trino/plugin/deltalake/function/tablechanges/TableChangesFunctionTableHandle.java (outdated, resolved)
So if we request the changes from 100 to 200, but we have transaction log entries starting from 180 (which is the last checkpoint of the table), should we get the 180-200 range?
No, it should fail.
Not entirely true: I still need at least some of them. For example, my …
63e654b to d10f71c
18e1759 to eddba0d
eddba0d to 9f56faf
9f56faf to ab94a16
Just a rebase.
CI: #17295
I reviewed the replacement docs PR that is now linked. I'm removing my change request here and leaving the rest for other maintainers to discuss.
Manfred was supposed to dismiss this, per his last comment.
@homar, please make sure there are tests for table_changes on tables having …
@homar, please rebase to resolve the conflict.
6b1cc6c to 25e6998
@homar, I meant: update the PR, based on the discussion above, to make the argument optional instead of giving -1 a special meaning.
44bd539 to 7665f63
7665f63 to 946d31c
        .filter(column -> column.getColumnType() != SYNTHESIZED)
        .collect(toImmutableList());
accessControl.checkCanSelectFromColumns(null, schemaTableName, columnHandles.stream()
        .map(DeltaLakeColumnHandle::getBaseColumnName)
getBaseColumnName -> getColumnName
ImmutableList.Builder<Descriptor.Field> outputFields = ImmutableList.builder();
columnHandles.stream()
        .map(columnHandle -> new Descriptor.Field(columnHandle.getBaseColumnName(), Optional.of(columnHandle.getBaseType())))
getBaseColumnName -> getColumnName
getBaseType -> getType, a new method:

    @JsonIgnore
    public Type getType()
    {
        // Use the projected type for dereferenced columns, otherwise the base column type
        return projectionInfo.map(DeltaLakeColumnProjectionInfo::getType)
                .orElse(baseType);
    }
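The suggestion boils down to building the output descriptor from projection-aware accessors instead of the base ones, after dropping synthesized columns. The sketch below uses hypothetical, simplified stand-ins (`ColumnHandle`, `ProjectionInfo`, `OutputField`, plain strings for types) rather than Trino's `DeltaLakeColumnHandle` and `Type`; in particular, it assumes for illustration that the projection info carries the projected column's name as well as its type.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins for the connector types discussed above;
// types are plain strings instead of Trino's Type.
enum ColumnType { REGULAR, PARTITION_KEY, SYNTHESIZED }

// Assumption for illustration: projection info carries the projected name and type.
record ProjectionInfo(String name, String type) {}

record ColumnHandle(String baseColumnName, String baseType, ColumnType columnType, Optional<ProjectionInfo> projectionInfo)
{
    // Projected name if present, else the base column name
    String getColumnName()
    {
        return projectionInfo.map(ProjectionInfo::name).orElse(baseColumnName);
    }

    // Mirrors the suggested getType(): projected type if present, else the base type
    String getType()
    {
        return projectionInfo.map(ProjectionInfo::type).orElse(baseType);
    }
}

record OutputField(String name, String type) {}

final class TableChangesOutput
{
    private TableChangesOutput() {}

    // Drops synthesized columns, then builds output fields from the
    // projection-aware accessors rather than the base ones.
    static List<OutputField> outputFields(List<ColumnHandle> columns)
    {
        return columns.stream()
                .filter(column -> column.columnType() != ColumnType.SYNTHESIZED)
                .map(column -> new OutputField(column.getColumnName(), column.getType()))
                .toList();
    }
}
```

For a top-level column both accessor pairs agree; they only diverge for a dereferenced column, which is why the projection-aware accessors are the safer default.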
Force push (https://github.com/trinodb/trino/compare/7665f6332438f2f8af1891a1ee30b94c87b72d73..946d31c37fb4f46b34371eb9b339d589fca5ddf4): combining the rebase with the changes made it hard to see what changed, but I tried to fish out the relevant part. I could have missed some things, though.
table_changes allows reading the stream of CDF entries. since_version is optional and exclusive. If since_version is not provided, the entire history of the table is read.
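A minimal sketch of those argument semantics, assuming the argument is modeled as an `OptionalLong`; `SinceVersion` and `firstVersionToRead` are hypothetical names, not the connector's API. It maps since_version to the first table version whose changes are returned.

```java
import java.util.OptionalLong;

// Hypothetical helper, not the connector's code: maps the optional, exclusive
// since_version argument to the first table version whose changes are read.
final class SinceVersion
{
    private SinceVersion() {}

    static long firstVersionToRead(OptionalLong sinceVersion)
    {
        // Absent: read the entire history; Delta table versions start at 0.
        // Present: the bound is exclusive, so start at the next version.
        return sinceVersion.isPresent() ? sinceVersion.getAsLong() + 1 : 0;
    }
}
```

The arithmetic happens to match the earlier -1 convention (-1 + 1 = 0), but an `OptionalLong` keeps "not provided" distinct from any value a caller could pass explicitly, which is what the review asked for.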
946d31c to 80c52ad
CI: #17158
Description
Adds the table_changes function to the Delta Lake connector.
It allows reading CDF entries from tables that have CDF enabled.
This PR requires #15575 to be merged first.
The 5th and 6th commits add some things to the TableFunction implementation that I found necessary while working on the table_changes function.
Additional context and related issues
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text: