Skip to content

[server] Fix latestProcessedVtPosition corruption during EOP leader-to-local switch#2809

Open
sushantmane wants to merge 1 commit into
linkedin:mainfrom
sushantmane:sumane/fix-vt-position-race
Open

[server] Fix latestProcessedVtPosition corruption during EOP leader-to-local switch#2809
sushantmane wants to merge 1 commit into
linkedin:mainfrom
sushantmane:sumane/fix-vt-position-race

Conversation

@sushantmane
Copy link
Copy Markdown
Contributor

Problem Statement

LeaderFollowerStoreIngestionTask.updateOffsetsFromConsumerRecord branches on shouldProduceToVersionTopic(pcs), which reads pcs.consumeRemotely() live. The flag can flip false (via setConsumeRemotely(false) in the EOP leader-to-local switch at checkLongRunningTaskState) between the leader's produce callback and the drainer's processing of the same record. When that happens, in-flight leader-produced records that were consumed from an upstream topic fall through to the local branch and write consumerRecord.getPosition() (the upstream-broker offset, with a different topicId) into latestProcessedVtPosition. The local-VT slot ends up holding a position that belongs to a different physical shard than the local VT.

Downstream effects:

  • TopicManager.getIngestionProgressPercentage -> TopicMetadataFetcher.diffPosition -> XcConsumerAdapter.positionDifference fails with PositionCompareOrDiffException: cannot compare position of different shards (PositionUtils.validateSameShard).
  • A subsequent re-subscribe on the local broker carries an upstream-broker topicId, triggering topic-id-mismatch fallbacks in downstream adapters.

Solution

Dispatch on the record's own leaderProducedRecordContext instead of on the live consumeRemotely flag:

  • context != null -> record was produced to local VT by the leader after consuming from upstream. Its local-VT position is producedPosition (returned by the local producer callback). Goes through updateOffsetsAsRemoteConsumeLeader, which already handles the chunk sub-case (consumedPosition=EARLIEST) as a no-op via hasCorrespondingUpstreamMessage.
  • context == null -> record was consumed by a follower or by a leader consuming local VT directly. Its local-VT position is consumerRecord.getPosition(). Same body as before.

The offset-update decision is now per-record, eliminating the race across the leader-to-local transition (and any other path where the flag and the in-flight record diverge).

Why this is safe across store types

  • Follower: leaderProducedRecordContext is always null (drainer-queue insert at StoreIngestionTask.java:1473-1479 passes null). Falls through to the local branch. No change.
  • Leader consuming local VT (post-EOP non-NR or non-NR setups): same as follower, context is null. No change.
  • Leader-NR (consuming upstream, producing local): LeaderProducerCallback queues with a populated context. Goes through updateOffsetsAsRemoteConsumeLeader. No change in steady state; the race-prone window during the EOP transition now correctly uses producedPosition.
  • Active-Active: uses the same updateOffsetsFromConsumerRecord via inheritance. Same dispatch.
  • Leader-generated chunks: context is non-null but hasCorrespondingUpstreamMessage=false. updateOffsetsAsRemoteConsumeLeader skips them today via the internal check, unchanged.

Testing Done

  • Compiled with ./gradlew :clients:da-vinci-client:compileJava.
  • Existing LeaderFollowerStoreIngestionTaskTest continues to pass.
  • Targeted unit test for the dispatch (covering both the null-context follower path and the non-null-context leader-produce path while shouldProduceToVersionTopic returns false) will follow in a separate commit.

consumeRemotely flag

updateOffsetsFromConsumerRecord branches on `shouldProduceToVersionTopic`,
which
reads `pcs.consumeRemotely()` at call time. The flag can flip false (during
the
EOP leader→local switch in checkLongRunningTaskState) between the leader's
produce callback and the drainer's processing of the same record. When that
happens, in-flight leader-produced records that were consumed from an upstream
topic fall through to the local branch and write
`consumerRecord.getPosition()`
(the upstream-broker offset, with a different topicId) into
`latestProcessedVtPosition`. The local-VT slot ends up holding a position that
belongs to a different physical shard than the local VT, breaking later
position-diff operations (e.g., getIngestionProgressPercentage) with
PositionCompareOrDiffException: "cannot compare position of different shards".

