Skip to content

[server] Skip PubSub subscribe for batch-only partitions after EOP#2810

Open
sushantmane wants to merge 5 commits into
linkedin:mainfrom
sushantmane:sumane/batch-only-post-eop-terminal
Open

[server] Skip PubSub subscribe for batch-only partitions after EOP#2810
sushantmane wants to merge 5 commits into
linkedin:mainfrom
sushantmane:sumane/batch-only-post-eop-terminal

Conversation

@sushantmane
Copy link
Copy Markdown
Contributor

Problem Statement

A batch-only partition has no further data to ingest once END_OF_PUSH (EOP) has been received: the version's data set is complete. Today, replicas still hold an active PubSub consumer assignment for such partitions across their lifetime — through restarts and Helix state transitions — even though there is nothing more to consume.

Two consequences:

  1. Wasted resources. Each terminal batch-only replica keeps a partition assignment in a shared consumer pool. On hosts with many completed batch-only stores, this accumulates.
  2. Race-prone code path during the EOP leader-to-local switch. LeaderFollowerStoreIngestionTask.checkLongRunningTaskState at lines 779-808 runs unsubscribe(remote) -> wait -> setConsumeRemotely(false) -> consumerSubscribe(local). The intermediate wait on lastLeaderPersistFuture.get(1 min) can time out (TimeoutException is caught and "swallowed"); the flag flip then fires while in-flight leader-produced records are still queued at the drainer. The subsequent consumerSubscribe(local) only adds to the assignment churn. For batch-only stores there is no semantic reason to re-subscribe at all post-EOP.

Solution

Add PartitionConsumptionState.isBatchOnlyTerminal():

public boolean isBatchOnlyTerminal() {
  return isBatchOnly() && isEndOfPushReceived();
}

Short-circuit StoreIngestionTask.consumerSubscribe(PubSubTopicPartition, PubSubPosition, String) when the partition is terminal:

PartitionConsumptionState pcs = ... ;
if (pcs != null && pcs.isBatchOnlyTerminal()) {
  LOGGER.info("Skipping consumerSubscribe for batch-only terminal replica: {}", pcs.getReplicaId());
  return;
}

The guard lives at the base method, so every call site benefits without per-site changes:

  • Initial subscribe at SIT startup
  • preparePositionCheckpointAndStartConsumptionAsLeader (leader promote)
  • The EOP leader-to-local switch at LFSIT:804-808
  • onBecomeStandbyFromLeader re-subscribe path
  • Blob-transfer recovery resubscribe
  • Active-Active subclass override (delegates to super)

PCS / storage engine / metrics init are unaffected — they happen outside consumerSubscribe, so the replica still constructs its PCS, opens RocksDB in read-only mode, registers metrics, and goes ONLINE in Helix to serve reads.

Interaction with the offset-update race

Combined with PR for the offset-update race-fix (context-bound branch in updateOffsetsFromConsumerRecord), batch-only stores now have a clean post-EOP behavior:

Testing Done

  • Compiled with ./gradlew :clients:da-vinci-client:compileJava.
  • Existing PartitionConsumptionStateTest and StoreIngestionTaskTest continue to pass.
  • Unit tests for isBatchOnlyTerminal() and the subscribe short-circuit (covering the initial-subscribe and restart paths) will follow in a separate commit.

Out of scope

  • Drainer-side fast-path reportCompleted + unsubscribe at EOP processing time (today the polling defaultReadyToServeChecker triggers reportCompleted; can be moved in a follow-up).
  • The 1-minute timeout on lastLeaderPersistFuture.get in waitForAllMessageToBeProcessedFromTopicPartition. PR [server] Fix latestProcessedVtPosition corruption during EOP leader-to-local switch #2809 makes the timeout benign by closing the offset-update race; tightening the wait semantics is a separate change.

A batch-only partition has no further data to ingest once EOP has been
received. Holding an active PubSub consumer assignment for it is wasted work:
the replica serves reads from the read-only RocksDB partition and never needs
another record from the log.

Add `PartitionConsumptionState.isBatchOnlyTerminal()` (= batch-only AND EOP
received) and short-circuit `StoreIngestionTask.consumerSubscribe` for such
partitions. The actual `aggKafkaConsumerService.subscribeConsumerFor` call is
skipped; PCS / storage engine / metrics init are unaffected (they happen
outside this method, so the replica still goes ONLINE and serves reads).

