fixup! Add support for time travel with version in Delta Lake
ebyhr committed Mar 22, 2024
1 parent 0b1714d commit 4c0711a
Showing 9 changed files with 211 additions and 31 deletions.
docs/src/main/sphinx/connector/delta-lake.md (4 changes: 2 additions & 2 deletions)
@@ -415,8 +415,8 @@
The connector offers the ability to query historical data. This allows you to
query the table as it was when a previous snapshot of the table was taken, even
if the data has since been modified or deleted.

-The historical data of the table can be retrieved by specifying the snapshot
-identifier corresponding to the version of the table to be retrieved:
+The historical data of the table can be retrieved by specifying the version
+number corresponding to the version of the table to be retrieved:

```
SELECT *
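The `SELECT` example above is cut off by the diff view. As a usage sketch of the feature, written in the style of this commit's own tests (the table name and data are hypothetical, not from the commit):

```java
// Every committed change to a Delta table creates a new version:
assertUpdate("CREATE TABLE example_orders (id INTEGER)");   // version 0
assertUpdate("INSERT INTO example_orders VALUES 1", 1);     // version 1
assertUpdate("INSERT INTO example_orders VALUES 2", 1);     // version 2

// Reading an older version returns the table as of that commit:
assertQuery("SELECT id FROM example_orders FOR VERSION AS OF 1", "VALUES 1");
```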
@@ -459,29 +459,20 @@ public DeltaLakeMetadata(
        this.allowManagedTableRename = allowManagedTableRename;
    }

-    public TableSnapshot getSnapshot(
-            ConnectorSession session,
-            SchemaTableName table,
-            String tableLocation,
-            Optional<Long> atVersion,
-            Optional<ConnectorTableVersion> endVersion)
+    public TableSnapshot getCurrentSnapshot(ConnectorSession session, SchemaTableName table, String tableLocation, Optional<Long> atVersion)
    {
        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
-        Optional<Long> endSnapshotVersion = endVersion.map(version -> getSnapshotIdFromVersion(fileSystem, tableLocation, version));
        try {
            if (atVersion.isEmpty()) {
                atVersion = Optional.ofNullable(queriedVersions.get(table));
            }
            if (atVersion.isPresent()) {
                long version = atVersion.get();
                TableSnapshot snapshot = queriedSnapshots.get(new QueriedTable(table, version));
-                if (snapshot == null) { // This may happen when INSERT statement uses time travel in the source table
-                    return transactionLogAccess.loadSnapshot(session, table, tableLocation, Optional.of(version));
-                }
+                checkState(snapshot != null, "No previously loaded snapshot found for query %s, table %s [%s] at version %s", session.getQueryId(), table, tableLocation, version);
                return snapshot;
            }

-            TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation, endSnapshotVersion);
+            TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation, Optional.empty());
            // Lack of concurrency for given query is currently guaranteed by DeltaLakeMetadata
            checkState(queriedVersions.put(table, snapshot.getVersion()) == null, "queriedLocations changed concurrently for %s", table);
            queriedSnapshots.put(new QueriedTable(table, snapshot.getVersion()), snapshot);
@@ -492,6 +483,27 @@ public TableSnapshot getSnapshot(
        }
    }

+    public TableSnapshot getTimeTravelSnapshot(
+            ConnectorSession session,
+            SchemaTableName table,
+            String tableLocation,
+            long version)
+    {
+        QueriedTable queriedTable = new QueriedTable(table, version);
+        if (queriedSnapshots.containsKey(queriedTable)) {
+            return queriedSnapshots.get(queriedTable);
+        }
+
+        try {
+            TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation, Optional.of(version));
+            queriedSnapshots.put(queriedTable, snapshot);
+            return snapshot;
+        }
+        catch (IOException e) {
+            throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error getting snapshot for " + table, e);
+        }
+    }
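The new `getTimeTravelSnapshot` memoizes snapshots per `(table, version)` for the lifetime of the transaction and loads from the transaction log only on a miss. `loadSnapshot` throws the checked `IOException`, which a `Map.computeIfAbsent` lambda cannot propagate, hence the explicit lookup-then-put. A minimal, self-contained sketch of that pattern (names are illustrative, not from the codebase):

```java
import java.io.IOException;
import java.util.Map;

final class IoMemoizer
{
    private IoMemoizer() {}

    interface IoLoader<K, V>
    {
        V load(K key)
                throws IOException;
    }

    // Return the cached value when present; otherwise load it, remember it, and return it.
    // Map.computeIfAbsent is unusable here because IoLoader.load throws a checked exception.
    static <K, V> V memoize(Map<K, V> cache, K key, IoLoader<K, V> loader)
            throws IOException
    {
        V cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        V loaded = loader.load(key);
        cache.put(key, loaded);
        return loaded;
    }
}
```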

    private long getSnapshotIdFromVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
    {
        return switch (version.getPointerType()) {
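The body of `getSnapshotIdFromVersion` is collapsed in this diff. Judging from the `switch` over `version.getPointerType()` and the `FOR VERSION AS OF` / `FOR TIMESTAMP AS OF` syntax, it plausibly dispatches on Trino's SPI `PointerType`; a hypothetical sketch, with the temporal branch elided because its logic is not visible here:

```java
import io.trino.spi.connector.ConnectorTableVersion;

final class VersionPointerSketch
{
    private VersionPointerSketch() {}

    static long resolveVersion(ConnectorTableVersion version)
    {
        return switch (version.getPointerType()) {
            // FOR VERSION AS OF n: the pointer value is the Delta version number itself
            case TARGET_ID -> (long) version.getVersion();
            // FOR TIMESTAMP AS OF t: would resolve to the last version committed at or before t
            case TEMPORAL -> throw new UnsupportedOperationException("timestamp resolution is not shown in this diff");
        };
    }
}
```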
@@ -596,7 +608,16 @@ public LocatedTableHandle getTableHandle(
        boolean managed = table.get().managed();

        String tableLocation = table.get().location();
-        TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, Optional.empty(), endVersion);
+        TableSnapshot tableSnapshot;
+        if (endVersion.isEmpty()) {
+            tableSnapshot = getCurrentSnapshot(session, tableName, tableLocation, Optional.empty());
+        }
+        else {
+            TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+            long version = getSnapshotIdFromVersion(fileSystem, tableLocation, endVersion.get());
+            tableSnapshot = getTimeTravelSnapshot(session, tableName, tableLocation, version);
+        }
+
        Map<Class<?>, Object> logEntries;
        try {
            logEntries = transactionLogAccess.getTransactionLogEntries(
@@ -872,7 +893,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
        if (!isTableStatisticsEnabled(session)) {
            return TableStatistics.empty();
        }
-        return tableStatisticsProvider.getTableStatistics(session, handle, getSnapshot(session, handle));
+        return tableStatisticsProvider.getTableStatistics(session, handle, getCurrentSnapshot(session, handle));
    }

    @Override
@@ -1454,7 +1475,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
        boolean tableHasDataFiles;
        try (Stream<AddFileEntry> addFileEntries = transactionLogAccess.getActiveFiles(
                session,
-                getSnapshot(session, handle),
+                getCurrentSnapshot(session, handle),
                handle.getMetadataEntry(),
                handle.getProtocolEntry(),
                TupleDomain.all(),
@@ -2426,9 +2447,9 @@ private void checkSupportedWriterVersion(DeltaLakeTableHandle handle)
        }
    }

-    private TableSnapshot getSnapshot(ConnectorSession session, DeltaLakeTableHandle table)
+    private TableSnapshot getCurrentSnapshot(ConnectorSession session, DeltaLakeTableHandle table)
    {
-        return getSnapshot(session, table.getSchemaTableName(), table.getLocation(), Optional.of(table.getReadVersion()), Optional.empty());
+        return getCurrentSnapshot(session, table.getSchemaTableName(), table.getLocation(), Optional.of(table.getReadVersion()));
    }

    private ProtocolEntry protocolEntryForNewTable(boolean containsTimestampType, Map<String, Object> properties)
@@ -2471,7 +2492,7 @@ private void writeCheckpointIfNeeded(
        try {
            // We are writing checkpoint synchronously. It should not be long lasting operation for tables where transaction log is not humongous.
            // Tables with really huge transaction logs would behave poorly in read flow already.
-            TableSnapshot snapshot = getSnapshot(session, table, tableLocation, Optional.of(readVersion), Optional.empty());
+            TableSnapshot snapshot = getCurrentSnapshot(session, table, tableLocation, Optional.of(readVersion));
            long lastCheckpointVersion = snapshot.getLastCheckpointVersion().orElse(0L);
            if (newVersion - lastCheckpointVersion < checkpointInterval.orElse(defaultCheckpointInterval)) {
                return;
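The guard above skips checkpointing until enough commits have accumulated since the last checkpoint. A worked example with hypothetical numbers (Delta's conventional default interval is 10 commits):

```java
long checkpointInterval = 10;     // hypothetical configured interval
long lastCheckpointVersion = 20;  // from snapshot.getLastCheckpointVersion()

// Committing version 25: 25 - 20 = 5, which is < 10, so no checkpoint is written yet.
boolean skipAt25 = 25 - lastCheckpointVersion < checkpointInterval; // true
// Committing version 30: 30 - 20 = 10, the guard no longer fires, and a checkpoint is written.
boolean skipAt30 = 30 - lastCheckpointVersion < checkpointInterval; // false
```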
@@ -3300,7 +3321,7 @@ private void generateMissingFileStatistics(ConnectorSession session, DeltaLakeTa
        Map<String, AddFileEntry> addFileEntriesWithNoStats;
        try (Stream<AddFileEntry> activeFiles = transactionLogAccess.getActiveFiles(
                session,
-                getSnapshot(session, tableHandle),
+                getCurrentSnapshot(session, tableHandle),
                tableHandle.getMetadataEntry(),
                tableHandle.getProtocolEntry(),
                TupleDomain.all(),
@@ -3665,7 +3686,7 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl

    private Stream<AddFileEntry> getAddFileEntriesMatchingEnforcedPartitionConstraint(ConnectorSession session, DeltaLakeTableHandle tableHandle)
    {
-        TableSnapshot tableSnapshot = getSnapshot(session, tableHandle);
+        TableSnapshot tableSnapshot = getCurrentSnapshot(session, tableHandle);
        Stream<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(
                session,
                tableSnapshot,
@@ -156,7 +156,7 @@ private Stream<DeltaLakeSplit> getSplits(
            Constraint constraint)
    {
        TableSnapshot tableSnapshot = deltaLakeTransactionManager.get(transaction, session.getIdentity())
-                .getSnapshot(session, tableHandle.getSchemaTableName(), tableHandle.getLocation(), Optional.of(tableHandle.getReadVersion()), Optional.empty());
+                .getCurrentSnapshot(session, tableHandle.getSchemaTableName(), tableHandle.getLocation(), Optional.of(tableHandle.getReadVersion()));
        Stream<AddFileEntry> validDataFiles = transactionLogAccess.getActiveFiles(
                session,
                tableSnapshot,
@@ -182,7 +182,7 @@ private void doVacuum(
        accessControl.checkCanInsertIntoTable(null, tableName);
        accessControl.checkCanDeleteFromTable(null, tableName);

-        TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion()), Optional.empty());
+        TableSnapshot tableSnapshot = metadata.getCurrentSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion()));
        ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
        if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) {
            throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion()));
@@ -227,6 +227,11 @@ private TableSnapshot loadSnapshotForTimeTravel(TrinoFileSystem fileSystem, Sche

    private static Optional<LastCheckpoint> findCheckpoint(TrinoFileSystem fileSystem, String tableLocation, Long endVersion)
    {
+        Optional<LastCheckpoint> lastCheckpoint = readLastCheckpoint(fileSystem, tableLocation);
+        if (lastCheckpoint.isPresent() && lastCheckpoint.get().getVersion() <= endVersion) {
+            return lastCheckpoint;
+        }
+
        // TODO Make this logic efficient
        Optional<LastCheckpoint> latestCheckpoint = Optional.empty();
        Location transactionDirectory = Location.of(getTransactionLogDir(tableLocation));
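The added lines give `findCheckpoint` a fast path: the `_last_checkpoint` hint is trusted whenever it does not point past the requested version, and only otherwise does the code fall back to the scan marked `TODO Make this logic efficient`. That scan is collapsed in the diff; a hypothetical sketch of its shape (simplified to single-part checkpoints, using the Trino filesystem API visible above):

```java
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;

import java.io.IOException;
import java.util.OptionalLong;

final class CheckpointScanSketch
{
    private CheckpointScanSketch() {}

    // Scan the transaction log directory and keep the highest checkpoint version
    // that does not exceed the requested endVersion.
    static OptionalLong findCheckpointVersion(TrinoFileSystem fileSystem, String transactionLogDir, long endVersion)
            throws IOException
    {
        OptionalLong best = OptionalLong.empty();
        FileIterator files = fileSystem.listFiles(Location.of(transactionLogDir));
        while (files.hasNext()) {
            String fileName = files.next().location().fileName();
            if (fileName.endsWith(".checkpoint.parquet")) {
                long version = Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
                if (version <= endVersion && version > best.orElse(-1)) {
                    best = OptionalLong.of(version);
                }
            }
        }
        return best;
    }
}
```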
@@ -272,18 +272,31 @@ public void testChangeDataFileOperations()
    @Test
    public void testTimeTravelWithLastCheckpoint()
    {
-        // Version 2 has a checkpoint file
        registerTable("time_travel_with_last_checkpoint", "trino440/time_travel");
        assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'time_travel_with_last_checkpoint')");

+        // Version 2 has a checkpoint file
+        assertFileSystemAccesses(
+                "SELECT * FROM time_travel_with_last_checkpoint FOR VERSION AS OF 1",
+                ImmutableMultiset.<CacheOperation>builder()
+                        .add(new CacheOperation("Alluxio.readExternal", "00000000000000000000.json", 0, 1015))
+                        .add(new CacheOperation("Alluxio.writeCache", "00000000000000000000.json", 0, 1015))
+                        .add(new CacheOperation("Alluxio.readCached", "00000000000000000000.json", 0, 1015))
+                        .add(new CacheOperation("Alluxio.readExternal", "00000000000000000001.json", 0, 613))
+                        .add(new CacheOperation("Alluxio.writeCache", "00000000000000000001.json", 0, 613))
+                        .add(new CacheOperation("Alluxio.readCached", "00000000000000000001.json", 0, 613))
+                        .addCopies(new CacheOperation("Alluxio.readExternal", "data", 0, 199), 2)
+                        .addCopies(new CacheOperation("Alluxio.writeCache", "data", 0, 199), 2)
+                        .addCopies(new CacheOperation("Alluxio.readCached", "data", 0, 199), 2)
+                        .build());
        assertFileSystemAccesses(
                "SELECT * FROM time_travel_with_last_checkpoint FOR VERSION AS OF 2",
                ImmutableMultiset.<CacheOperation>builder()
                        .add(new CacheOperation("Alluxio.readCached", "00000000000000000002.checkpoint.parquet", 4, 561))
                        .add(new CacheOperation("Alluxio.readCached", "00000000000000000002.checkpoint.parquet", 643, 767))
                        .addCopies(new CacheOperation("Alluxio.readCached", "00000000000000000002.checkpoint.parquet", 0, 5884), 2)
-                        .addCopies(new CacheOperation("Alluxio.readExternal", "data", 0, 199), 3)
-                        .addCopies(new CacheOperation("Alluxio.writeCache", "data", 0, 199), 3)
+                        .add(new CacheOperation("Alluxio.readExternal", "data", 0, 199))
+                        .add(new CacheOperation("Alluxio.writeCache", "data", 0, 199))
                        .addCopies(new CacheOperation("Alluxio.readCached", "data", 0, 199), 3)
                        .build());
        assertFileSystemAccesses(
@@ -302,14 +315,23 @@ public void testTimeTravelWithoutLastCheckpoint()
    public void testTimeTravelWithoutLastCheckpoint()
            throws Exception
    {
-        // Version 2 has a checkpoint file
        Path tableLocation = Files.createTempFile("time_travel", null);
        copyDirectoryContents(new File(Resources.getResource("trino440/time_travel").toURI()).toPath(), tableLocation);
        Files.delete(tableLocation.resolve("_delta_log/_last_checkpoint"));

        getQueryRunner().execute("CALL system.register_table(CURRENT_SCHEMA, 'time_travel_without_last_checkpoint', '" + tableLocation.toUri() + "')");
        assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'time_travel_without_last_checkpoint')");

+        // Version 2 has a checkpoint file
+        assertFileSystemAccesses(
+                "SELECT * FROM time_travel_without_last_checkpoint FOR VERSION AS OF 1",
+                ImmutableMultiset.<CacheOperation>builder()
+                        .add(new CacheOperation("Alluxio.readCached", "00000000000000000000.json", 0, 1015))
+                        .add(new CacheOperation("Alluxio.readCached", "00000000000000000001.json", 0, 613))
+                        .addCopies(new CacheOperation("Alluxio.readExternal", "data", 0, 199), 2)
+                        .addCopies(new CacheOperation("Alluxio.writeCache", "data", 0, 199), 2)
+                        .addCopies(new CacheOperation("Alluxio.readCached", "data", 0, 199), 2)
+                        .build());
        assertFileSystemAccesses(
                "SELECT * FROM time_travel_without_last_checkpoint FOR VERSION AS OF 2",
                ImmutableMultiset.<CacheOperation>builder()
@@ -318,8 +340,8 @@ public void testTimeTravelWithoutLastCheckpoint()
                        .addCopies(new CacheOperation("Alluxio.readCached", "00000000000000000002.checkpoint.parquet", 0, 5884), 2)
                        .add(new CacheOperation("Alluxio.readCached", "00000000000000000002.checkpoint.parquet", 4, 561))
                        .add(new CacheOperation("Alluxio.readCached", "00000000000000000002.checkpoint.parquet", 643, 767))
-                        .addCopies(new CacheOperation("Alluxio.readExternal", "data", 0, 199), 3)
-                        .addCopies(new CacheOperation("Alluxio.writeCache", "data", 0, 199), 3)
+                        .add(new CacheOperation("Alluxio.readExternal", "data", 0, 199))
+                        .add(new CacheOperation("Alluxio.writeCache", "data", 0, 199))
                        .addCopies(new CacheOperation("Alluxio.readCached", "data", 0, 199), 3)
                        .build());
        assertFileSystemAccesses(
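Two conventions make the assertions above easier to read. Delta names each commit and checkpoint after its version, zero-padded to 20 digits, which is why the tests reference files such as `00000000000000000001.json` and `00000000000000000002.checkpoint.parquet`. And a cold read through the Alluxio cache shows up as the triple `readExternal` + `writeCache` + `readCached`, while a warm read is `readCached` alone; the snapshot caching in this fixup is what turns the repeated data-file reads from cold triples into plain cache hits. A small sketch of the naming scheme (paths hypothetical):

```java
// Delta transaction-log naming: the version number, zero-padded to 20 digits.
String logDir = "/tmp/time_travel/_delta_log"; // hypothetical location
String commit1 = "%s/%020d.json".formatted(logDir, 1);
// -> /tmp/time_travel/_delta_log/00000000000000000001.json
String checkpoint2 = "%s/%020d.checkpoint.parquet".formatted(logDir, 2);
// -> /tmp/time_travel/_delta_log/00000000000000000002.checkpoint.parquet
```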
