Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ protected static CommandLine parseArgs(String[] args) {
.longOpt(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS)
.desc("Maximum compute cost budget in GB hours")
.build());
options.addOption(
Option.builder(null)
.required(false)
.hasArg()
.longOpt(OperationTasksBuilder.BATCH_MAX_ITEMS)
.desc("Max tables per batched OFD job (ORPHAN_FILES_DELETION_BATCH only)")
.build());
options.addOption(
Option.builder(null)
.required(false)
Expand Down Expand Up @@ -744,6 +751,11 @@ protected static Properties getAdditionalProperties(CommandLine cmdLine) {
OperationTasksBuilder.MAX_STRATEGIES_COUNT,
cmdLine.getOptionValue(OperationTasksBuilder.MAX_STRATEGIES_COUNT));
}
if (cmdLine.hasOption(OperationTasksBuilder.BATCH_MAX_ITEMS)) {
result.setProperty(
OperationTasksBuilder.BATCH_MAX_ITEMS,
cmdLine.getOptionValue(OperationTasksBuilder.BATCH_MAX_ITEMS));
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.linkedin.openhouse.jobs.scheduler.tasks;

import com.linkedin.openhouse.jobs.client.JobsClient;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* A task to remove orphan files from a batch of tables in a single Spark job. Pairs with {@code
* com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp} via the {@link
* JobConf.JobTypeEnum#ORPHAN_FILES_DELETION_BATCH} JobType.
*
* <p>The legacy {@link com.linkedin.openhouse.jobs.scheduler.JobsScheduler} pre-dates the optimizer
* service, so this task omits the optimizer-only CLI flags ({@code --resultsEndpoint}, {@code
* --operationIds}, {@code --tableUuids}). The Spark app treats them as optional and falls back to
* HTS-only lifecycle tracking when they are absent.
*
* @see <a href="https://iceberg.apache.org/docs/latest/maintenance/#delete-orphan-files">Delete
* orphan files</a>
*/
@Slf4j
@Getter
public class BatchedTableOrphanFilesDeletionTask extends OperationTask<TableMetadataBatch> {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task implementation for batch job submission.

public static final JobConf.JobTypeEnum OPERATION_TYPE =
JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH;

public BatchedTableOrphanFilesDeletionTask(
JobsClient jobsClient,
TablesClient tablesClient,
TableMetadataBatch metadata,
long pollIntervalMs,
long queuedTimeoutMs,
long taskTimeoutMs) {
super(jobsClient, tablesClient, metadata, pollIntervalMs, queuedTimeoutMs, taskTimeoutMs);
}

public BatchedTableOrphanFilesDeletionTask(
JobsClient jobsClient, TablesClient tablesClient, TableMetadataBatch metadata) {
super(jobsClient, tablesClient, metadata);
}

@Override
public JobConf.JobTypeEnum getType() {
return OPERATION_TYPE;
}

@Override
protected List<String> getArgs() {
String tableNames =
metadata.getTables().stream().map(TableMetadata::fqtn).collect(Collectors.joining(","));
return Arrays.asList("--tableNames", tableNames);
}

@Override
protected boolean shouldRun() {
return !metadata.getTables().isEmpty();
}

@Override
protected boolean launchJob() {
String jobName =
String.format("%s_%s_%d", getType(), metadata.getDbName(), metadata.getTables().size());
Map<String, String> executionProperties = Collections.emptyMap();
String proxyUser = metadata.getTables().get(0).getCreator();
jobId =
jobsClient
.launch(jobName, getType(), proxyUser, executionProperties, getArgs())
.orElse(null);
return jobId != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.scheduler.JobsScheduler;
import com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.DataLayoutUtil;
import com.linkedin.openhouse.jobs.util.DatabaseMetadata;
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
import com.linkedin.openhouse.jobs.util.Metadata;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
import com.linkedin.openhouse.jobs.util.binpack.Bin;
import com.linkedin.openhouse.jobs.util.binpack.BinItem;
import com.linkedin.openhouse.jobs.util.binpack.FirstFitDecreasingBinPacker;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
Expand All @@ -40,10 +46,12 @@
public class OperationTasksBuilder {
public static final String MAX_COST_BUDGET_GB_HRS = "maxCostBudgetGbHrs";
public static final String MAX_STRATEGIES_COUNT = "maxStrategiesCount";
public static final String BATCH_MAX_ITEMS = "batchMaxItems";
private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3;
private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7;
private static final double MAX_COST_BUDGET_GB_HRS_DEFAULT = 1000.0;
private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10;
private static final int BATCH_MAX_ITEMS_DEFAULT = 25;
private static final String METRICS_SCOPE = JobsScheduler.class.getName();

private final OperationTaskFactory<? extends OperationTask<?>> taskFactory;
Expand All @@ -65,6 +73,80 @@ private List<OperationTask<?>> prepareTableOperationTaskList(
return processMetadataList(tableMetadataList, jobType, operationMode, otelEmitter);
}

/**
* Builds one {@link BatchedTableOrphanFilesDeletionTask} per database-scoped bin. Groups eligible
* tables by database (batches never cross databases), then applies the first-fit-decreasing bin
* packer with a per-bin item cap from {@code properties} (defaults to {@value
* #BATCH_MAX_ITEMS_DEFAULT}). Tables with the maintenance op disabled are filtered out before
* grouping.
*/
private List<OperationTask<?>> prepareBatchedOrphanFilesDeletionTaskList(
JobConf.JobTypeEnum jobType,
Properties properties,
OperationMode operationMode,
OtelEmitter otelEmitter) {
int maxItemsPerBin =
NumberUtils.toInt(properties.getProperty(BATCH_MAX_ITEMS), BATCH_MAX_ITEMS_DEFAULT);
if (maxItemsPerBin > BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE) {
throw new IllegalArgumentException(
String.format(
"--%s=%d exceeds Spark-app ceiling MAX_BATCH_SIZE=%d",
BATCH_MAX_ITEMS, maxItemsPerBin, BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE));
}
List<TableMetadata> eligible =
tablesClient.getTableMetadataList().stream()
.filter(t -> !t.isMaintenanceJobDisabled(jobType))
.collect(Collectors.toList());
log.info(
"Fetched metadata for {} batched-OFD-eligible tables; binMaxItems={}",
eligible.size(),
maxItemsPerBin);

FirstFitDecreasingBinPacker packer =
FirstFitDecreasingBinPacker.builder()
.maxItemsPerBin(maxItemsPerBin)
// Item-count cap only; weight/size dimensions disabled until table_stats is wired in.
.maxWeightPerBin(0)
.maxSizeBytesPerBin(0)
.build();

Map<String, List<TableMetadata>> byDb =
eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group the tables by DBs.


List<TableMetadataBatch> batches = new ArrayList<>();
for (Map.Entry<String, List<TableMetadata>> dbGroup : byDb.entrySet()) {
String dbName = dbGroup.getKey();
List<BinItem> items =
dbGroup.getValue().stream()
.map(
t ->
BinItem.builder()
.fqtn(t.fqtn())
.operationId("")
.tableUuid("")
.databaseName(t.getDbName())
.tableName(t.getTableName())
.weight(1L)
.sizeBytes(0L)
.build())
.collect(Collectors.toList());
for (Bin bin : packer.pack(items)) {
List<TableMetadata> tablesForBin =
bin.items().stream()
.map(
item ->
dbGroup.getValue().stream()
.filter(t -> t.fqtn().equals(item.getFqtn()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("missing table for bin")))
.collect(Collectors.toList());
batches.add(TableMetadataBatch.builder().dbName(dbName).tables(tablesForBin).build());
}
}
log.info("Packed {} eligible tables into {} batches", eligible.size(), batches.size());
return processMetadataList(batches, jobType, operationMode, otelEmitter);
}

private List<OperationTask<?>> prepareReplicationOperationTaskList(
JobConf.JobTypeEnum jobType, OperationMode operationMode, OtelEmitter otelEmitter) {
List<TableMetadata> replicationSetupTableMetadataList = tablesClient.getTableMetadataList();
Expand Down Expand Up @@ -272,6 +354,9 @@ public List<OperationTask<?>> buildOperationTaskList(
case DATA_LAYOUT_STRATEGY_GENERATION:
case SORT_STATS_COLLECTION:
return prepareTableOperationTaskList(jobType, operationMode, otelEmitter);
case ORPHAN_FILES_DELETION_BATCH:
return prepareBatchedOrphanFilesDeletionTaskList(
jobType, properties, operationMode, otelEmitter);
case REPLICATION:
return prepareReplicationOperationTaskList(jobType, operationMode, otelEmitter);
case DATA_LAYOUT_STRATEGY_EXECUTION:
Expand Down Expand Up @@ -300,6 +385,22 @@ public void buildOperationTaskListInParallel(
buildDataLayoutOperationTaskListInParallel(jobType, properties, operationMode, otelEmitter);
} else if (jobType == JobConf.JobTypeEnum.TABLE_DIRECTORY_DELETION) {
buildDatabaseLevelOperationTasksInParallel(jobType, operationMode, otelEmitter);
} else if (jobType == JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH) {
// Batched OFD needs the full table set in hand before it can group-by-db and bin-pack,
// so we use the synchronous fetch path then enqueue the tasks in bulk.
List<OperationTask<?>> tasks =
prepareBatchedOrphanFilesDeletionTaskList(
jobType, properties, operationMode, otelEmitter);
for (OperationTask<?> task : tasks) {
try {
operationTaskManager.addData(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while enqueueing batched OFD task", e);
}
}
operationTaskManager.updateDataGenerationCompletion();
log.info("Enqueued {} batched OFD tasks for job type: {}", tasks.size(), jobType);
} else {
buildOperationTaskListInParallelInternal(jobType, operationMode, otelEmitter);
}
Expand Down
Loading