Skip to content

Commit

Permalink
HIVE-27454: Iceberg: Allow expire snapshots by id (apache#4441) (Ayush Saxena, reviewed by Denys Kuzmenko)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushtkn authored and yeahyung committed Jul 20, 2023
1 parent d759037 commit c7419e9
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -119,6 +120,7 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.NullOrder;
Expand Down Expand Up @@ -752,20 +754,7 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
(AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams();
int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
if (numThreads > 0) {
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", hmsTable.getCompleteName(),
numThreads);
final ExecutorService deleteExecutorService =
getDeleteExecutorService(hmsTable.getCompleteName(), numThreads);
try {
icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis())
.executeDeleteWith(deleteExecutorService).commit();
} finally {
deleteExecutorService.shutdown();
}
} else {
icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis()).commit();
}
expireSnapshot(icebergTable, expireSnapshotsSpec, numThreads);
break;
case SET_CURRENT_SNAPSHOT:
AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
Expand All @@ -780,7 +769,51 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
}
}

private static ExecutorService getDeleteExecutorService(String completeName, int numThreads) {
/**
 * Dispatches snapshot expiry on the given Iceberg table, either by explicit snapshot ids or by
 * an age cutoff, optionally running the physical file deletes on a dedicated thread pool.
 *
 * @param icebergTable the Iceberg table whose snapshots should be expired
 * @param expireSnapshotsSpec carries either the ids to expire or the cutoff timestamp
 * @param numThreads size of the delete pool; a non-positive value keeps deletes on the caller thread
 */
private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
    int numThreads) {
  ExecutorService deletePool = null;
  try {
    if (numThreads > 0) {
      LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
      deletePool = getDeleteExecutorService(icebergTable.name(), numThreads);
    }
    if (!expireSnapshotsSpec.isExpireByIds()) {
      expireSnapshotOlderThanTimestamp(icebergTable, expireSnapshotsSpec.getTimestampMillis(), deletePool);
    } else {
      expireSnapshotByIds(icebergTable, expireSnapshotsSpec.getIdsToExpire(), deletePool);
    }
  } finally {
    // Always release the pool, even if the expire commit throws.
    if (deletePool != null) {
      deletePool.shutdown();
    }
  }
}

/**
 * Expires every snapshot of the table older than the given cutoff.
 *
 * @param icebergTable the Iceberg table to operate on
 * @param timestamp cutoff in epoch milliseconds; snapshots older than this are expired
 * @param deleteExecutorService optional pool for the physical deletes; may be {@code null},
 *        in which case deletes run on the committing thread
 */
private void expireSnapshotOlderThanTimestamp(Table icebergTable, Long timestamp,
    ExecutorService deleteExecutorService) {
  ExpireSnapshots operation = icebergTable.expireSnapshots().expireOlderThan(timestamp);
  if (deleteExecutorService != null) {
    operation = operation.executeDeleteWith(deleteExecutorService);
  }
  operation.commit();
}

/**
 * Expires the snapshots with the given ids on the table.
 * <p>
 * The analyzer accepts the id list via the regex {@code \d+(\s*,\s*\d+)*}, so individual entries
 * may carry surrounding whitespace (e.g. "1, 2"); each entry is therefore trimmed before parsing,
 * since {@link Long#parseLong(String)} rejects whitespace with a {@code NumberFormatException}.
 *
 * @param icebergTable the Iceberg table to operate on
 * @param idsToExpire decimal snapshot ids; a null or empty array is a no-op
 * @param deleteExecutorService optional pool for the physical deletes; may be {@code null},
 *        in which case deletes run on the committing thread
 */
private void expireSnapshotByIds(Table icebergTable, String[] idsToExpire,
    ExecutorService deleteExecutorService) {
  if (idsToExpire == null || idsToExpire.length == 0) {
    return;
  }
  ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
  for (String id : idsToExpire) {
    // trim(): the id-list grammar allows spaces around commas, Long.parseLong does not.
    expireSnapshots.expireSnapshotId(Long.parseLong(id.trim()));
  }
  LOG.info("Expiring snapshot on {} for snapshot Ids: {}", icebergTable.name(), Arrays.toString(idsToExpire));
  if (deleteExecutorService != null) {
    expireSnapshots = expireSnapshots.executeDeleteWith(deleteExecutorService);
  }
  expireSnapshots.commit();
}

