Add support for time travel with version in Delta Lake #21052

Merged · 2 commits · Apr 9, 2024
25 changes: 25 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
@@ -409,6 +409,31 @@ statements, the connector supports the following features:
and table management <delta-lake-schema-table-management>`
- {ref}`sql-view-management`

(delta-time-travel)=

### Time travel queries

The connector offers the ability to query historical data. This allows you to
query the table as it was when a previous snapshot of the table was taken, even
if the data has since been modified or deleted.

The historical data of the table can be retrieved by specifying the version
number of the table snapshot to read:

```
SELECT *
FROM example.testdb.customer_orders FOR VERSION AS OF 3
```

Use the `$history` metadata table to determine the available versions of the
table, as in the following query:

```
SELECT version, operation
FROM example.testdb."customer_orders$history"
ORDER BY version DESC
```
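
As a quick check of what changed between versions, you can compare the current
state of the table with a historical version. This is a sketch that assumes the
illustrative `customer_orders` table and version `3` from the preceding
examples:

```
SELECT
  (SELECT count(*) FROM example.testdb.customer_orders) -
  (SELECT count(*) FROM example.testdb.customer_orders FOR VERSION AS OF 3)
  AS row_count_difference_since_version_3
```

A positive result indicates that rows were added after version 3, and a
negative result indicates that rows were deleted.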

### Procedures

Use the {doc}`/sql/call` statement to perform data manipulation or
DeltaLakeMetadata.java

@@ -14,7 +14,6 @@
package io.trino.plugin.deltalake;

import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.base.VerifyException;
import com.google.common.collect.Comparators;
@@ -102,6 +101,7 @@
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
+import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
@@ -260,6 +260,7 @@
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
+import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
@@ -274,6 +275,7 @@
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
+import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
@@ -400,7 +402,7 @@ public class DeltaLakeMetadata
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
private final boolean allowManagedTableRename;
-private final Map<SchemaTableName, Long> queriedVersions = new ConcurrentHashMap<>();
+private final Map<SchemaTableName, Long> latestTableVersions = new ConcurrentHashMap<>();
private final Map<QueriedTable, TableSnapshot> queriedSnapshots = new ConcurrentHashMap<>();

private record QueriedTable(SchemaTableName schemaTableName, long version)
@@ -456,28 +458,21 @@ public DeltaLakeMetadata(
this.allowManagedTableRename = allowManagedTableRename;
}

-public TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName table, String tableLocation, long atVersion)
+public TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName table, String tableLocation, Optional<Long> atVersion)
{
-return getSnapshot(session, table, tableLocation, Optional.of(atVersion));
-}
+Optional<Long> version = atVersion.or(() -> Optional.ofNullable(latestTableVersions.get(table)));

-@VisibleForTesting
-protected TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName table, String tableLocation, Optional<Long> atVersion)
-{
-try {
-if (atVersion.isEmpty()) {
-atVersion = Optional.ofNullable(queriedVersions.get(table));
-}
-if (atVersion.isPresent()) {
-long version = atVersion.get();
-TableSnapshot snapshot = queriedSnapshots.get(new QueriedTable(table, version));
-checkState(snapshot != null, "No previously loaded snapshot found for query %s, table %s [%s] at version %s", session.getQueryId(), table, tableLocation, version);
-return snapshot;
+if (version.isPresent()) {
+QueriedTable queriedTable = new QueriedTable(table, version.get());
+if (queriedSnapshots.containsKey(queriedTable)) {
+return queriedSnapshots.get(queriedTable);
}
}

-TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation);
+try {
+TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation, version);
// Lack of concurrency for given query is currently guaranteed by DeltaLakeMetadata
-checkState(queriedVersions.put(table, snapshot.getVersion()) == null, "queriedLocations changed concurrently for %s", table);
+checkState(latestTableVersions.put(table, snapshot.getVersion()) == null || atVersion.isPresent(), "latestTableVersions changed concurrently for %s", table);
queriedSnapshots.put(new QueriedTable(table, snapshot.getVersion()), snapshot);
return snapshot;
}
@@ -486,6 +481,37 @@ protected TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName ta
}
}

+private static long getVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
+{
+return switch (version.getPointerType()) {
+// TODO https://github.com/trinodb/trino/issues/21024 Add support for reading tables with temporal versions
+case TEMPORAL -> throw new TrinoException(NOT_SUPPORTED, "This connector does not support reading tables with TIMESTAMP AS OF");
+case TARGET_ID -> getTargetVersion(fileSystem, tableLocation, version);
+};
+}
+
+private static long getTargetVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
+{
+long snapshotId;
+if (version.getVersionType() == SMALLINT || version.getVersionType() == TINYINT || version.getVersionType() == INTEGER || version.getVersionType() == BIGINT) {
+snapshotId = (long) version.getVersion();
+}
+else {
+throw new TrinoException(NOT_SUPPORTED, "Unsupported type for table version: " + version.getVersionType().getDisplayName());
+}
+
+try {
+if (!fileSystem.newInputFile(getTransactionLogJsonEntryPath(getTransactionLogDir(tableLocation), snapshotId)).exists()) {
+throw new TrinoException(INVALID_ARGUMENTS, "Delta Lake snapshot ID does not exists: " + snapshotId);
+}
+}
+catch (IOException e) {
+throw new TrinoException(INVALID_ARGUMENTS, "Delta Lake snapshot ID does not exists: " + snapshotId);
+}
+
+return snapshotId;
+}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
@@ -532,6 +558,20 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
@Override
public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
+throw new UnsupportedOperationException("Calling this method is not supported. getTableHandle with versions is implemented");
+}
+
+@Override
+public LocatedTableHandle getTableHandle(
+ConnectorSession session,
+SchemaTableName tableName,
+Optional<ConnectorTableVersion> startVersion,
+Optional<ConnectorTableVersion> endVersion)
+{
+if (startVersion.isPresent()) {
+throw new TrinoException(NOT_SUPPORTED, "Read table with start version is not supported");
+}
+
requireNonNull(tableName, "tableName is null");
if (!DeltaLakeTableName.isDataTable(tableName.getTableName())) {
// Pretend the table does not exist to produce better error message in case of table redirects to Hive
@@ -544,7 +584,9 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
boolean managed = table.get().managed();

String tableLocation = table.get().location();
-TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
+TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(fileSystem, tableLocation, version)));

Map<Class<?>, Object> logEntries;
try {
logEntries = transactionLogAccess.getTransactionLogEntries(
@@ -593,7 +635,8 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
Optional.empty(),
Optional.empty(),
Optional.empty(),
-tableSnapshot.getVersion());
+tableSnapshot.getVersion(),
+endVersion.isPresent());
}

@Override
@@ -1848,7 +1891,7 @@ private long commitInsertOperation(
.map(DeltaLakeTableHandle.class::cast)
.filter(tableHandle -> handle.getTableName().equals(tableHandle.getSchemaTableName())
// disregard time travel table handles
&& tableHandle.getReadVersion() >= handle.getReadVersion())
&& !tableHandle.isTimeTravel())
.collect(toImmutableList());
long readVersionValue = readVersion.get();
if (currentVersion > readVersionValue) {
@@ -2860,7 +2903,8 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
false,
false,
Optional.empty(),
-tableHandle.getReadVersion());
+tableHandle.getReadVersion(),
+tableHandle.isTimeTravel());

if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) &&
tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint()) &&
@@ -3156,7 +3200,8 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
Optional.empty(),
Optional.empty(),
Optional.of(analyzeHandle),
-handle.getReadVersion());
+handle.getReadVersion(),
+handle.isTimeTravel());
TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata(
columnsMetadata.stream().map(DeltaLakeColumnMetadata::getColumnMetadata).collect(toImmutableList()),
analyzeColumnNames.orElse(allColumnNames),
DeltaLakeSplitManager.java

@@ -152,7 +152,7 @@ private Stream<DeltaLakeSplit> getSplits(
Constraint constraint)
{
TableSnapshot tableSnapshot = deltaLakeTransactionManager.get(transaction, session.getIdentity())
-.getSnapshot(session, tableHandle.getSchemaTableName(), tableHandle.getLocation(), tableHandle.getReadVersion());
+.getSnapshot(session, tableHandle.getSchemaTableName(), tableHandle.getLocation(), Optional.of(tableHandle.getReadVersion()));
Stream<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(
session,
tableSnapshot,
DeltaLakeTableHandle.java

@@ -52,6 +52,7 @@ public enum WriteType
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
private final long readVersion;
+private final boolean timeTravel;

private final Optional<Set<DeltaLakeColumnHandle>> projectedColumns;
// UPDATE only: The list of columns being updated
@@ -84,7 +85,8 @@ public DeltaLakeTableHandle(
@JsonProperty("updatedColumns") Optional<List<DeltaLakeColumnHandle>> updatedColumns,
@JsonProperty("updateRowIdColumns") Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
@JsonProperty("analyzeHandle") Optional<AnalyzeHandle> analyzeHandle,
@JsonProperty("readVersion") long readVersion)
@JsonProperty("readVersion") long readVersion,
@JsonProperty("timeTravel") boolean timeTravel)
{
this(
schemaName,
@@ -104,7 +106,8 @@ public DeltaLakeTableHandle(
false,
false,
Optional.empty(),
-readVersion);
+readVersion,
+timeTravel);
}

public DeltaLakeTableHandle(
@@ -125,7 +128,8 @@ public DeltaLakeTableHandle(
boolean recordScannedFiles,
boolean isOptimize,
Optional<DataSize> maxScannedFileSize,
-long readVersion)
+long readVersion,
+boolean timeTravel)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -146,6 +150,7 @@ public DeltaLakeTableHandle(
this.isOptimize = isOptimize;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.readVersion = readVersion;
+this.timeTravel = timeTravel;
this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
}

@@ -169,7 +174,8 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
recordScannedFiles,
isOptimize,
maxScannedFileSize,
-readVersion);
+readVersion,
+timeTravel);
}

public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
@@ -192,7 +198,8 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
recordScannedFiles,
true,
Optional.of(maxScannedFileSize),
-readVersion);
+readVersion,
+timeTravel);
}

@Override
@@ -327,6 +334,12 @@ public long getReadVersion()
return readVersion;
}

+@JsonProperty
+public boolean isTimeTravel()
+{
+return timeTravel;
+}

@Override
public String toString()
{
@@ -360,7 +373,8 @@ public boolean equals(Object o)
Objects.equals(analyzeHandle, that.analyzeHandle) &&
Objects.equals(isOptimize, that.isOptimize) &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
-readVersion == that.readVersion;
+readVersion == that.readVersion &&
+timeTravel == that.timeTravel;
}

@Override
@@ -383,6 +397,7 @@ public int hashCode()
recordScannedFiles,
isOptimize,
maxScannedFileSize,
-readVersion);
+readVersion,
+timeTravel);
}
}
TableChangesFunction.java

@@ -108,7 +108,7 @@ public TableFunctionAnalysis analyze(
deltaLakeMetadata.beginQuery(session);
try (UncheckedCloseable ignore = () -> deltaLakeMetadata.cleanupQuery(session)) {
SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);
-ConnectorTableHandle connectorTableHandle = deltaLakeMetadata.getTableHandle(session, schemaTableName);
+ConnectorTableHandle connectorTableHandle = deltaLakeMetadata.getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());
if (connectorTableHandle == null) {
throw new TableNotFoundException(schemaTableName);
}
DropExtendedStatsProcedure.java

@@ -29,6 +29,7 @@

import java.lang.invoke.MethodHandle;
import java.util.List;
+import java.util.Optional;

import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
@@ -82,7 +83,7 @@ public void dropStats(ConnectorSession session, ConnectorAccessControl accessCon
DeltaLakeMetadata metadata = metadataFactory.create(session.getIdentity());
metadata.beginQuery(session);
try (UncheckedCloseable ignore = () -> metadata.cleanupQuery(session)) {
-LocatedTableHandle tableHandle = metadata.getTableHandle(session, name);
+LocatedTableHandle tableHandle = metadata.getTableHandle(session, name, Optional.empty(), Optional.empty());
if (tableHandle == null) {
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", name));
}
UnregisterTableProcedure.java

@@ -99,7 +99,7 @@ private void doUnregisterTable(ConnectorAccessControl accessControl, ConnectorSe
DeltaLakeMetadata metadata = metadataFactory.create(session.getIdentity());
metadata.beginQuery(session);
try (UncheckedCloseable ignore = () -> metadata.cleanupQuery(session)) {
-LocatedTableHandle tableHandle = metadata.getTableHandle(session, schemaTableName);
+LocatedTableHandle tableHandle = metadata.getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());
if (tableHandle == null) {
throw new TableNotFoundException(schemaTableName);
}
VacuumProcedure.java

@@ -53,6 +53,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

@@ -174,14 +175,14 @@ private void doVacuum(
metadata.beginQuery(session);
try (UncheckedCloseable ignore = () -> metadata.cleanupQuery(session)) {
SchemaTableName tableName = new SchemaTableName(schema, table);
-ConnectorTableHandle connectorTableHandle = metadata.getTableHandle(session, tableName);
+ConnectorTableHandle connectorTableHandle = metadata.getTableHandle(session, tableName, Optional.empty(), Optional.empty());
checkProcedureArgument(connectorTableHandle != null, "Table '%s' does not exist", tableName);
DeltaLakeTableHandle handle = checkValidTableHandle(connectorTableHandle);

accessControl.checkCanInsertIntoTable(null, tableName);
accessControl.checkCanDeleteFromTable(null, tableName);

-TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), handle.getReadVersion());
+TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion()));
ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
if (protocolEntry.minWriterVersion() > MAX_WRITER_VERSION) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.minWriterVersion()));