Two concrete wins:

1. On restart for a completed batch-only replica, the SIT/Helix transition
path
   no longer holds an unnecessary consumer assignment. Reduces resource use on
   replicas serving stable batch-only data.
2. Closes a race in the EOP leader→local switch (LFSIT line 779-808). After
   `setConsumeRemotely(false)`, the subsequent
`consumerSubscribe(localKafkaServer)`
   becomes a no-op for terminal partitions, so no second active assignment is
   created. In-flight leader-produced records still get persisted via the
   drainer; their offset updates flow through the standard branch.

The guard is at the base `consumerSubscribe(PubSubTopicPartition, ...)`
method,
so every call site (initial subscribe, leader-promote, post-EOP local switch,
state-transition re-subscribe, blob-transfer recovery, AA subclass override
via super) benefits without per-site changes.

## Testing Done
- Compiled with `./gradlew :clients:da-vinci-client:compileJava`.
- Unit tests to follow in a separate commit.
Copilot AI review requested due to automatic review settings May 20, 2026 16:46
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR optimizes Da Vinci server ingestion for batch-only partitions by avoiding unnecessary PubSub consumer assignments after the replica has received END_OF_PUSH (EOP), reducing steady-state consumer pool usage and avoiding churn during post-EOP state transitions.

Changes:

  • Add PartitionConsumptionState.isBatchOnlyTerminal() to represent “batch-only + EOP received” terminal state.
  • Short-circuit StoreIngestionTask.consumerSubscribe(...) to skip PubSub subscribe when a partition is batch-only terminal (log + return).

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java Skip PubSub subscribe for batch-only terminal partitions inside the base consumerSubscribe method.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java Introduce isBatchOnlyTerminal() helper (batch-only + EOP received).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +4826 to +4829
*/
if (pcs != null && pcs.isBatchOnlyTerminal()) {
LOGGER.info(
"Skipping consumerSubscribe for batch-only terminal replica: {} (EOP received, no further ingestion expected)",
Comment on lines +4821 to +4832
/*
* Batch-only partitions are terminal once EOP is received: no more records expected for the
* lifetime of this version. Skip the PubSub subscribe so the replica serves reads without
* holding an active consumer assignment. PCS / storage engine / metrics init are unaffected
* (they happen outside this method).
*/
if (pcs != null && pcs.isBatchOnlyTerminal()) {
LOGGER.info(
"Skipping consumerSubscribe for batch-only terminal replica: {} (EOP received, no further ingestion expected)",
pcs.getReplicaId());
return;
}
Comment on lines +808 to +816
/**
* A batch-only partition is terminal once EOP has been received: no more ingestion is expected
* for this partition for the remainder of the version's lifetime. Subscribers (Helix state
* transitions, restart init) can skip the PubSub subscribe step for such partitions; storage
* engine + metrics + PCS init still happen normally so the replica can serve reads.
*/
public boolean isBatchOnlyTerminal() {
return isBatchOnly() && isEndOfPushReceived();
}
…ipped

subscribe

The previous commit added a guard at consumerSubscribe that short-circuits for
batch-only partitions where EOP has been received: the PubSub subscribe is
skipped, but PCS / storage engine / metrics init still happen. The SIT run
loop
in StoreIngestionTask.run() periodically checks whether the consumer has any
subscriptions; if not, an idle counter ticks toward closing the SIT entirely
(maybeCloseInactiveIngestionTask).

For backup-version batch-only SITs, that close is fine — there's no quota
emission tied to backup versions. For the CURRENT version though, the SIT loop
emits host-level disk-quota usage every iteration via recordQuotaMetrics() ->
storageUtilizationManager.getDiskQuotaUsage(). Closing the SIT tears down the
PCS map and stops that emission; host-level quota metrics go stale.

Fix: extend the existing keep-alive branch in the idle-detection logic. When
the SIT is the current version AND any of its PCS entries is batch-only
terminal, take the same Thread.sleep(POST_UNSUB_SLEEP_MS) + resetIdleCounter()
path that the unsubscribe-after-batchpush flow uses. The SIT stays alive; the
periodic recordQuotaMetrics() keeps firing.

Add a `hasBatchOnlyTerminalPartition()` helper for the check.

## Testing Done

- [x] Compiled with `./gradlew :clients:da-vinci-client:compileJava`.
- [ ] Unit test simulating restart of a completed batch-only current version
      (PCS map populated with EOP=true PCS, consumerSubscribe was a no-op,
      idle counter > threshold) — verify SIT does NOT close and quota emission
      continues. To follow.
The previous commit introduced a `hasBatchOnlyTerminalPartition()` helper for
the in-SIT idle-close path.
KafkaStoreIngestionService.scanAndCloseIdleConsumptionTasks
already uses a broader predicate for the external scanner path —
`task.isCurrentVersion && task.hasReplicas()` — with the same intent:
don't tear down a current-version SIT that's responsible for live read-serving
and quota emission, regardless of why it lacks an active subscription.

Mirror that exact predicate in the in-SIT path instead. `hasReplicas()` covers
the batch-only-terminal case (replicas are bootstrapped → ac
tiveReplicaCount > 0
→ hasReplicas() = true) and any other current-version-with-replicas scenario.

This removes the just-added helper and aligns the two close-protection paths.

## Testing Done

- [x] Compiled with `./gradlew :clients:da-vinci-client:compileJava`.
…ch-only

terminal

The existing restart path (validateAndSubscribePartition ->
checkConsumptionStateWhenStart
-> defaultReadyToServeChecker.apply) already calls reportCompleted for
batch-only
partitions with EOP=true before reaching consumerSubscribe — verified via:
- checkFastReadyToServeForReplica returns false for non-hybrid (line 6364), so
  isCompletedReport=false at line 2584
- getDefaultReadyToServeChecker().apply(pcs) is invoked at line 2591
- isReadyToServe returns true at line 1278-1280 (isComplete() for batch-only
EOP)
- reportCompleted is invoked

So in the normal restart flow, completion is already guaranteed before the
skipped
subscribe.

For defense in depth, also call reportCompleted from inside the
consumerSubscribe
guard if the partition is terminal and completion hasn't yet been reported.
This
covers any future code path that might reach consumerSubscribe for a terminal
partition without going through the readyToServeChecker (e.g., a refactor that
introduces a new caller). reportCompleted is idempotent — gated by
pcs.isCompletionReported() at IngestionNotificationDispatcher.

## Testing Done

- [x] Compiled with `./gradlew :clients:da-vinci-client:compileJava`.
Copilot AI review requested due to automatic review settings May 20, 2026 17:14
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

Comment on lines +4839 to +4854
* Belt-and-suspenders: explicitly call reportCompleted before returning. The normal restart
* path (checkConsumptionStateWhenStart -> defaultReadyToServeChecker) already reports
* completion for batch-only EOP partitions before reaching here, and reportCompleted is
* idempotent (gated by pcs.isCompletionReported()). Calling it here guarantees that any
* code path which reaches consumerSubscribe for a terminal partition still ends with the
* replica reported COMPLETED — no possibility of leaving the replica stuck pre-COMPLETED.
*/
if (pcs != null && pcs.isBatchOnlyTerminal()) {
if (!pcs.isCompletionReported()) {
LOGGER.info(
"Reporting completion for batch-only terminal replica from consumerSubscribe path: {}",
pcs.getReplicaId());
reportCompleted(pcs);
}
LOGGER.info(
"Skipping consumerSubscribe for batch-only terminal replica: {} (EOP received, no further ingestion expected)",
Comment on lines +4846 to +4850
if (pcs != null && pcs.isBatchOnlyTerminal()) {
if (!pcs.isCompletionReported()) {
LOGGER.info(
"Reporting completion for batch-only terminal replica from consumerSubscribe path: {}",
pcs.getReplicaId());
Comment on lines +814 to +816
public boolean isBatchOnlyTerminal() {
return isBatchOnly() && isEndOfPushReceived();
}
Batch-only partitions with EOP already received now short-circuit in
consumerSubscribe without registering a PubSub subscription. There is no
matching batchUnsubscribe because the subscribe never happened.

Replace the timed expectation of `batchUnsubscribeConsumerFor` /
`batchUnsubscribe` with `never()` for both. `completed` is still asserted
(reported from the consumerSubscribe short-circuit). Quota emission
verify relaxed to `atLeastOnce` since the SIT no longer spins
post-completion.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants