diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst
index 67257daa5dde..805443947384 100644
--- a/presto-docs/src/main/sphinx/connector/iceberg.rst
+++ b/presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -912,6 +912,40 @@ procedure on the catalog's ``system`` schema::
 ``unregister_table`` only when using the Hive catalog. This is similar to the
 behavior listed above for the ``DROP TABLE`` command.
 
+Expire snapshots
+^^^^^^^^^^^^^^^^
+
+Each DML (Data Manipulation Language) action in Iceberg produces a new snapshot while keeping the old data and metadata for snapshot isolation and time travel. Use ``expire_snapshots`` to remove older snapshots and their files.
+
+This procedure removes old snapshots and their corresponding files, but never removes files that are still required by a non-expired snapshot.
+
+The following arguments are available:
+
+===================== ========== =============== =======================================================================
+Argument Name         Required   Type            Description
+===================== ========== =============== =======================================================================
+``schema``            ✔️         string          Schema of the table to update
+
+``table_name``        ✔️         string          Name of the table to update
+
+``older_than``                   timestamp       Timestamp before which snapshots will be removed (default: 5 days ago)
+
+``retain_last``                  int             Number of ancestor snapshots to preserve regardless of ``older_than``
+                                                 (default: 1)
+
+``snapshot_ids``                 array of long   Array of snapshot IDs to expire
+===================== ========== =============== =======================================================================
+
+Examples:
+
+* Remove snapshots older than a specific day and time, but retain the last 10 snapshots::
+
+    CALL iceberg.system.expire_snapshots('schema_name', 'table_name', TIMESTAMP '2023-08-31 00:00:00.000', 10);
+
+* Remove snapshots with snapshot IDs 10001 and 10002 (these snapshot IDs must not be the current snapshot)::
+
+    CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);
+
 Schema Evolution
 -----------------
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
index 8c7cd0545c14..185249c305b2 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
@@ -39,6 +39,7 @@
 import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
 import com.facebook.presto.iceberg.nessie.NessieConfig;
 import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
+import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
 import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
 import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
 import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
@@ -149,6 +150,7 @@ public void setup(Binder binder)
         procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
 
         // for orc
         binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ExpireSnapshotsProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ExpireSnapshotsProcedure.java
new file mode 100644
index 000000000000..0487f3c2b610
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/ExpireSnapshotsProcedure.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.procedure;
+
+import com.facebook.presto.common.type.SqlTimestamp;
+import com.facebook.presto.iceberg.IcebergAbstractMetadata;
+import com.facebook.presto.iceberg.IcebergMetadataFactory;
+import com.facebook.presto.iceberg.IcebergUtil;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.procedure.Procedure;
+import com.google.common.collect.ImmutableList;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import java.lang.invoke.MethodHandle;
+import java.util.List;
+
+import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
+import static com.facebook.presto.common.type.StandardTypes.INTEGER;
+import static com.facebook.presto.common.type.StandardTypes.TIMESTAMP;
+import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
+import static java.util.Objects.requireNonNull;
+
+public class ExpireSnapshotsProcedure
+        implements Provider<Procedure>
+{
+    private static final MethodHandle EXPIRE_SNAPSHOTS = methodHandle(
+            ExpireSnapshotsProcedure.class,
+            "expireSnapshots",
+            ConnectorSession.class,
+            String.class,
+            String.class,
+            SqlTimestamp.class,
+            Integer.class,
+            List.class);
+    private final IcebergMetadataFactory metadataFactory;
+
+    @Inject
+    public ExpireSnapshotsProcedure(IcebergMetadataFactory metadataFactory)
+    {
+        this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
+    }
+
+    @Override
+    public Procedure get()
+    {
+        return new Procedure(
+                "system",
+                "expire_snapshots",
+                ImmutableList.of(
+                        new Procedure.Argument("schema", VARCHAR),
+                        new Procedure.Argument("table_name", VARCHAR),
+                        new Procedure.Argument("older_than", TIMESTAMP, false, null),
+                        new Procedure.Argument("retain_last", INTEGER, false, null),
+                        new Procedure.Argument("snapshot_ids", "array(bigint)", false, null)),
+                EXPIRE_SNAPSHOTS.bindTo(this));
+    }
+
+    public void expireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
+    {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+            doExpireSnapshots(clientSession, schema, tableName, olderThan, retainLast, snapshotIds);
+        }
+    }
+
+    private void doExpireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
+    {
+        IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) metadataFactory.create();
+        SchemaTableName schemaTableName = new SchemaTableName(schema, tableName);
+        Table icebergTable = IcebergUtil.getIcebergTable(metadata, clientSession, schemaTableName);
+
+        ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
+
+        if (snapshotIds != null) {
+            for (long id : snapshotIds) {
+                expireSnapshots = expireSnapshots.expireSnapshotId(id);
+            }
+        }
+
+        if (olderThan != null) {
+            expireSnapshots = expireSnapshots.expireOlderThan(olderThan.isLegacyTimestamp() ? olderThan.getMillisUtc() : olderThan.getMillis());
+        }
+
+        if (retainLast != null) {
+            expireSnapshots = expireSnapshots.retainLast(retainLast);
+        }
+
+        expireSnapshots.cleanExpiredFiles(true)
+                .commit();
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestExpireSnapshotProcedure.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestExpireSnapshotProcedure.java
new file mode 100644
index 000000000000..7f504a9b7f09
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestExpireSnapshotProcedure.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.procedure;
+
+import com.facebook.presto.Session;
+import com.facebook.presto.Session.SessionBuilder;
+import com.facebook.presto.common.type.TimeZoneKey;
+import com.facebook.presto.iceberg.IcebergQueryRunner;
+import com.facebook.presto.testing.QueryRunner;
+import com.facebook.presto.tests.AbstractTestQueryFramework;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP;
+import static com.facebook.presto.iceberg.CatalogType.HADOOP;
+import static java.lang.String.format;
+import static org.testng.Assert.assertEquals;
+
+public class TestExpireSnapshotProcedure
+        extends AbstractTestQueryFramework
+{
+    public static final String ICEBERG_CATALOG = "test_hive";
+    public static final String TEST_SCHEMA = "tpch";
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), HADOOP, ImmutableMap.of());
+    }
+
+    public void dropTable(String tableName)
+    {
+        assertQuerySucceeds("DROP TABLE IF EXISTS iceberg." + TEST_SCHEMA + "." + tableName);
+    }
+
+    @DataProvider(name = "timezones")
+    public Object[][] timezones()
+    {
+        return new Object[][] {
+                {"UTC", true},
+                {"America/Los_Angeles", true},
+                {"Asia/Shanghai", true},
+                {"UTC", false}};
+    }
+
+    @Test
+    public void testExpireSnapshotsInEmptyTable()
+    {
+        String tableName = "default_empty_table";
+        assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)");
+        Table table = loadTable(tableName);
+        assertHasSize(table.snapshots(), 0);
+
+        assertUpdate(format("CALL system.expire_snapshots('%s', '%s')", TEST_SCHEMA, tableName));
+        assertUpdate(format("CALL system.expire_snapshots('%s', '%s', %s)", TEST_SCHEMA, tableName, "timestamp '1984-12-08 00:10:00.000'"));
+        assertUpdate(format("CALL system.expire_snapshots('%s', '%s', %s, %s)", TEST_SCHEMA, tableName, "timestamp '1984-12-08 00:10:00.000'", 5));
+        assertUpdate(format("CALL system.expire_snapshots('%s', '%s', %s, %s, %s)", TEST_SCHEMA, tableName, "timestamp '1984-12-08 00:10:00.000'", 5, "ARRAY[12345]"));
+        table.refresh();
+        assertHasSize(table.snapshots(), 0);
+
+        assertUpdate(format("CALL system.expire_snapshots(schema => '%s', table_name => '%s')", TEST_SCHEMA, tableName));
+        assertUpdate(format("CALL system.expire_snapshots(schema => '%s', table_name => '%s', older_than => %s)", TEST_SCHEMA, tableName, "timestamp '1984-12-08 00:10:00.000'"));
+        assertUpdate(format("CALL system.expire_snapshots(schema => '%s', table_name => '%s', retain_last => %s)", TEST_SCHEMA, tableName, 5));
+        assertUpdate(format("CALL system.expire_snapshots(schema => '%s', table_name => '%s', snapshot_ids => %s)", TEST_SCHEMA, tableName, "ARRAY[12345]"));
+        table.refresh();
+        assertHasSize(table.snapshots(), 0);
+
+        dropTable(tableName);
+    }
+
+    @Test(dataProvider = "timezones")
+    public void testExpireSnapshotsUsingPositionalArgs(String zoneId, boolean legacyTimestamp)
+    {
+        Session session = sessionForTimezone(zoneId, legacyTimestamp);
+        String tableName = "positional_args_table";
+        try {
+            assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, value varchar)");
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(1, 'a')", 1);
+
+            Table table = loadTable(tableName);
+            Snapshot firstSnapshot = table.currentSnapshot();
+
+            waitUntilAfter(firstSnapshot.timestampMillis());
+
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(2, 'b')", 1);
+            table.refresh();
+
+            Snapshot secondSnapshot = table.currentSnapshot();
+            String formattedDateTime = getTimestampString(secondSnapshot.timestampMillis(), zoneId);
+
+            assertHasSize(table.snapshots(), 2);
+
+            // expire without retainLast param
+            String callStr = format("CALL system.expire_snapshots('%s', '%s', TIMESTAMP '%s')", TEST_SCHEMA, tableName, formattedDateTime);
+            assertUpdate(session, callStr);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 1);
+
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(3, 'c')", 1);
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(4, 'd')", 1);
+            assertQuery(session, "select * from " + tableName, "values(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')");
+
+            table.refresh();
+
+            waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+            String currentTimestamp = getTimestampString(System.currentTimeMillis(), zoneId);
+
+            assertHasSize(table.snapshots(), 3);
+
+            // expire with retainLast param
+            assertUpdate(session, format("CALL system.expire_snapshots('%s', '%s', TIMESTAMP '%s', %s)", TEST_SCHEMA, tableName, currentTimestamp, 2));
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 2);
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test(dataProvider = "timezones")
+    public void testExpireSnapshotUsingNamedArgsOfOlderThanAndRetainLast(String zoneId, boolean legacyTimestamp)
+    {
+        Session session = sessionForTimezone(zoneId, legacyTimestamp);
+        String tableName = "named_args_table";
+        try {
+            assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, data varchar)");
+
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(1, 'a')", 1);
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(2, 'b')", 1);
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(3, 'c')", 1);
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(4, 'd')", 1);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 4);
+
+            // By default, only snapshots older than 5 days are dropped, so no snapshot is expired here.
+            assertUpdate(session, format("CALL system.expire_snapshots(retain_last => %s, table_name => '%s', schema => '%s')",
+                    2, tableName, TEST_SCHEMA));
+            table.refresh();
+            assertHasSize(table.snapshots(), 4);
+
+            waitUntilAfter(table.currentSnapshot().timestampMillis());
+            String formattedDateTime = getTimestampString(System.currentTimeMillis(), zoneId);
+
+            // Retain the last 2 snapshots regardless of `older_than`
+            assertUpdate(session, format("CALL system.expire_snapshots(retain_last => %s, older_than => TIMESTAMP '%s', table_name => '%s', schema => '%s')",
+                    2, formattedDateTime, tableName, TEST_SCHEMA));
+            table.refresh();
+            assertHasSize(table.snapshots(), 2);
+
+            // By default, the last snapshot is always retained
+            assertUpdate(session, format("CALL system.expire_snapshots(older_than => TIMESTAMP '%s', table_name => '%s', schema => '%s')",
+                    formattedDateTime, tableName, TEST_SCHEMA));
+            table.refresh();
+            assertHasSize(table.snapshots(), 1);
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testExpireSnapshotUsingNamedArgsOfSnapshotIds()
+    {
+        Session session = getSession();
+        String tableName = "named_args_snapshot_ids_table";
+        try {
+            assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, data varchar)");
+
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(1, 'a')", 1);
+            Table table = loadTable(tableName);
+            long snapshotId1 = table.currentSnapshot().snapshotId();
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(2, 'b')", 1);
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(3, 'c')", 1);
+            table.refresh();
+            long snapshotId2 = table.currentSnapshot().snapshotId();
+            assertUpdate(session, "INSERT INTO " + tableName + " VALUES(4, 'd')", 1);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 4);
+
+            // Drop the two indicated snapshots
+            assertUpdate(session, format("CALL system.expire_snapshots(table_name => '%s', schema => '%s', snapshot_ids => %s)",
+                    tableName, TEST_SCHEMA, "array[" + snapshotId1 + ", " + snapshotId2 + "]"));
+            table.refresh();
+            assertHasSize(table.snapshots(), 2);
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testInvalidExpireSnapshotsCases()
+    {
+        assertQueryFails("CALL system.expire_snapshots('n', table_name => 't')", ".*Named and positional arguments cannot be mixed");
+        assertQueryFails("CALL custom.expire_snapshots('n', 't')", "Procedure not registered: custom.expire_snapshots");
+        assertQueryFails("CALL system.expire_snapshots()", ".*Required procedure argument 'schema' is missing");
+        assertQueryFails("CALL system.expire_snapshots('s', 'n', 2.2)", ".*Cannot cast type decimal\\(2,1\\) to timestamp");
+        assertQueryFails("CALL system.expire_snapshots('', '')", "schemaName is empty");
+    }
+
+    private String getTimestampString(long timeMillsUtc, String zoneId)
+    {
+        Instant instant = Instant.ofEpochMilli(timeMillsUtc);
+        LocalDateTime localDateTime = instant
+                .atZone(ZoneId.of(zoneId))
+                .toLocalDateTime();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+        formatter = formatter.withZone(ZoneId.of(zoneId));
+        return localDateTime.format(formatter);
+    }
+
+    private Table loadTable(String tableName)
+    {
+        Catalog catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), ICEBERG_CATALOG, getProperties(), new Configuration());
+        return catalog.loadTable(TableIdentifier.of(TEST_SCHEMA, tableName));
+    }
+
+    private Map<String, String> getProperties()
+    {
+        File metastoreDir = getCatalogDirectory();
+        return ImmutableMap.of("warehouse", metastoreDir.toString());
+    }
+
+    private File getCatalogDirectory()
+    {
+        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
+        return dataDirectory.toFile();
+    }
+
+    private long waitUntilAfter(long timestampMillis)
+    {
+        long current = System.currentTimeMillis();
+        while (current <= timestampMillis) {
+            current = System.currentTimeMillis();
+        }
+        return current;
+    }
+
+    private void assertHasSize(Iterable<?> iterable, int size)
+    {
+        AtomicInteger count = new AtomicInteger(0);
+        iterable.forEach(obj -> count.incrementAndGet());
+        assertEquals(count.get(), size);
+    }
+
+    private Session sessionForTimezone(String zoneId, boolean legacyTimestamp)
+    {
+        SessionBuilder sessionBuilder = Session.builder(getSession())
+                .setSystemProperty(LEGACY_TIMESTAMP, String.valueOf(legacyTimestamp));
+        if (legacyTimestamp) {
+            sessionBuilder.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(zoneId));
+        }
+        return sessionBuilder.build();
+    }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/ProcedureRegistry.java b/presto-main/src/main/java/com/facebook/presto/metadata/ProcedureRegistry.java
index 94f7042eeeb0..52f63cf7ff6d 100644
--- a/presto-main/src/main/java/com/facebook/presto/metadata/ProcedureRegistry.java
+++ b/presto-main/src/main/java/com/facebook/presto/metadata/ProcedureRegistry.java
@@ -13,6 +13,8 @@
  */
 package com.facebook.presto.metadata;
 
+import com.facebook.presto.common.type.SqlTimestamp;
+import com.facebook.presto.common.type.TimestampType;
 import com.facebook.presto.common.type.Type;
 import com.facebook.presto.common.type.TypeManager;
 import com.facebook.presto.spi.ConnectorId;
@@ -33,6 +35,7 @@
 import static com.facebook.presto.common.type.BigintType.BIGINT;
 import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
 import static com.facebook.presto.common.type.DoubleType.DOUBLE;
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
 import static com.facebook.presto.common.type.StandardTypes.ARRAY;
 import static com.facebook.presto.common.type.StandardTypes.MAP;
 import static com.facebook.presto.common.type.VarcharType.VARCHAR;
@@ -111,6 +114,9 @@ private static Class<?> getObjectType(Type type)
         if (type.equals(BOOLEAN)) {
             return boolean.class;
         }
+        if (type.equals(INTEGER)) {
+            return int.class;
+        }
         if (type.equals(BIGINT)) {
             return long.class;
         }
@@ -120,6 +126,9 @@
         if (type.equals(VARCHAR)) {
             return String.class;
         }
+        if (type instanceof TimestampType) {
+            return SqlTimestamp.class;
+        }
         if (type.getTypeSignature().getBase().equals(ARRAY)) {
             getObjectType(type.getTypeParameters().get(0));
             return List.class;
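
For reference, a minimal sketch of invoking the new procedure with only the two required arguments, relying on the defaults documented above (``older_than`` of 5 days ago, ``retain_last`` of 1); the catalog, schema, and table names are placeholders taken from the documentation examples::

    CALL iceberg.system.expire_snapshots('schema_name', 'table_name');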