[da-vinci][server] Global RT DIV: Fix Max Age Pruning#2720
Conversation
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…s absent in Global RT DIV When a leader transition occurs with Global RT DIV enabled, getLeaderPosition() previously returned EARLIEST if no LCRP (divRtCheckpointPosition) had been checkpointed yet — for example, on the first leader transition after the fe ature is enabled on a store. This caused the leader to re-consume the RT topic from the beginning instead of from the position it would have used before Global RT DIV. The fix adds a fallback in getLeaderPosition(): when useCheckpointedDivRtPosition is true but no LCRP is present, the method now falls through to getLatestProcessedRtPosition(), which is the position that leader transitions used before Global RT DIV was introduced. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ores isGlobalRtDivEnabled() now returns false when hybridStoreConfig is absent, even if the version-level flag is set. Global RT DIV consumes from an RT topic, so enabling it on a batch-only store has no meaning and could cause unexpected behavior in code paths gated by the flag. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
records In LeaderFollowerStoreIngestionTask.processMessageAndMaybeProduceToKafka(), when a write compute (UPDATE) record is a no-op (key does not exist, isSkipProduce() = true), the method returned early without calling produceToLocalKafka(). Since shouldSendGlobalRtDiv is only checked inside produceToLocalKafka(), the Global RT DIV checkpoint was never sent for such records. Write compute-only stores where all or most RT records are no-op updates would therefore never checkpoint their RT position, breaking follower recovery. Fix: before the early return for produce-skipped write compute records, check shouldSendGlobalRtDiv() and send a checkpoint if the byte threshold is exceeded. Refactor sendGlobalRtDivMessage() to construct LeaderMetadataWrapper internally from kafkaClusterId and previousMessage.getPosition(), removing the now-redundant LeaderMetadataWrapper and LeaderProducedRecordContext parameters. This makes the method callable from both the normal produce path and the produce-skip path without requiring caller-side wrapper construction. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR fixes a Global RT DIV checkpointing gap in the leader/follower ingestion path for write-compute UPDATE records that are produce-skipped (no-op updates), ensuring followers can still recover RT position on leader failover.
Changes:
- Send Global RT DIV checkpoints even when write-compute UPDATEs are produce-skipped (no
produceToLocalKafka()call). - Refactor
sendGlobalRtDivMessageto takekafkaClusterIdand construct required wrappers internally. - Tighten Global RT DIV enablement and leader-position behavior with additional unit test coverage.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Sends Global RT DIV on produce-skipped WC UPDATEs; refactors Global RT DIV send helper signature. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Ensures Global RT DIV is only enabled for hybrid mode. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Adds LCRP fallback behavior to preserve pre-Global-RT-DIV subscribe semantics. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java | Updates signature-based test; adds test for produce-skipped WC UPDATE Global RT DIV checkpointing. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java | Adds test asserting Global RT DIV requires a hybrid store (even if version flag is set). |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java | Adds unit test for getLeaderPosition LCRP fallback behavior. |
| .gitignore | Ignores .worktrees/. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
cloneVtProducerStates and cloneRtProducerStates Both clone methods were using System.currentTimeMillis() as the pruning anchor, causing all VT producer segments to be filtered out (size=0) during batch push reingestion when the push job ran longer ago than maxAgeInMs. This left localVtPosition stuck at EARLIEST on reingestion. Fix: thread latestMessageTimeInMs (from offsetRecord.calculateLatestMessageTimeInMs()) through all clone call sites, matching the data-relative anchor already used by clearExpiredStateAndUpdateOffsetRecord. On a fresh OffsetRecord, calculateLatestMessageTimeInMs() returns -1, making the pruning threshold a large negative — so no segments are pruned, which is the correct behavior fo r reingestion. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…tests and extract helpers Fold testCloneVtProducerStatesDataRelativeAnchor and testCloneRtProducerStatesDataRelativeAnchor into the existing clone tests as a third assertion block each, eliminating two redundant test methods. Extract createDestTracker() helper in TestPartitionTracker to remove four copies of the same 3-line tracker construction. Extract buildTaskWithGlobalRtDiv() helper in StoreIngestionTaskTest to deduplicate factory/task construction in testIsGlobalRtDivEnabledRequiresHybridStore. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 10 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Merge conflicts resolved: - LeaderFollowerStoreIngestionTask.java: keep latestMessageTimeInMs arg, adopt upstream getConsumerDiv() getter fix, add EARLIEST early-return guard - PartitionConsumptionStateTest.java: keep testGetLeaderPositionLcrpFallback, add upstream HLL tests - TestPartitionTracker.java: add upstream testUpdateOffsetRecordPersistsLcvpWithNoSegments regression test, keep our detailed Javadoc for testCloneVtProducerStates PR review comments addressed: - Wrap shouldSendGlobalRtDiv + sendGlobalRtDivMessage in single try/catch for produce-skipped UPDATE path (matches normal produce path behavior) - Fix misleading Javadoc in PartitionTracker.cloneVtProducerStates: passing DISABLED as latestMessageTimeInMs does not skip pruning entirely; use maxAgeInMs=DISABLED for that - Update sendGlobalRtDivMessage Javadoc: previousMessage is not limited to messages produced to Kafka; also covers produce-skipped write compute records Fix LeaderFollowerStoreIngestionTaskTest mock stubs to use 3-param cloneVtProducerStates(anyInt(), anyBoolean(), anyLong()) signature Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
sendGlobalRtDivMessage calls pcs.getOffsetRecord().calculateLatestMessageTimeInMs() to pass the data-relative anchor to cloneVtProducerStates/cloneRtProducerStates. The test's mockPartitionConsumptionState had no stub for getOffsetRecord(), so it returned null and caused an NPE at line 3941. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
The isHybridMode() check was overly conservative — Global RT DIV can apply to any store type. Remove the guard and the test that verified it. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
… records consumedBytesSinceLastSync is incremented in the consumer poll loop for ALL records (line 1497 of StoreIngestionTask), including no-op UPDATE records where isSkipProduce() is true. So when the next real record goes through produceToLocalKafkaHelper, shouldSendGlobalRtDiv sees the accumulated bytes from skipped records and triggers naturally — the explicit send on the early- return path is redundant. Also reverts the sendGlobalRtDivMessage Javadoc back to the original wording. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
processMessageAndMaybeProduceToKafka Both the normal produce path and the produce-skipped no-op UPDATE path now converge at the bottom of processMessageAndMaybeProduceToKafka, where the Global RT DIV check runs once, guarded by isRealTime(). This removes the duplicate check from produceToLocalKafkaHelper and makes the logic easier to reason about. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…method bottom Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
computeEarliestAllowableTimestamp - Add PartitionConsumptionState#getLatestMessageTimeInMs() to avoid repeating the pcs.getOffsetRecord().calculateLatestMessageTimeInMs() chain at 3 call sites - Extract PartitionTracker#computeEarliestAllowableTimestamp(long, long) to consolidate the identical DISABLED-ternary used in setPartitionState, cloneVtProducerStates, and cloneRtProducerStates Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
latestMessageTimeInMs param Align wording with cloneVtProducerStates: clarify that passing DISABLED as latestMessageTimeInMs produces a very large negative threshold (no segments pruned), and that maxAgeInMs=DISABLED is the correct knob to disable pruning entirely. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Problem Statement
Wall-clock anchor in cloneVtProducerStates / cloneRtProducerStates causes localVtPosition to remain EARLIEST on reingestion
Both clone methods used
System.currentTimeMillis() - maxAgeInMsas the pruning threshold. During batch push reingestion, VT producer segments have timestamps from when the push job originally ran (potentially hours or days ago). With a wall-clock anchor, all segments exceed the age threshold, are pruned, and the snapshot has size=0.When
updateOffsetRecorditerates over an empty snapshot,offsetRecord.setLatestConsumedVtPosition()is never called (it lives inside the for-loop), solocalVtPositionstays stuck atEARLIESTfor the entire reingestion. Additionally,cloneVtProducerStatesdestructively removes pruned entries from the liveconsumerDiv, so all subsequent syncs also see size=0.clearExpiredStateAndUpdateOffsetRecordalready handles this correctly by anchoring tooffsetRecord.calculateLatestMessageTimeInMs()(data-relative). On a fresh OffsetRecord at reingestion start, this returns -1, making the threshold a large negative, so no segments are pruned — effectively pinning from earliest time.Solution
Thread
latestMessageTimeInMs(fromoffsetRecord.calculateLatestMessageTimeInMs()) through allcloneVtProducerStatesandcloneRtProducerStatescall sites, replacing theSystem.currentTimeMillis()anchor — matchingclearExpiredStateAndUpdateOffsetRecord. All callers inLeaderFollowerStoreIngestionTaskhave PCS in scope viapcs.getLatestMessageTimeInMs();StoreIngestionTask.cloneDrainerDivProducerStatesfetches it the same way.Also refactors
sendGlobalRtDivMessage()to remove the caller-constructedLeaderMetadataWrapper/LeaderProducedRecordContextparameters (both were always derived from the same two values), replacing them withint kafkaClusterIdso the method constructs them internally.Code changes
Concurrency-Specific Checks
How was this PR tested?
Does this PR introduce any user-facing or breaking changes?