Handle empty Iceberg tables while executing procedures
If a table was just created, it may not contain any snapshots.
Procedures run on tables that do not contain any snapshots can
safely do nothing.
alexjo2144 authored and findepi committed Aug 11, 2022
1 parent 2815522 commit bfb1c63
Showing 3 changed files with 35 additions and 7 deletions.
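
For context, the optimize path previously unwrapped the table's snapshot id with Optional.orElseThrow(), which throws when a freshly created table has no snapshot yet. Below is a minimal standalone sketch of that Java behavior; the class and variable names are illustrative only and are not Trino code.

    import java.util.NoSuchElementException;
    import java.util.Optional;

    public class EmptySnapshotSketch
    {
        public static void main(String[] args)
        {
            // A just-created Iceberg table has no snapshots, so the handle's snapshot id is empty
            Optional<Long> snapshotId = Optional.empty();

            try {
                long id = snapshotId.orElseThrow(); // the pattern removed by this commit's first hunk
                System.out.println("snapshot id: " + id);
            }
            catch (NoSuchElementException e) {
                // With this commit, the procedures detect the empty table and return early instead
                System.out.println("no snapshot yet: " + e);
            }
        }
    }
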
@@ -912,7 +912,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
 tableHandle.getSchemaTableName(),
 OPTIMIZE,
 new IcebergOptimizeHandle(
-tableHandle.getSnapshotId().orElseThrow(),
+tableHandle.getSnapshotId(),
 SchemaParser.toJson(icebergTable.schema()),
 PartitionSpecParser.toJson(icebergTable.spec()),
 getColumns(icebergTable.schema(), typeManager),
@@ -1071,8 +1071,8 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
 newFiles.add(builder.build());
 }

-if (scannedDataFiles.isEmpty() && fullyAppliedDeleteFiles.isEmpty() && newFiles.isEmpty()) {
-// Table scan turned out to be empty, nothing to commit
+if (optimizeHandle.getSnapshotId().isEmpty() || scannedDataFiles.isEmpty() && fullyAppliedDeleteFiles.isEmpty() && newFiles.isEmpty()) {
+// Either the table is empty, or the table scan turned out to be empty, nothing to commit
 transaction = null;
 return;
 }
@@ -1089,7 +1089,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
 RewriteFiles rewriteFiles = transaction.newRewrite();
 rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());
 // Table.snapshot method returns null if there is no matching snapshot
-Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.getSnapshotId()), "snapshot is null");
+Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.getSnapshotId().get()), "snapshot is null");
 rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
 rewriteFiles.commit();
 transaction.commitTransaction();
@@ -1186,6 +1186,11 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu
 IcebergConfig.REMOVE_ORPHAN_FILES_MIN_RETENTION,
 IcebergSessionProperties.REMOVE_ORPHAN_FILES_MIN_RETENTION);

+if (table.currentSnapshot() == null) {
+log.debug("Skipping remove_orphan_files procedure for empty table " + table);
+return;
+}
+
 long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
 removeOrphanFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
 removeOrphanMetadataFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis);
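
Both guards rely on the same signal from the Iceberg API: a table with no snapshots has no current snapshot. The sketch below only illustrates that check, assuming org.apache.iceberg.Table is on the classpath; it is not code from the commit.

    import org.apache.iceberg.Table;

    final class EmptyTableCheck
    {
        private EmptyTableCheck() {}

        // Table.currentSnapshot() returns null when the table has no snapshots.
        // The remove_orphan_files guard above checks this directly; the optimize path
        // carries the same information around as an empty Optional<Long>.
        static boolean hasNoSnapshots(Table table)
        {
            return table.currentSnapshot() == null;
        }
    }
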
@@ -23,14 +23,15 @@

 import java.util.List;
 import java.util.Map;
+import java.util.Optional;

 import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;

 public class IcebergOptimizeHandle
 extends IcebergProcedureHandle
 {
-private final long snapshotId;
+private final Optional<Long> snapshotId;
 private final String schemaAsJson;
 private final String partitionSpecAsJson;
 private final List<IcebergColumnHandle> tableColumns;
@@ -41,7 +42,7 @@ public class IcebergOptimizeHandle

 @JsonCreator
 public IcebergOptimizeHandle(
-long snapshotId,
+Optional<Long> snapshotId,
 String schemaAsJson,
 String partitionSpecAsJson,
 List<IcebergColumnHandle> tableColumns,
@@ -61,7 +62,7 @@ public IcebergOptimizeHandle(
 }

 @JsonProperty
-public long getSnapshotId()
+public Optional<Long> getSnapshotId()
 {
 return snapshotId;
 }
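
Changing the handle field from long to Optional<Long> also changes how the handle serializes. As a rough illustration only (assuming plain Jackson with the jackson-datatype-jdk8 module registered; Trino's own JSON codec wiring is not shown here), an Optional<Long> property round-trips as null when empty and as a number when present:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

    import java.util.Optional;

    public class OptionalHandleSketch
    {
        private final Optional<Long> snapshotId;

        @JsonCreator
        public OptionalHandleSketch(@JsonProperty("snapshotId") Optional<Long> snapshotId)
        {
            this.snapshotId = snapshotId;
        }

        @JsonProperty
        public Optional<Long> getSnapshotId()
        {
            return snapshotId;
        }

        public static void main(String[] args) throws Exception
        {
            ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module());

            String empty = mapper.writeValueAsString(new OptionalHandleSketch(Optional.empty()));
            String present = mapper.writeValueAsString(new OptionalHandleSketch(Optional.of(123L)));
            System.out.println(empty);   // {"snapshotId":null}
            System.out.println(present); // {"snapshotId":123}

            // Reading the empty form back yields Optional.empty(), mirroring a table with no snapshots
            OptionalHandleSketch back = mapper.readValue(empty, OptionalHandleSketch.class);
            System.out.println(back.getSnapshotId().isEmpty()); // true
        }
    }
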
@@ -1667,6 +1667,28 @@ public void testOptimizeOnV2IcebergTable()
 .containsOnly(row(1, 2), row(2, 2), row(3, 2), row(11, 12), row(12, 12), row(13, 12));
 }

+@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
+public void testAlterTableExecuteProceduresOnEmptyTable()
+{
+String baseTableName = "test_alter_table_execute_procedures_on_empty_table_" + randomTableSuffix();
+String trinoTableName = trinoTableName(baseTableName);
+String sparkTableName = sparkTableName(baseTableName);
+
+onSpark().executeQuery(format(
+"CREATE TABLE %s (" +
+" _string STRING" +
+", _bigint BIGINT" +
+", _integer INTEGER" +
+") USING ICEBERG",
+sparkTableName));
+
+onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE optimize");
+onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE expire_snapshots(retention_threshold => '7d')");
+onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE remove_orphan_files(retention_threshold => '7d')");
+
+assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).hasNoRows();
+}
+
 private static String escapeSparkString(String value)
 {
 return value.replace("\\", "\\\\").replace("'", "\\'");