Switch the dispatch to use the record's `leaderProducedRecordContext`. A
non-null
context means the record was leader-produced from an upstream consume; its
local-VT position is `producedPosition` (the local-broker offset), regardless
of
the current flag state. updateOffsetsAsRemoteConsumeLeader already handles the
chunk sub-case (consumedPosition=EARLIEST) as a no-op.

This makes the offset-update decision per-record and race-free across the
EOP-to-local transition.

## Testing Done
- Compiled with `./gradlew :clients:da-vinci-client:compileJava`.
- Unit test to follow in a separate commit (capturing dispatch with both
  null-context follower path and non-null-context leader-produce path while
  shouldProduceToVersionTopic returns false).
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a race in LeaderFollowerStoreIngestionTask.updateOffsetsFromConsumerRecord where an EOP leader-to-local switch can cause the task to write an upstream-broker PubSubPosition into the local version-topic (latestProcessedVtPosition), corrupting ingestion progress tracking and triggering shard/topicId mismatch errors downstream.

Changes:

  • Switch offset-update dispatch from the live consumeRemotely/shouldProduceToVersionTopic flag to the per-record leaderProducedRecordContext provenance.
  • Ensure leader-produced records update local VT progress using producedPosition (local broker) rather than consumerRecord.getPosition() (upstream/source broker).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

* updateOffsetsAsRemoteConsumeLeader already handles the chunk-record sub-case
* (consumedPosition=EARLIEST) as a no-op via its hasCorrespondingUpstreamMessage check.
*/
if (leaderProducedRecordContext == null) {
Comment on lines +1886 to +1902
/*
* Dispatch on the record's own provenance, not on the live consumeRemotely flag.
*
* A non-null leaderProducedRecordContext means this record was produced to local VT by the
* leader after consuming it from upstream. Its local-VT position is producedPosition (the
* local-broker offset returned by the producer callback), not consumerRecord.getPosition()
* (the offset on the upstream/source topic).
*
* Branching on shouldProduceToVersionTopic (which reads pcs.consumeRemotely() live) is racy:
* if the flag flips false between the leader's produce and the drainer's processing of the
* same record, the local branch below would capture consumerRecord.getPosition() (the
* upstream offset on a different broker, with a different topicId) into
* latestProcessedVtPosition.
*
* updateOffsetsAsRemoteConsumeLeader already handles the chunk-record sub-case
* (consumedPosition=EARLIEST) as a no-op via its hasCorrespondingUpstreamMessage check.
*/
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/*
* Dispatch on the record's own provenance, not on the live consumeRemotely flag.
*
* A non-null leaderProducedRecordContext means this record was produced to local VT by the
* leader after consuming it from upstream. Its local-VT position is producedPosition (the
* local-broker offset returned by the producer callback), not consumerRecord.getPosition()
* (the offset on the upstream/source topic).
*
* Branching on shouldProduceToVersionTopic (which reads pcs.consumeRemotely() live) is racy:
* if the flag flips false between the leader's produce and the drainer's processing of the
* same record, the local branch below would capture consumerRecord.getPosition() (the
* upstream offset on a different broker, with a different topicId) into
* latestProcessedVtPosition.
*
* updateOffsetsAsRemoteConsumeLeader already handles the chunk-record sub-case
* (consumedPosition=EARLIEST) as a no-op via its hasCorrespondingUpstreamMessage check.
*/
/*
* leaderProducedRecordContext != null means this record was produced to local VT by the leader after consuming it
* from upstream. Its local-VT position is producedPosition (returned by the producer callback), not
* consumerRecord.getPosition() (the offset on the upstream/source topic).
*
* Branching on shouldProduceToVersionTopic (which reads pcs.consumeRemotely() live) is racy: if the flag flips
* false between the leader's produce and the drainer's processing of the same record, the local branch below would
* capture consumerRecord.getPosition() (the upstream offset on a different broker, with a different topicId) into
* latestProcessedVtPosition.
*/

super nit: I feel like this comment can be shortened

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants