fixup! Add support for time travel with version in Delta Lake

ebyhr committed Apr 3, 2024
1 parent fbacbf7 commit 7c1f565
Showing 10 changed files with 150 additions and 61 deletions.
@@ -461,28 +461,26 @@ public DeltaLakeMetadata(
 
     public TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName table, String tableLocation, Optional<Long> atVersion)
     {
-        if (atVersion.isEmpty()) {
-            atVersion = Optional.ofNullable(queriedVersions.get(new QueriedTable(table, Optional.empty())));
-        }
+        Optional<Long> version = atVersion.or(() -> Optional.ofNullable(queriedVersions.get(new QueriedTable(table, Optional.empty()))));
 
-        QueriedTable queriedTable = new QueriedTable(table, atVersion);
+        QueriedTable queriedTable = new QueriedTable(table, version);
         if (queriedSnapshots.containsKey(queriedTable)) {
             return queriedSnapshots.get(queriedTable);
         }
 
         try {
-            TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation, atVersion);
+            TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation, version);
             // Lack of concurrency for given query is currently guaranteed by DeltaLakeMetadata
             checkState(queriedVersions.put(queriedTable, snapshot.getVersion()) == null, "queriedVersions changed concurrently for %s", table);
             queriedSnapshots.put(new QueriedTable(table, Optional.of(snapshot.getVersion())), snapshot);
             return snapshot;
         }
-        catch (IOException e) {
+        catch (IOException | RuntimeException e) {
             throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error getting snapshot for " + table, e);
         }
     }
 
-    private long getVersionNumberFromConnectorTableVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
+    private static long getVersionNumberFromConnectorTableVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
     {
         return switch (version.getPointerType()) {
             // TODO https://github.com/trinodb/trino/issues/21024 Add support for reading tables with temporal versions
@@ -586,15 +584,8 @@ public LocatedTableHandle getTableHandle(
         boolean managed = table.get().managed();
 
         String tableLocation = table.get().location();
-        TableSnapshot tableSnapshot;
-        if (endVersion.isEmpty()) {
-            tableSnapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
-        }
-        else {
-            TrinoFileSystem fileSystem = fileSystemFactory.create(session);
-            long version = getVersionNumberFromConnectorTableVersion(fileSystem, tableLocation, endVersion.get());
-            tableSnapshot = getSnapshot(session, tableName, tableLocation, Optional.of(version));
-        }
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersionNumberFromConnectorTableVersion(fileSystem, tableLocation, version)));
 
         Map<Class<?>, Object> logEntries;
         try {
@@ -644,7 +635,8 @@ public LocatedTableHandle getTableHandle(
                 Optional.empty(),
                 Optional.empty(),
                 Optional.empty(),
-                tableSnapshot.getVersion());
+                tableSnapshot.getVersion(),
+                endVersion.isPresent());
     }
 
     @Override
@@ -1905,7 +1897,7 @@ private long commitInsertOperation(
                 .map(DeltaLakeTableHandle.class::cast)
                 .filter(tableHandle -> handle.getTableName().equals(tableHandle.getSchemaTableName())
                         // disregard time travel table handles
-                        && tableHandle.getReadVersion() >= handle.getReadVersion())
+                        && !tableHandle.isTimeTravel())
                 .collect(toImmutableList());
         long readVersionValue = readVersion.get();
         if (currentVersion > readVersionValue) {
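This predicate change is the heart of the fixup. The version comparison only approximated the intent: a handle pinned with FOR VERSION AS OF could still pass the check (for example, when it pins the current version), so it was validated for conflicts it cannot have. With the explicit flag, commit validation skips exactly the time-travel handles. A compact sketch of the filtering idea, using an illustrative record type rather than the real DeltaLakeTableHandle:

import java.util.List;

public class ConflictCheckSketch
{
    record Handle(String table, long readVersion, boolean timeTravel) {}

    // Collect the handles whose read version must still be current at commit time;
    // handles pinned to an explicit historical version can never conflict
    static List<Handle> handlesToValidate(List<Handle> scannedHandles, String targetTable)
    {
        return scannedHandles.stream()
                .filter(handle -> handle.table().equals(targetTable)
                        // disregard time travel table handles
                        && !handle.timeTravel())
                .toList();
    }

    public static void main(String[] args)
    {
        List<Handle> scanned = List.of(
                new Handle("orders", 3, false),
                new Handle("orders", 0, true)); // FOR VERSION AS OF 0
        // Prints only the non-time-travel handle
        System.out.println(handlesToValidate(scanned, "orders"));
    }
}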
@@ -2917,7 +2909,8 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
                 false,
                 false,
                 Optional.empty(),
-                tableHandle.getReadVersion());
+                tableHandle.getReadVersion(),
+                tableHandle.isTimeTravel());
 
         if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) &&
                 tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint()) &&
@@ -3213,7 +3206,8 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
                 Optional.empty(),
                 Optional.empty(),
                 Optional.of(analyzeHandle),
-                handle.getReadVersion());
+                handle.getReadVersion(),
+                handle.isTimeTravel());
         TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata(
                 columnsMetadata.stream().map(DeltaLakeColumnMetadata::getColumnMetadata).collect(toImmutableList()),
                 analyzeColumnNames.orElse(allColumnNames),
@@ -52,6 +52,7 @@ public enum WriteType
     private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
     private final Optional<WriteType> writeType;
     private final long readVersion;
+    private final boolean timeTravel;
 
     private final Optional<Set<DeltaLakeColumnHandle>> projectedColumns;
     // UPDATE only: The list of columns being updated
@@ -84,7 +85,8 @@ public DeltaLakeTableHandle(
             @JsonProperty("updatedColumns") Optional<List<DeltaLakeColumnHandle>> updatedColumns,
             @JsonProperty("updateRowIdColumns") Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
             @JsonProperty("analyzeHandle") Optional<AnalyzeHandle> analyzeHandle,
-            @JsonProperty("readVersion") long readVersion)
+            @JsonProperty("readVersion") long readVersion,
+            @JsonProperty("timeTravel") boolean timeTravel)
     {
         this(
                 schemaName,
@@ -104,7 +106,8 @@ public DeltaLakeTableHandle(
                 false,
                 false,
                 Optional.empty(),
-                readVersion);
+                readVersion,
+                timeTravel);
     }
 
     public DeltaLakeTableHandle(
@@ -125,7 +128,8 @@ public DeltaLakeTableHandle(
             boolean recordScannedFiles,
             boolean isOptimize,
             Optional<DataSize> maxScannedFileSize,
-            long readVersion)
+            long readVersion,
+            boolean timeTravel)
     {
         this.schemaName = requireNonNull(schemaName, "schemaName is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
@@ -146,6 +150,7 @@ public DeltaLakeTableHandle(
         this.isOptimize = isOptimize;
         this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
         this.readVersion = readVersion;
+        this.timeTravel = timeTravel;
         this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
     }
 
@@ -169,7 +174,8 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
                 recordScannedFiles,
                 isOptimize,
                 maxScannedFileSize,
-                readVersion);
+                readVersion,
+                timeTravel);
     }
 
     public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
@@ -192,7 +198,8 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
                 recordScannedFiles,
                 true,
                 Optional.of(maxScannedFileSize),
-                readVersion);
+                readVersion,
+                timeTravel);
     }
 
     @Override
@@ -327,6 +334,12 @@ public long getReadVersion()
         return readVersion;
     }
 
+    @JsonProperty
+    public boolean isTimeTravel()
+    {
+        return timeTravel;
+    }
+
     @Override
     public String toString()
     {
@@ -360,7 +373,8 @@ public boolean equals(Object o)
                 Objects.equals(analyzeHandle, that.analyzeHandle) &&
                 Objects.equals(isOptimize, that.isOptimize) &&
                 Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
-                readVersion == that.readVersion;
+                readVersion == that.readVersion &&
+                timeTravel == that.timeTravel;
     }
 
     @Override
@@ -383,6 +397,7 @@ public int hashCode()
                 recordScannedFiles,
                 isOptimize,
                 maxScannedFileSize,
-                readVersion);
+                readVersion,
+                timeTravel);
     }
 }
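A table handle travels between coordinator and workers as JSON and is compared for equality (for example in applyFilter above), which is why the new field is threaded through the @JsonCreator constructor, the copy methods, equals, and hashCode. A minimal sketch of the same pattern using Jackson directly (HandleSketch and its fields are illustrative, not Trino classes):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Objects;

public final class HandleSketch
{
    private final long readVersion;
    private final boolean timeTravel;

    @JsonCreator
    public HandleSketch(
            @JsonProperty("readVersion") long readVersion,
            @JsonProperty("timeTravel") boolean timeTravel)
    {
        this.readVersion = readVersion;
        this.timeTravel = timeTravel;
    }

    @JsonProperty
    public long getReadVersion()
    {
        return readVersion;
    }

    @JsonProperty
    public boolean isTimeTravel()
    {
        return timeTravel;
    }

    @Override
    public boolean equals(Object o)
    {
        // omitting timeTravel here would make a versioned and a plain read
        // of the same snapshot compare equal
        return o instanceof HandleSketch that
                && readVersion == that.readVersion
                && timeTravel == that.timeTravel;
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(readVersion, timeTravel);
    }

    public static void main(String[] args)
            throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        HandleSketch handle = new HandleSketch(3, true);
        String json = mapper.writeValueAsString(handle);
        System.out.println(json); // e.g. {"readVersion":3,"timeTravel":true}
        System.out.println(handle.equals(mapper.readValue(json, HandleSketch.class))); // true
    }
}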
@@ -208,10 +208,9 @@ private TableSnapshot loadSnapshotForTimeTravel(TrinoFileSystem fileSystem, Sche
             throws IOException
     {
         try {
-            Optional<LastCheckpoint> lastCheckpoint = findCheckpoint(fileSystem, tableLocation, endVersion);
             return TableSnapshot.load(
                     table,
-                    lastCheckpoint,
+                    findCheckpoint(fileSystem, tableLocation, endVersion),
                     fileSystem,
                     tableLocation,
                     parquetReaderOptions,
@@ -225,14 +224,14 @@
         }
     }
 
-    private static Optional<LastCheckpoint> findCheckpoint(TrinoFileSystem fileSystem, String tableLocation, Long endVersion)
+    private static Optional<LastCheckpoint> findCheckpoint(TrinoFileSystem fileSystem, String tableLocation, long endVersion)
     {
         Optional<LastCheckpoint> lastCheckpoint = readLastCheckpoint(fileSystem, tableLocation);
         if (lastCheckpoint.isPresent() && lastCheckpoint.get().version() <= endVersion) {
            return lastCheckpoint;
         }
 
-        // TODO Make this logic efficient
+        // TODO https://github.com/trinodb/trino/issues/21366 Make this logic efficient
         Optional<LastCheckpoint> latestCheckpoint = Optional.empty();
         Location transactionDirectory = Location.of(getTransactionLogDir(tableLocation));
         try {
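When _last_checkpoint points past the requested version, the fallback above has to discover an older checkpoint by listing the transaction log directory, which the linked TODO tracks making efficient. A rough sketch of that selection over plain file names, assuming single-part checkpoint naming (<version>.checkpoint.parquet); this illustrates the idea, not the Trino implementation:

import java.util.List;
import java.util.OptionalLong;

public class FindCheckpointSketch
{
    // Pick the newest checkpoint whose version is at or below the requested endVersion
    static OptionalLong findCheckpointVersion(List<String> transactionLogFileNames, long endVersion)
    {
        return transactionLogFileNames.stream()
                .filter(name -> name.endsWith(".checkpoint.parquet"))
                .mapToLong(name -> Long.parseLong(name.substring(0, name.indexOf('.'))))
                .filter(version -> version <= endVersion)
                .max();
    }

    public static void main(String[] args)
    {
        List<String> files = List.of(
                "00000000000000000010.checkpoint.parquet",
                "00000000000000000020.checkpoint.parquet",
                "00000000000000000021.json");
        System.out.println(findCheckpointVersion(files, 15)); // OptionalLong[10]
        System.out.println(findCheckpointVersion(files, 5));  // OptionalLong.empty
    }
}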
@@ -3974,6 +3974,16 @@ public void testInsertFromVersionedSameTable()
 
             assertUpdate("INSERT INTO " + table.getName() + " SELECT * FROM " + table.getName() + " FOR VERSION AS OF 1", 2);
             assertQuery("SELECT * FROM " + table.getName(), "VALUES 1, 2, 1, 2, 1");
+
+            assertQuery(
+                    "SELECT version, operation, read_version, is_blind_append FROM \"" + table.getName() + "$history\"",
+                    """
+                    VALUES
+                        (0, 'CREATE TABLE AS SELECT', 0, true),
+                        (1, 'WRITE', 0, true),
+                        (2, 'WRITE', 1, true),
+                        (3, 'WRITE', 2, true)
+                    """);
         }
     }
 
@@ -4052,7 +4062,7 @@ public void testSelectTableUsingVersionDeletedCheckpoints()
     @Test
     public void testSelectAfterReadVersionedTable()
     {
-        // Run normal SELECT after reading from versioned table considering cache
+        // Run normal SELECT after reading from versioned table
         try (TestTable table = new TestTable(getQueryRunner()::execute, "test_select_after_version", "AS SELECT 1 id")) {
             assertUpdate("INSERT INTO " + table.getName() + " VALUES 2", 1);
             assertQuery("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 0", "VALUES 1");
@@ -4089,12 +4099,6 @@ public void testReadVersionedSystemTables()
         assertQueryFails("SELECT * FROM \"region$history\" FOR VERSION AS OF 0", "This connector does not support versioned tables");
     }
 
-    @Test
-    public void testReadFromVersionedTableWithV2Checkpoint()
-    {
-        // TODO https://github.com/trinodb/trino/pull/19507 Add test for versioned tables with V2 checkpoint
-    }
-
     @Override
     protected void verifyVersionedQueryFailurePermissible(Exception e)
     {
@@ -38,6 +38,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
@@ -247,6 +248,74 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned)
         }
     }
 
+    @Test
+    public void testConcurrentInsertsSelectingFromTheSameVersionedTable()
+            throws Exception
+    {
+        testConcurrentInsertsSelectingFromTheSameVersionedTable(true);
+        testConcurrentInsertsSelectingFromTheSameVersionedTable(false);
+    }
+
+    private void testConcurrentInsertsSelectingFromTheSameVersionedTable(boolean partitioned)
+            throws Exception
+    {
+        int threads = 3;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        String tableName = "test_concurrent_inserts_select_from_same_versioned_table_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS VALUES (0, 10)", 1);
+
+        try {
+            // Considering T1, T2, T3 being the order of completion of the concurrent INSERT operations,
+            // if all the operations eventually succeed, the entries inserted per thread would look like this:
+            // T1: (0, 10)
+            // T2: (1, 10)
+            // T3: (2, 10)
+            List<Future<OptionalInt>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        barrier.await(10, SECONDS);
+                        try {
+                            getQueryRunner().execute("INSERT INTO " + tableName + " SELECT " + threadNumber + ", 10 AS part FROM " + tableName + " FOR VERSION AS OF 0");
+                            return OptionalInt.of(threadNumber);
+                        }
+                        catch (Exception e) {
+                            RuntimeException trinoException = getTrinoExceptionCause(e);
+                            try {
+                                assertThat(trinoException).hasMessage("Failed to write Delta Lake transaction log entry");
+                            }
+                            catch (Throwable verifyFailure) {
+                                if (verifyFailure != e) {
+                                    verifyFailure.addSuppressed(e);
+                                }
+                                throw verifyFailure;
+                            }
+                            return OptionalInt.empty();
+                        }
+                    }))
+                    .collect(toImmutableList());
+
+            List<Integer> values = futures.stream()
+                    .map(future -> tryGetFutureValue(future, 10, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
+                    .filter(OptionalInt::isPresent)
+                    .map(OptionalInt::getAsInt)
+                    .collect(toImmutableList());
+
+            assertThat(values).hasSizeGreaterThanOrEqualTo(1);
+            assertQuery(
+                    "SELECT * FROM " + tableName,
+                    "VALUES (0, 10)" +
+                            values.stream()
+                                    .map("(%s, 10)"::formatted)
+                                    .collect(joining(", ", ", ", "")));
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+            executor.shutdownNow();
+            assertTrue(executor.awaitTermination(10, SECONDS));
+        }
+    }
+
     @Test
     public void testConcurrentInsertsSelectingFromTheSamePartition()
             throws Exception
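The new test reuses the harness pattern of the surrounding class: a CyclicBarrier releases all writers at once so the commits genuinely race, and each worker must either succeed or fail with the one expected transaction log conflict message. A stripped-down sketch of just that synchronization pattern, with no Trino test dependencies (names are illustrative):

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class BarrierSketch
{
    public static void main(String[] args)
            throws Exception
    {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = IntStream.range(0, threads)
                    .<Callable<String>>mapToObj(i -> () -> {
                        // each task blocks until all three are running,
                        // maximizing the chance of truly concurrent commits
                        barrier.await();
                        return "task-" + i + " committed";
                    })
                    .toList();
            for (Future<String> future : executor.invokeAll(tasks)) {
                System.out.println(future.get());
            }
        }
        finally {
            executor.shutdownNow();
        }
    }
}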
@@ -511,7 +511,8 @@ private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set<DeltaLakeColu
                 Optional.of(ImmutableList.of(BOOLEAN_COLUMN_HANDLE)),
                 Optional.of(ImmutableList.of(DOUBLE_COLUMN_HANDLE)),
                 Optional.empty(),
-                0);
+                0,
+                false);
     }
 
     private static TupleDomain<DeltaLakeColumnHandle> createConstrainedColumnsTuple(
@@ -147,7 +147,8 @@ public void testDynamicSplitPruningOnUnpartitionedTable()
                         Optional.empty(),
                         Optional.empty(),
                         Optional.empty(),
-                        0),
+                        0,
+                        false),
                 transaction);
 
         TupleDomain<ColumnHandle> splitPruningPredicate = TupleDomain.withColumnDomains(
@@ -246,7 +247,8 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter()
                         Optional.empty(),
                         Optional.empty(),
                         Optional.empty(),
-                        0),
+                        0,
+                        false),
                 transaction);
 
         // Simulate situations where the dynamic filter (e.g.: while performing a JOIN with another table) reduces considerably
@@ -98,7 +98,8 @@ public class TestDeltaLakeSplitManager
             Optional.empty(),
             Optional.empty(),
             Optional.empty(),
-            0);
+            0,
+            false);
     private final HiveTransactionHandle transactionHandle = new HiveTransactionHandle(true);
 
     @Test
