Skip to content

Commit

Permalink
fixup! Add support for time travel with version in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Mar 29, 2024
1 parent 7aedaed commit 72f2705
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class TransactionLogAccess
{
private static final Pattern CLASSIC_CHECKPOINT = Pattern.compile("(\\d*)\\.checkpoint\\.parquet");
private static final Pattern MULTI_PART_CHECKPOINT = Pattern.compile("(\\d*)\\.checkpoint\\.(\\d*)\\.(\\d*)\\.parquet");
private static final Pattern V2_CHECKPOINT = Pattern.compile("(\\d*)\\.checkpoint\\.[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\\.(json|parquet)");

private final TypeManager typeManager;
private final CheckpointSchemaManager checkpointSchemaManager;
Expand Down Expand Up @@ -272,6 +273,12 @@ private static Optional<LastCheckpoint> extractCheckpointVersion(FileEntry file)
return Optional.of(new LastCheckpoint(version, file.length(), Optional.of(parts), Optional.empty()));
}

Matcher v2Checkpoint = V2_CHECKPOINT.matcher(fileName);
if (v2Checkpoint.matches()) {
long version = Long.parseLong(v2Checkpoint.group(1));
return Optional.of(new LastCheckpoint(version, file.length(), Optional.empty(), Optional.of(new V2Checkpoint(fileName))));
}

return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,37 @@ public void testTimeTravelWithMultipartCheckpoint()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testTimeTravelWithV2Checkpoint()
throws Exception
{
testTimeTravelWithV2Checkpoint("deltalake/v2_checkpoint_json");
testTimeTravelWithV2Checkpoint("deltalake/v2_checkpoint_parquet");
testTimeTravelWithV2Checkpoint("databricks133/v2_checkpoint_json");
testTimeTravelWithV2Checkpoint("databricks133/v2_checkpoint_parquet");
}

private void testTimeTravelWithV2Checkpoint(String resourceName)
throws Exception
{
String tableName = "test_time_travel_v2_checkpoint_" + randomNameSuffix();
Path tableLocation = Files.createTempFile(tableName, null);
copyDirectoryContents(new File(Resources.getResource(resourceName).toURI()).toPath(), tableLocation);
assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

// Version 1 has v2 checkpoint
assertQueryReturnsEmptyResult("SELECT * FROM " + tableName + " FOR VERSION AS OF 0");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 1")).matches("VALUES (1, 2)");

// Redo the time travel without _last_checkpoint file
Files.delete(tableLocation.resolve("_delta_log/_last_checkpoint"));
assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "')");
assertQueryReturnsEmptyResult("SELECT * FROM " + tableName + " FOR VERSION AS OF 0");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 1")).matches("VALUES (1, 2)");

assertUpdate("DROP TABLE " + tableName);
}

/**
* @see deltalake.partition_values_parsed
*/
Expand Down

0 comments on commit 72f2705

Please sign in to comment.