Skip to content

Commit

Permalink
Retrieve snapshot id from temporal date or timestamp version
Browse files Browse the repository at this point in the history
  • Loading branch information
findinpath authored and ebyhr committed Sep 6, 2023
1 parent 6e62665 commit e61da2d
Show file tree
Hide file tree
Showing 18 changed files with 630 additions and 4 deletions.
18 changes: 18 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Expand Up @@ -1329,6 +1329,24 @@ SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 Europe/Vienna'
```

You can use a date to specify a point a time in the past for using a snapshot of a table in a query.
Assuming that the session time zone is `Europe/Vienna` the following queries are equivalent:

```
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF DATE '2022-03-23'
```

```
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00'
```

```
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00.000 Europe/Vienna'
```

##### Rolling back to a previous snapshot

Use the `$snapshots` metadata table to determine the latest snapshot ID of the
Expand Down
Expand Up @@ -100,7 +100,9 @@
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.statistics.TableStatisticsMetadata;
import io.trino.spi.type.LongTimestamp;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.TypeManager;
import org.apache.datasketches.theta.CompactSketch;
Expand Down Expand Up @@ -152,6 +154,9 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -260,7 +265,10 @@
import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.UuidType.UUID;
import static java.lang.Math.floorDiv;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -391,7 +399,7 @@ public ConnectorTableHandle getTableHandle(
Schema tableSchema;
Optional<PartitionSpec> partitionSpec;
if (endVersion.isPresent()) {
long snapshotId = getSnapshotIdFromVersion(table, endVersion.get());
long snapshotId = getSnapshotIdFromVersion(session, table, endVersion.get());
tableSnapshotId = Optional.of(snapshotId);
tableSchema = schemaFor(table, snapshotId);
partitionSpec = Optional.empty();
Expand Down Expand Up @@ -424,11 +432,11 @@ public ConnectorTableHandle getTableHandle(
Optional.empty());
}

private static long getSnapshotIdFromVersion(Table table, ConnectorTableVersion version)
private static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version)
{
io.trino.spi.type.Type versionType = version.getVersionType();
return switch (version.getPointerType()) {
case TEMPORAL -> getTemporalSnapshotIdFromVersion(table, version, versionType);
case TEMPORAL -> getTemporalSnapshotIdFromVersion(session, table, version, versionType);
case TARGET_ID -> getTargetSnapshotIdFromVersion(table, version, versionType);
};
}
Expand All @@ -445,8 +453,28 @@ private static long getTargetSnapshotIdFromVersion(Table table, ConnectorTableVe
return snapshotId;
}

private static long getTemporalSnapshotIdFromVersion(Table table, ConnectorTableVersion version, io.trino.spi.type.Type versionType)
private static long getTemporalSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version, io.trino.spi.type.Type versionType)
{
if (versionType.equals(DATE)) {
// Retrieve the latest snapshot made before or at the beginning of the day of the specified date in the session's time zone
long epochMillis = LocalDate.ofEpochDay((Long) version.getVersion())
.atStartOfDay()
.atZone(session.getTimeZoneKey().getZoneId())
.toInstant()
.toEpochMilli();
return getSnapshotIdAsOfTime(table, epochMillis);
}
if (versionType instanceof TimestampType timestampVersionType) {
long epochMicrosUtc = timestampVersionType.isShort()
? (long) version.getVersion()
: ((LongTimestamp) version.getVersion()).getEpochMicros();
long epochMillisUtc = floorDiv(epochMicrosUtc, MICROSECONDS_PER_MILLISECOND);
long epochMillis = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillisUtc), ZoneOffset.UTC)
.atZone(session.getTimeZoneKey().getZoneId())
.toInstant()
.toEpochMilli();
return getSnapshotIdAsOfTime(table, epochMillis);
}
if (versionType instanceof TimestampWithTimeZoneType timeZonedVersionType) {
long epochMillis = timeZonedVersionType.isShort()
? unpackMillisUtc((long) version.getVersion())
Expand Down
@@ -0,0 +1,142 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.spi.type.TimeZoneKey;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.containers.Minio;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;

public class TestIcebergReadVersionedTableByTemporal
extends AbstractTestQueryFramework
{
private static final String BUCKET_NAME = "test-bucket-time-travel";

private Minio minio;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
minio = closeAfterClass(Minio.builder().build());
minio.start();
minio.createBucket(BUCKET_NAME);

QueryRunner queryRunner = IcebergQueryRunner.builder()
.setIcebergProperties(
ImmutableMap.<String, String>builder()
.put("hive.s3.aws-access-key", MINIO_ACCESS_KEY)
.put("hive.s3.aws-secret-key", MINIO_SECRET_KEY)
.put("hive.s3.endpoint", minio.getMinioAddress())
.put("hive.s3.path-style-access", "true")
.put("iceberg.register-table-procedure.enabled", "true")
.buildOrThrow())
.build();

queryRunner.execute("CREATE SCHEMA IF NOT EXISTS " + ICEBERG_CATALOG + ".tpch");
return queryRunner;
}

@AfterClass(alwaysRun = true)
public void destroy()
throws Exception
{
minio = null; // closed by closeAfterClass
}

@Test
public void testSelectTableWithEndVersionAsTemporal()
{
String tableName = "test_iceberg_read_versioned_table_" + randomNameSuffix();

minio.copyResources("iceberg/timetravel", BUCKET_NAME, "timetravel");
assertUpdate(format(
"CALL system.register_table('%s', '%s', '%s')",
getSession().getSchema().orElseThrow(),
tableName,
format("s3://%s/timetravel", BUCKET_NAME)));

assertThat(query("SELECT * FROM " + tableName))
.matches("VALUES 1, 2, 3");

Session utcSession = Session.builder(getSession()).setTimeZoneKey(TimeZoneKey.UTC_KEY).build();
assertThat(query(utcSession, "SELECT made_current_at FROM \"" + tableName + "$history\""))
.matches("VALUES" +
" TIMESTAMP '2023-06-30 05:01:46.265 UTC'," + // CREATE TABLE timetravel(data integer)
" TIMESTAMP '2023-07-01 05:02:43.954 UTC'," + // INSERT INTO timetravel VALUES 1
" TIMESTAMP '2023-07-02 05:03:39.586 UTC'," + // INSERT INTO timetravel VALUES 2
" TIMESTAMP '2023-07-03 05:03:42.434 UTC'"); // INSERT INTO timetravel VALUES 3

assertUpdate("INSERT INTO " + tableName + " VALUES 4", 1);

assertThat(query("SELECT * FROM " + tableName)).matches("VALUES 1, 2, 3, 4");
Session viennaSession = Session.builder(getSession()).setTimeZoneKey(TimeZoneKey.getTimeZoneKey("Europe/Vienna")).build();
Session losAngelesSession = Session.builder(getSession()).setTimeZoneKey(TimeZoneKey.getTimeZoneKey("America/Los_Angeles")).build();

// version as date
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF DATE '2023-07-01'"))
.returnsEmptyResult();
assertThat(query(losAngelesSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF DATE '2023-07-01'"))
.matches("VALUES 1");
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF DATE '2023-07-02'"))
.matches("VALUES 1");
assertThat(query(losAngelesSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF DATE '2023-07-02'"))
.matches("VALUES 1, 2");
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF DATE '2023-07-03'"))
.matches("VALUES 1, 2");
assertThat(query(losAngelesSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF DATE '2023-07-03'"))
.matches("VALUES 1, 2, 3");
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF DATE '2023-07-04'"))
.matches("VALUES 1, 2, 3");
assertThat(query(losAngelesSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF DATE '2023-07-04'"))
.matches("VALUES 1, 2, 3");

// version as timestamp
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-01 00:00:00'"))
.returnsEmptyResult();
assertThat(query(utcSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-01 05:02:43.953'"))
.returnsEmptyResult();
assertThat(query(utcSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-01 05:02:43.954'"))
.matches("VALUES 1");
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-01 07:02:43.954'"))
.matches("VALUES 1");
assertThat(query(losAngelesSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-01 00:00:00.1'"))
.matches("VALUES 1");
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-02 01:00:00.12'"))
.matches("VALUES 1");
assertThat(query(losAngelesSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-02 01:00:00.123'"))
.matches("VALUES 1, 2");
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-03 02:00:00.123'"))
.matches("VALUES 1, 2");
assertThat(query(losAngelesSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-03 02:00:00.123456'"))
.matches("VALUES 1, 2, 3");
assertThat(query(viennaSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-04 03:00:00.123456789'"))
.matches("VALUES 1, 2, 3");
assertThat(query(losAngelesSession, "SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2023-07-04 03:00:00.123456789012'"))
.matches("VALUES 1, 2, 3");

assertUpdate("DROP TABLE " + tableName);
}
}
@@ -0,0 +1,20 @@
Data generated by actively changing the date/time settings of the host.

In `iceberg.properties` add the following properties:

```
iceberg.unique-table-location=false
iceberg.table-statistics-enabled=false
```

Use `trino` to create the table content

```sql
CREATE TABLE iceberg.tiny.timetravel(data integer);
-- increase the date on the host
INSERT INTO iceberg.tiny.timetravel VALUES 1;
-- increase the date on the host
INSERT INTO iceberg.tiny.timetravel VALUES 2;
-- increase the date on the host
INSERT INTO iceberg.tiny.timetravel VALUES 3;
```
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,64 @@
{
"format-version" : 2,
"table-uuid" : "bed033c8-301a-4f2b-b18a-dbe85348dc67",
"location" : "s3://test-bucket-time-travel/timetravel",
"last-sequence-number" : 1,
"last-updated-ms" : 1688101306265,
"last-column-id" : 1,
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "data",
"required" : false,
"type" : "int"
} ]
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ ]
} ],
"last-partition-id" : 999,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"write.format.default" : "PARQUET"
},
"current-snapshot-id" : 3669526782178248824,
"refs" : {
"main" : {
"snapshot-id" : 3669526782178248824,
"type" : "branch"
}
},
"snapshots" : [ {
"sequence-number" : 1,
"snapshot-id" : 3669526782178248824,
"timestamp-ms" : 1688101306265,
"summary" : {
"operation" : "append",
"trino_query_id" : "20230630_050145_00001_9ikf4",
"changed-partition-count" : "0",
"total-records" : "0",
"total-files-size" : "0",
"total-data-files" : "0",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "s3://test-bucket-time-travel/timetravel/metadata/snap-3669526782178248824-1-cde47a04-b45e-4057-b872-aca0841fe99c.avro",
"schema-id" : 0
} ],
"statistics" : [ ],
"snapshot-log" : [ {
"timestamp-ms" : 1688101306265,
"snapshot-id" : 3669526782178248824
} ],
"metadata-log" : [ ]
}

0 comments on commit e61da2d

Please sign in to comment.