fixup! Add support for time travel with version in Delta Lake
ebyhr committed Mar 16, 2024
1 parent 206346d commit 1f8c458
Showing 4 changed files with 41 additions and 17 deletions.
@@ -474,7 +474,9 @@ public TableSnapshot getSnapshot(
if (atVersion.isPresent()) {
long version = atVersion.get();
TableSnapshot snapshot = queriedSnapshots.get(new QueriedTable(table, version));
checkState(snapshot != null, "No previously loaded snapshot found for query %s, table %s [%s] at version %s", session.getQueryId(), table, tableLocation, version);
if (snapshot == null) { // This may happen when INSERT statement uses time travel in the source table
return transactionLogAccess.loadSnapshot(session, table, tableLocation, Optional.of(version));
}
return snapshot;
}
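A minimal sketch of the scenario the new null check covers, in the same test style used further down in this commit; the table names target_table and source_table and the expected row count are hypothetical, not taken from the diff. When time travel appears in the source of an INSERT, the versioned snapshot is not pre-registered in queriedSnapshots, so getSnapshot now falls back to transactionLogAccess.loadSnapshot instead of failing the old checkState.

// Hypothetical repro: the source table is read with time travel inside an INSERT,
// so no snapshot for (source_table, version 3) exists in queriedSnapshots yet and
// the new branch loads it on demand.
assertUpdate("INSERT INTO target_table SELECT * FROM source_table FOR VERSION AS OF 3", 3);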

@@ -227,6 +227,7 @@ private TableSnapshot loadSnapshotForTimeTravel(TrinoFileSystem fileSystem, Sche

private static Optional<LastCheckpoint> findCheckpoint(TrinoFileSystem fileSystem, String tableLocation, Long endVersion)
{
// TODO Make this logic efficient
Optional<LastCheckpoint> latestCheckpoint = Optional.empty();
Location transactionDirectory = Location.of(getTransactionLogDir(tableLocation));
try {
@@ -239,7 +240,7 @@ private static Optional<LastCheckpoint> findCheckpoint(TrinoFileSyste
}

long version = checkpoint.get().getVersion();
if (version > endVersion) {
if (version > endVersion || (latestCheckpoint.isPresent() && version < latestCheckpoint.get().getVersion())) {
continue;
}
latestCheckpoint = checkpoint;
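A minimal, self-contained sketch of the selection rule this condition implements, simplified to a plain list of checkpoint versions instead of a TrinoFileSystem listing (the class and method names below are illustrative, not part of the connector): keep the newest checkpoint whose version does not exceed endVersion, regardless of the order in which the entries are seen.

import java.util.List;
import java.util.Optional;

final class CheckpointSelection
{
    // Keep the newest checkpoint that is not newer than endVersion; skip entries that are
    // too new for the requested version, or older than the best candidate found so far.
    static Optional<Long> selectCheckpoint(List<Long> checkpointVersions, long endVersion)
    {
        Optional<Long> latest = Optional.empty();
        for (long version : checkpointVersions) {
            if (version > endVersion || (latest.isPresent() && version < latest.get())) {
                continue;
            }
            latest = Optional.of(version);
        }
        return latest;
    }
}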
@@ -382,40 +382,61 @@ public void testSelfJoin()
public void testSelectFromVersionedTable()
{
String tableName = "test_select_from_versioned_table";
assertUpdate("CREATE TABLE " + tableName + " (id int, age int)");
long v1SnapshotId = 0;
assertUpdate("INSERT INTO " + tableName + " VALUES (2, 20)", 1);
long v2SnapshotId = 1;
assertUpdate("INSERT INTO " + tableName + " VALUES (3, 30)", 1);
long v3SnapshotId = 2;
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId,
assertUpdate("CREATE TABLE " + tableName + "(id int)");
for (int i = 0; i < 25; i++) {
assertUpdate("INSERT INTO " + tableName + " VALUES " + i, 1);
}

assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF 0",
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.exists"))
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId,
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF 1",
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"))
.add(new FileOperation(DATA, "no partition", "InputFile.newInput"))
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId,
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF 2",
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.exists"))
.addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2)
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName,
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF 8",
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"))
.addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", "InputFile.exists"))
.addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 8)
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF 13",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(CHECKPOINT, "00000000000000000010.checkpoint.parquet", "InputFile.length"), 2)
.addCopies(new FileOperation(CHECKPOINT, "00000000000000000010.checkpoint.parquet", "InputFile.newInput"), 2)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000011.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000012.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000013.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000013.json", "InputFile.exists"))
.addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 13)
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF 20",
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000020.json", "InputFile.exists"))
.addCopies(new FileOperation(CHECKPOINT, "00000000000000000020.checkpoint.parquet", "InputFile.length"), 2)
.addCopies(new FileOperation(CHECKPOINT, "00000000000000000020.checkpoint.parquet", "InputFile.newInput"), 2)
.addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 20)
.build());
}
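A rough sketch of the access pattern these assertions encode, assuming a checkpoint is written every 10 commits as in this test (checkpoints at versions 10 and 20); the helper below is illustrative and ignores the repeated checkpoint reads and InputFile.exists probes counted above. A versioned read touches the newest checkpoint at or below the requested version plus every later JSON commit up to that version.

import java.util.ArrayList;
import java.util.List;

final class ExpectedTransactionLogReads
{
    // Illustrative only: which transaction-log files a "FOR VERSION AS OF targetVersion" read
    // is expected to touch when checkpoints are written every checkpointInterval commits.
    static List<String> expectedLogFiles(long targetVersion, long checkpointInterval)
    {
        List<String> files = new ArrayList<>();
        long checkpoint = (targetVersion / checkpointInterval) * checkpointInterval;
        long firstJson = 0;
        if (checkpoint > 0) {
            files.add("%020d.checkpoint.parquet".formatted(checkpoint));
            firstJson = checkpoint + 1;
        }
        for (long version = firstJson; version <= targetVersion; version++) {
            files.add("%020d.json".formatted(version));
        }
        return files;
    }
}

For example, expectedLogFiles(13, 10) gives the version 10 checkpoint plus the JSON files for versions 11 through 13, matching the VERSION AS OF 13 case above, while expectedLogFiles(8, 10) gives only the JSON files for versions 0 through 8.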

@@ -989,7 +989,7 @@ public void testLoadSnapshotWithEndVersion()
.add(new FileOperation("00000000000000000007.json", "InputFile.newStream"))
.add(new FileOperation("00000000000000000008.json", "InputFile.newStream"))
.add(new FileOperation("00000000000000000009.json", "InputFile.newStream"))
.addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length"), 2)
.add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length"))
.add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput"))
.build());

@@ -1002,7 +1002,7 @@ public void testLoadSnapshotWithEndVersion()
}
},
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length"), 2)
.add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length"))
.add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput"))
.build());

@@ -1015,7 +1015,7 @@ public void testLoadSnapshotWithEndVersion()
}
},
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length"), 2)
.add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.length"))
.add(new FileOperation("00000000000000000010.checkpoint.parquet", "InputFile.newInput"))
.add(new FileOperation("00000000000000000011.json", "InputFile.newStream"))
.build());