private ExecutorService getDeleteExecutorService(String completeName, int numThreads) {
AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
return Executors.newFixedThreadPool(numThreads, runnable -> {
Thread thread = new Thread(runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.mr.hive;

import java.io.IOException;
import org.apache.commons.collections4.IterableUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.Assert;
Expand All @@ -42,4 +43,26 @@ public void testExpireSnapshotsWithTimestamp() throws IOException, InterruptedEx
table.refresh();
Assert.assertEquals(2, table.history().size());
}

@Test
public void testExpireSnapshotsWithSnapshotId() throws IOException, InterruptedException {
// Build a table with 10 versions so there are 10 snapshots to expire from.
TableIdentifier identifier = TableIdentifier.of("default", "source");
Table table = testTables.createTableWithVersions(shell, identifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 10);
Assert.assertEquals(10, IterableUtils.size(table.snapshots()));

// Expire one snapshot
shell.executeStatement(
"ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS" +
"('" + table.history().get(2).snapshotId() + "')");
table.refresh();
// One id expired: 10 -> 9 snapshots remain.
Assert.assertEquals(9, IterableUtils.size(table.snapshots()));
// Expire multiple snapshots
shell.executeStatement(
"ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS('" + table.history().get(3).snapshotId() + "," +
table.history().get(4).snapshotId() + "')");
table.refresh();
// Two more ids expired in a single statement: 9 -> 7 snapshots remain.
Assert.assertEquals(7, IterableUtils.size(table.snapshots()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.ddl.table.execute;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.type.TimestampTZ;
import org.apache.hadoop.hive.common.type.TimestampTZUtil;
Expand All @@ -41,6 +42,7 @@

import java.time.ZoneId;
import java.util.Map;
import java.util.regex.Pattern;

import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.EXPIRE_SNAPSHOT;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.ROLLBACK;
Expand All @@ -54,6 +56,8 @@
@DDLType(types = HiveParser.TOK_ALTERTABLE_EXECUTE)
public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {

private static final Pattern EXPIRE_SNAPSHOT_BY_ID_REGEX = Pattern.compile("\\d+(\\s*,\\s*\\d+)*");

/**
 * Creates the analyzer for ALTER TABLE ... EXECUTE statements.
 *
 * @param queryState the current query state, passed through to the base analyzer
 * @throws SemanticException if the base analyzer rejects the query state
 */
public AlterTableExecuteAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
Expand Down Expand Up @@ -89,8 +93,13 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition

ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() : SessionState.get().getConf()
.getLocalTimeZone();
TimestampTZ time = TimestampTZUtil.parse(PlanUtils.stripQuotes(child.getText()), timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli()));
String childText = PlanUtils.stripQuotes(child.getText().trim());
if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(childText).matches()) {
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(childText));
} else {
TimestampTZ time = TimestampTZUtil.parse(childText, timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli()));
}
desc = new AlterTableExecuteDesc(tableName, partitionSpec, spec);
} else if (HiveParser.KW_SET_CURRENT_SNAPSHOT == executeCommandType.getType()) {
ASTNode child = (ASTNode) command.getChild(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.base.MoreObjects;

import java.util.Arrays;

/**
* Execute operation specification. It stores the type of the operation and its parameters.
* The following operations are supported
Expand Down Expand Up @@ -102,19 +104,38 @@ public String toString() {
* </ul>
*/
/**
 * Parameters of an EXPIRE_SNAPSHOTS execute operation. Exactly one mode is set:
 * either a cutoff timestamp (expire everything older) or an explicit comma separated
 * list of snapshot ids. {@link #isExpireByIds()} tells which mode applies.
 */
public static class ExpireSnapshotsSpec {
  // Cutoff in epoch millis; stays -1 when expiry is by ids instead.
  private long timestampMillis = -1L;
  // Snapshot ids to expire; stays null when expiry is by timestamp instead.
  private String[] idsToExpire = null;

  public ExpireSnapshotsSpec(long timestampMillis) {
    this.timestampMillis = timestampMillis;
  }

  public ExpireSnapshotsSpec(String ids) {
    // The analyzer regex (\d+(\s*,\s*\d+)*) permits whitespace around commas, so split
    // on the same pattern; a bare split(",") would leave spaces in the tokens and make
    // the later Long.parseLong fail with a NumberFormatException.
    this.idsToExpire = ids.trim().split("\\s*,\\s*");
  }

  public Long getTimestampMillis() {
    return timestampMillis;
  }

  public String[] getIdsToExpire() {
    return idsToExpire;
  }

  /** @return true when this spec expires explicit snapshot ids rather than by age. */
  public boolean isExpireByIds() {
    return idsToExpire != null;
  }

  @Override
  public String toString() {
    MoreObjects.ToStringHelper stringHelper = MoreObjects.toStringHelper(this);
    if (isExpireByIds()) {
      stringHelper.add("idsToExpire", Arrays.toString(idsToExpire));
    } else {
      stringHelper.add("timestampMillis", timestampMillis);
    }
    return stringHelper.toString();
  }
}

Expand Down

0 comments on commit c7419e9

Please sign in to comment.