
Integrate batched orphan files deletion with the existing schedule workflow #604

Draft
abhisheknath2011 wants to merge 4 commits into linkedin:main from abhisheknath2011:batched-tables

abhisheknath2011 commented May 27, 2026

Summary

Wires the new BatchedOrphanFilesDeletionSparkApp (from #599) into the existing JobsScheduler flow so operators can amortize Spark startup over many tables today, without waiting for the optimizer-service stack to land. Batched OFD is opt-in via a new JobType — the existing single-table ORPHAN_FILES_DELETION path is untouched.

How it works

java JobsScheduler \
  --type ORPHAN_FILES_DELETION_BATCH \
  --tablesURL ... --jobsURL ... --cluster ... \
  --batchMaxItems 25
  1. JobsScheduler fetches all eligible tables (existing fetch path).
  2. New builder method groups by database (batches never cross databases) and runs FirstFitDecreasingBinPacker per group with an item-count cap from --batchMaxItems (default 25).
  3. Each bin becomes one BatchedTableOrphanFilesDeletionTask whose getArgs() returns ["--tableNames", "db.t1,db.t2,…"].
  4. The Jobs Service maps the new ORPHAN_FILES_DELETION_BATCH JobType to BatchedOrphanFilesDeletionSparkApp via jobs.yaml.
  5. The Spark app processes each table in a worker-thread pool, reusing Operations.deleteOrphanFiles(...) per table.
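
Step 5 can be pictured with the minimal sketch below. It is not the code from this PR: the class name `BatchRunnerSketch`, the `runBatch` signature, and the `Consumer<String>` standing in for the per-table `Operations.deleteOrphanFiles(...)` call are all hypothetical. It also assumes that one table's failure should not abort the rest of the batch, which the description does not state.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical sketch: run one operation per table on a fixed worker pool.
final class BatchRunnerSketch {

  /** Returns table name -> true if perTableOp completed without throwing. */
  static Map<String, Boolean> runBatch(
      List<String> tableNames, int poolSize, Consumer<String> perTableOp) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      // Submit everything first so the tables run concurrently.
      Map<String, Future<?>> futures = new LinkedHashMap<>();
      for (String table : tableNames) {
        futures.put(table, pool.submit(() -> perTableOp.accept(table)));
      }
      // Collect outcomes; a failing table is recorded, not rethrown (assumption).
      Map<String, Boolean> results = new LinkedHashMap<>();
      for (Map.Entry<String, Future<?>> e : futures.entrySet()) {
        try {
          e.getValue().get();
          results.put(e.getKey(), true);
        } catch (ExecutionException ex) {
          results.put(e.getKey(), false);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
          results.put(e.getKey(), false);
        }
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }
}
```

Collecting a per-table outcome map keeps the single Spark job alive when one table fails, which is one way to preserve the startup amortization the batch is meant to buy.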

Key design choices

  • Additive, opt-in. Existing ORPHAN_FILES_DELETION JobType + OrphanFilesDeletionSparkApp keep working unchanged. Operators choose mode at scheduler launch via --type.
  • No optimizer service required. BatchedOrphanFilesDeletionSparkApp now treats --resultsEndpoint, --operationIds, --tableUuids as optional. When absent (the legacy scheduler path), the per-operation optimizer callback is skipped entirely; HTS StateManager still tracks the Spark job's lifecycle as before.
  • Batches never cross databases. Grouping happens before bin-packing, not after.
  • Item-count cap only for now. Weight (numCurrentFiles) and sizeBytes dimensions are disabled in this PR — the legacy scheduler doesn't have table_stats. Future PR can layer in Iceberg-snapshot-based sizing without touching the wire.
  • MAX_BATCH_SIZE = 200 hard ceiling. This is a guard against misconfiguration, not the intended operating point. At ~120 chars per entry (3 parallel CSVs of 36-char UUIDs in the optimizer path), 200 tables ≈ 24 KB on the command line, well under the 128 KB Linux limit on a single argument string (MAX_ARG_STRLEN; ARG_MAX itself is typically larger on modern kernels), with headroom for the spark-submit envelope. Enforced at both the Spark app boundary (buildEntries) and the scheduler boundary (fail-fast on --batchMaxItems > 200).
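
The sizing argument behind the ceiling can be checked with a few lines of arithmetic. The sketch below is illustrative only: `BatchLimitsSketch` and its method names are hypothetical, the 120-char figure is the estimate quoted above, and rejecting values below 1 is an added assumption (the description only mentions failing fast above 200).

```java
// Hypothetical sketch of the ceiling check and the command-line size estimate.
final class BatchLimitsSketch {
  static final int MAX_BATCH_SIZE = 200;          // ceiling named in this PR
  static final int APPROX_CHARS_PER_ENTRY = 120;  // estimate quoted in this PR
  static final int LIMIT_128_KB = 128 * 1024;     // the 128 KB limit cited above

  /** Fail fast on an out-of-range --batchMaxItems value. */
  static int validateBatchMaxItems(int requested) {
    if (requested < 1 || requested > MAX_BATCH_SIZE) {
      throw new IllegalArgumentException(
          "--batchMaxItems must be in [1, " + MAX_BATCH_SIZE + "], got " + requested);
    }
    return requested;
  }

  /** Rough number of characters a batch of this many tables adds to the command line. */
  static int estimatedArgChars(int tables) {
    return tables * APPROX_CHARS_PER_ENTRY;
  }
}
```

With these constants, `estimatedArgChars(200)` is 24,000 characters, roughly 18% of 131,072, which matches the "≈ 24 KB with headroom" claim.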

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

Service contract (services/jobs)

  • JobConf.java — added ORPHAN_FILES_DELETION_BATCH to JobType. Codegen regenerates JobConf.JobTypeEnum in client/jobsclient automatically.

Scheduler (apps/spark)

  • util/TableMetadataBatch.java (new) — Metadata subclass: dbName + List<TableMetadata>. getEntityName() returns "db[N]" for metrics granularity.
  • scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java (new) — OperationTask<TableMetadataBatch> subclass; auto-discovered by JobsScheduler's reflection registry. jobName = "ORPHAN_FILES_DELETION_BATCH__".
  • scheduler/tasks/OperationTasksBuilder.java — new prepareBatchedOrphanFilesDeletionTaskList: filter eligible → groupBy(dbName) → FFD pack → emit one task per bin. Wired into both buildOperationTaskList (sync) and buildOperationTaskListInParallel (bulk-enqueue path because batching needs all metadata in hand first).
  • scheduler/JobsScheduler.java — --batchMaxItems CLI option threaded through getAdditionalProperties.
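
The group-then-pack pipeline listed above can be sketched as follows. This is not the PR's `prepareBatchedOrphanFilesDeletionTaskList` or `FirstFitDecreasingBinPacker`: the `Table` class is a hypothetical stand-in for `TableMetadata`, and the packing relies on one observation. When every item has the same weight and the only limit is an item count, first-fit-decreasing fills bin 1 to the cap, then bin 2, and so on, so plain sequential chunking gives the same bins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch: group by database, chunk by item count, build task args.
final class BatchPlanSketch {

  /** Stand-in for TableMetadata with only the fields this sketch needs. */
  static final class Table {
    final String dbName;
    final String tableName;

    Table(String dbName, String tableName) {
      this.dbName = dbName;
      this.tableName = tableName;
    }

    String qualifiedName() {
      return dbName + "." + tableName;
    }
  }

  /** Groups first, then packs, so no bin ever mixes databases. */
  static List<List<Table>> plan(List<Table> eligible, int batchMaxItems) {
    if (batchMaxItems < 1) {
      throw new IllegalArgumentException("batchMaxItems must be >= 1");
    }
    // TreeMap only makes the bin order deterministic for this example.
    Map<String, List<Table>> byDb =
        eligible.stream()
            .collect(Collectors.groupingBy(t -> t.dbName, TreeMap::new, Collectors.toList()));
    List<List<Table>> bins = new ArrayList<>();
    for (List<Table> group : byDb.values()) {
      for (int i = 0; i < group.size(); i += batchMaxItems) {
        int end = Math.min(i + batchMaxItems, group.size());
        bins.add(new ArrayList<>(group.subList(i, end)));
      }
    }
    return bins;
  }

  /** Mirrors the argument shape described in this PR: ["--tableNames", "db.t1,db.t2"]. */
  static List<String> toArgs(List<Table> bin) {
    String csv = bin.stream().map(Table::qualifiedName).collect(Collectors.joining(","));
    return Arrays.asList("--tableNames", csv);
  }
}
```

For example, three tables in database `a` and one in `b` with a cap of 2 yield three bins: two for `a` (sizes 2 and 1) and one for `b`. Once weight or size dimensions are enabled, chunking no longer matches first-fit-decreasing and a real bin packer is needed.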

Spark app changes from #599

  • --resultsEndpoint, --operationIds, --tableUuids now optional. BatchEntry.operationId/tableUuid nullable. newOptimizerClient() returns null when endpoint is absent; reportResult() short-circuits.
  • MAX_BATCH_SIZE = 200 constant + guards.
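
The optional-callback behaviour described above follows a common null-guard pattern. The sketch below only illustrates that pattern: `OptionalCallbackSketch`, `ResultSink`, and both method signatures are hypothetical and do not reproduce `OptimizerServiceClient`, `newOptimizerClient()`, or `reportResult()` from the PR. The in-memory `sent` list stands in for an HTTP call so the example runs without a service.

```java
import java.util.List;

// Hypothetical sketch: skip the optimizer callback when no endpoint is configured.
final class OptionalCallbackSketch {

  /** Stand-in for a client that posts per-operation results. */
  interface ResultSink {
    void send(String operationId, boolean success);
  }

  /** Returns null when the endpoint is absent, as on the legacy scheduler path. */
  static ResultSink newSink(String resultsEndpoint, List<String> sent) {
    if (resultsEndpoint == null || resultsEndpoint.isEmpty()) {
      return null;
    }
    // Records what would have been sent instead of making a network call.
    return (operationId, success) ->
        sent.add(resultsEndpoint + ":" + operationId + ":" + success);
  }

  /** Short-circuits when there is no sink or no operation id; returns true if reported. */
  static boolean reportResult(ResultSink sink, String operationId, boolean success) {
    if (sink == null || operationId == null) {
      return false;
    }
    sink.send(operationId, success);
    return true;
  }
}
```

Keeping the guard in one place means the per-table processing code does not need to know which scheduler launched the batch.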

Rollout plan

  • Phase 1 — merge this PR. ORPHAN_FILES_DELETION_BATCH is registered but no scheduler is launched with it. Zero behavior change for existing deployments.
  • Phase 2 — pick one cluster + a small database set, launch a parallel scheduler with --type ORPHAN_FILES_DELETION_BATCH --batchMaxItems 5. Compare success rate and total runtime against the single-table scheduler.
  • Phase 3 — raise --batchMaxItems (10 → 25 → 50) and broaden DB coverage. Drop the single-table scheduler for OFD once the batched one matches or beats it.
  • Rollback at any time: flip the scheduler back to --type ORPHAN_FILES_DELETION. Both paths coexist.


Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.


Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.


* <p>Construct with the {@link Config} builder to override the default timeouts.
*/
@Slf4j
public class OptimizerServiceClient implements AutoCloseable {
abhisheknath2011 (author) commented:

From the previous PR #599

abhisheknath2011 changed the title from "Batched tables" to "Integrate batched orphan files deletion with the existing schedule workflow" on May 27, 2026
.build();

Map<String, List<TableMetadata>> byDb =
eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName));
abhisheknath2011 (author) commented:

Groups the eligible tables by database.

*/
@Slf4j
@Getter
public class BatchedTableOrphanFilesDeletionTask extends OperationTask<TableMetadataBatch> {
abhisheknath2011 (author) commented:

Task implementation for batch job submission.
