[server][venice-common] Leader handover: emit graceful-EOS marker on Leader -> Standby #2793
sushantmane wants to merge 9 commits
…Leader -> Standby
Adds the emit side of an interim, cooperative fast-path for the
STANDBY -> LEADER transition wait. Each leader, when it receives a
Helix LEADER -> STANDBY (or LEADER -> OFFLINE) transition, drains its
producer and emits a final EndOfSegment control message marked with a
new gracefulLeadershipHandoff=true flag. The message carries the
demoting leader's termId in LeaderMetadata.termId (already in the
footer since v14).
A follow-up change adds the consume side: a new leader observes the
marker and (behind a separate consume-side flag) skips the legacy
5-minute inactivity wait in canSwitchToLeaderTopicLegacy.
This PR is behavior-preserving in isolation: no reader of the marker
ships in this change, so the on-the-wire field is dormant until the
consume side lands. The new VT messages are correctly handled by old
binaries because the field defaults to false on read.
Schema
- Bumps KafkaMessageEnvelope to v15. EndOfSegment gains
gracefulLeadershipHandoff: boolean, default false.
- Bumps AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE current
protocol version from 14 to 15.
VeniceWriter
- New sendGracefulLeadershipEndOfSegment(partition, term,
localPubSubClusterId, callback) that closes the partition's
current segment with finalSegment=true,
gracefulLeadershipHandoff=true, and LeaderMetadata.termId =
given term. No-op when no segment is open.
Server integration
- PartitionConsumptionState gains currentLeaderTermId,
set on STANDBY -> LEADER (from the Helix session checker term),
cleared on LEADER -> STANDBY after the marker is emitted.
- LeaderFollowerStoreIngestionTask.processConsumerAction for
LEADER_TO_STANDBY now calls emitGracefulLeadershipEosIfEnabled
just before vw.closePartition(). The call is behind
server.leader.handover.emit.graceful.eos (default true) and
bounded by server.leader.handover.emit.graceful.eos.ack.timeout.ms
(default 5000). The emit is best-effort: a failure or timeout is logged and does not block the demotion.
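The currentLeaderTermId lifecycle described in the first bullet can be sketched as below. This is a minimal illustration, not the actual PartitionConsumptionState code: the class name LeaderTermTracker, the hasLeaderTerm helper, and the constant name UNSET_TERM_ID are assumptions made for the example. The -1 "unset" value matches the default that testCurrentLeaderTermIdLifecycle is described as checking in the PR summary.

```java
// Hypothetical stand-in for the per-partition term tracking; not the real PartitionConsumptionState.
final class LeaderTermTracker {
  // Assumed sentinel meaning "no leader term is recorded for this partition".
  static final long UNSET_TERM_ID = -1L;

  private volatile long currentLeaderTermId = UNSET_TERM_ID;

  // Called on STANDBY -> LEADER with the term of the new leadership session.
  void setCurrentLeaderTermId(long termId) {
    this.currentLeaderTermId = termId;
  }

  // Called on LEADER -> STANDBY once the marker emit has been attempted.
  void clearCurrentLeaderTermId() {
    this.currentLeaderTermId = UNSET_TERM_ID;
  }

  long getCurrentLeaderTermId() {
    return currentLeaderTermId;
  }

  // Convenience check (hypothetical): true only while a term is recorded.
  boolean hasLeaderTerm() {
    return currentLeaderTermId != UNSET_TERM_ID;
  }
}
```

Keeping the term on the per-partition state means the demotion path can stamp the marker with the term that was recorded at promotion time, without consulting Helix again.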
Config keys (ConfigKeys + VeniceServerConfig getters)
- server.leader.handover.emit.graceful.eos (default true)
- server.leader.handover.consume.graceful.eos (default false,
consumed by the follow-up PR)
- server.leader.handover.emit.graceful.eos.ack.timeout.ms (default 5000)
Metrics (HostLevelIngestionStats)
- leader_stepdown_graceful_eos_emit_success
- leader_stepdown_graceful_eos_emit_failure
Testing Done
- VeniceWriterUnitTest:
testSendGracefulLeadershipEndOfSegmentEmitsExpectedFields
testSendGracefulLeadershipEndOfSegmentIsNoOpWhenSegmentAlreadyClosed
- PartitionConsumptionStateTest:
testCurrentLeaderTermIdLifecycle
- Re-ran the full VeniceWriterUnitTest, AvroProtocolDefinitionTest,
TestOptimizedKafkaValueSerializer, LeaderFollowerStoreIngestionTaskTest,
DolStampTest, and PartitionConsumptionStateTest - all pass.
Pull request overview
This PR adds the emit-side of a cooperative leader handover optimization: when a leader replica steps down, it can emit a final EndOfSegment control message marked with a new gracefulLeadershipHandoff flag (and stamped with the demoting leader’s termId) so a future consume-side change can safely shorten the STANDBY -> LEADER wait path.
Changes:
- Bump KafkaMessageEnvelope protocol to v15 and add EndOfSegment.gracefulLeadershipHandoff (default false).
- Add VeniceWriter.sendGracefulLeadershipEndOfSegment(...) to emit a "graceful" EOS with finalSegment=true and LeaderMetadata.termId.
- Wire server ingestion to record/clear a per-partition currentLeaderTermId, emit the graceful EOS on LEADER -> STANDBY, and expose new config + metrics.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Adds unit tests for the new graceful EOS writer API. |
| internal/venice-common/src/main/resources/avro/KafkaMessageEnvelope/v15/KafkaMessageEnvelope.avsc | Introduces KME v15 schema with gracefulLeadershipHandoff on EndOfSegment. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Adds sendGracefulLeadershipEndOfSegment(...) to emit a graceful handoff EOS and end the segment. |
| internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java | Bumps KAFKA_MESSAGE_ENVELOPE current protocol version to 15. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Adds server config keys for emit/consume graceful EOS and ack timeout. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java | Tests lifecycle of the new currentLeaderTermId field. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java | Adds success/failure metrics for graceful EOS emission. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Adds currentLeaderTermId tracking and accessors. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Emits graceful EOS on stepdown (best-effort, bounded wait), and tracks term on promotion. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java | Plumbs new config flags/timeouts into server config getters. |
Step-Down Stamp
Addresses review feedback: the prior implementation used a flag on EndOfSegment
and depended on the leader's segment state, which had two issues:
1. No marker was emitted when the leader had no open segment (e.g. leader
never produced for this term, or previously rolled the segment).
2. Closing the segment ourselves was fragile against any in-flight straggler
write that could reopen a segment after our EOS.
Switches to a DoL-style self-contained stamp:
- New KafkaKey.LEADER_STEPDOWN_STAMP sentinel
- New LeaderStepDownStampGuidGenerator (type-3 GUID, distinct from
HeartbeatGuidV3Generator and DoLStampGuidGenerator)
- New VeniceWriter.sendLeaderStepDownStamp(...) modeled exactly on
sendDoLStamp(...): dedicated producer GUID, segmentNumber=0,
sequenceNumber=0, LeaderMetadata.termId stamped with the demoting
leader's term, payload reuses heartbeat ControlMessage
- Schema v15 (added in the prior commit) is no longer needed - reverted to
v14, v15 directory deleted, EndOfSegment.gracefulLeadershipHandoff field
dropped
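To make the "dedicated type-3 GUID, segmentNumber=0, sequenceNumber=0" shape concrete, here is a rough sketch. It assumes "type-3" means a name-based (version 3, MD5) UUID, which java.util.UUID.nameUUIDFromBytes produces. The seed strings and the class names StampGuidSketch and StepDownStampSketch are invented for illustration and are not the values or types used by LeaderStepDownStampGuidGenerator or VeniceWriter.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Illustrative only: seed strings and class names are assumptions, not Venice's actual values.
final class StampGuidSketch {
  // A name-based (version 3) UUID is deterministic: the same seed yields the same GUID on every host.
  static UUID guidFor(String seed) {
    return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8));
  }

  // Distinct seeds give distinct GUIDs, so each stamp family can be told apart.
  static final UUID STEP_DOWN_GUID = guidFor("hypothetical-leader-step-down-stamp");
  static final UUID DOL_GUID = guidFor("hypothetical-dol-stamp");
}

// Minimal view of the metadata the step-down stamp is described as carrying.
final class StepDownStampSketch {
  final UUID producerGuid = StampGuidSketch.STEP_DOWN_GUID;
  final int segmentNumber = 0;  // fixed: the stamp does not belong to any producer segment
  final int sequenceNumber = 0; // fixed: no per-segment sequence is tracked for the stamp
  final long termId;            // the demoting leader's term

  StepDownStampSketch(long termId) {
    this.termId = termId;
  }
}
```

Because the stamp does not touch the leader's regular producer segment, it can be emitted whether or not a segment is open, which is the first issue listed above.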
Config keys renamed for clarity:
- server.leader.handover.emit.graceful.eos
-> server.leader.handover.emit.stepdown.stamp
- server.leader.handover.consume.graceful.eos
-> server.leader.handover.consume.stepdown.stamp
- server.leader.handover.emit.graceful.eos.ack.timeout.ms
-> server.leader.handover.emit.stepdown.stamp.ack.timeout.ms
LeaderFollowerStoreIngestionTask emit path now calls sendLeaderStepDownStamp,
which works regardless of the leader's segment state. Ordered after
closePartition so the stamp is the tail record for the new leader's check.
Metrics renamed:
- leader_stepdown_graceful_eos_emit_{success,failure}
-> leader_stepdown_stamp_emit_{success,failure}
Testing Done
- VeniceWriterUnitTest:
testSendLeaderStepDownStampEmitsExpectedFields (new) - verifies the
stamp lands with KafkaKey.LEADER_STEPDOWN_STAMP, ProducerMetadata uses
the dedicated step-down GUID with segment=0 and seq=0, and
LeaderMetadata.termId/upstreamKafkaClusterId carry the passed-in values.
testSendLeaderStepDownStampDoesNotRequireOpenSegment (new) - verifies
the stamp is emitted even when no put/segment was opened first.
- testCurrentLeaderTermIdLifecycle still passes.
- Local compile clean across venice-common and da-vinci-client.
…nterrupt on InterruptedException
Addresses Copilot review feedback on linkedin#2793 about the ack-wait catch swallowing InterruptedException.
Splits emitLeaderStepDownStampIfEnabled into:
1. produce try/catch (any failure to send the stamp -> warn + failure metric)
2. ack-wait try/catch with explicit InterruptedException, TimeoutException, and ExecutionException arms.
On InterruptedException we now restore the interrupt status via Thread.currentThread().interrupt() so the Helix state-transition / shutdown caller can observe the interrupt promptly. The stamp may or may not have landed; the new leader simply falls back to the legacy 5-minute wait if it does not see it - safety unchanged.
Testing Done
- Existing :clients:da-vinci-client compilation clean.
- VeniceWriterUnitTest stamp tests still pass.
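The produce / ack-wait split described in this commit could look roughly like the sketch below. It is not the body of emitLeaderStepDownStampIfEnabled: the class StepDownAckWaitSketch, the Outcome enum, and the Supplier-based produce hook are assumptions made so the example is self-contained, and logging and metrics are reduced to comments.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper illustrating the two separate try/catch scopes.
final class StepDownAckWaitSketch {
  enum Outcome { ACKED, PRODUCE_FAILED, INTERRUPTED, TIMED_OUT, ACK_FAILED }

  static Outcome emitAndAwaitAck(Supplier<Future<?>> produce, long ackTimeoutMs) {
    final Future<?> ack;
    // Scope 1: a synchronous failure to send the stamp.
    try {
      ack = produce.get();
    } catch (Exception e) {
      return Outcome.PRODUCE_FAILED; // real code would warn and bump the failure metric
    }
    // Scope 2: the bounded wait for the broker ack.
    try {
      ack.get(ackTimeoutMs, TimeUnit.MILLISECONDS);
      return Outcome.ACKED;
    } catch (InterruptedException e) {
      // Restore the flag so the caller (state transition / shutdown) can observe the interrupt.
      Thread.currentThread().interrupt();
      return Outcome.INTERRUPTED;
    } catch (TimeoutException e) {
      return Outcome.TIMED_OUT;
    } catch (ExecutionException e) {
      return Outcome.ACK_FAILED;
    }
  }
}
```

Every non-ACKED outcome is treated the same way by the caller: the demotion continues, and the new leader falls back to the legacy wait if the stamp is not visible.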
… PR linkedin#2794
# Conflicts:
# clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java
Extends testVeniceWriterInProcessConsumerAction with three new cases that
exercise the Leader Step-Down stamp emit helper, raising diff coverage on
clients/da-vinci-client's new lines:
case 4 - success path: emit flag on, leader term recorded, mock writer
returns a completed-future PubSubProduceResult. Asserts that
sendLeaderStepDownStamp(versionTopic-partition, null, term, clusterId)
fires with the recorded term, and currentLeaderTermId is cleared on
the tail of the L->Standby handler.
case 5 - ack timeout path: mock writer returns a future that never
completes. The helper times out at the configured ackTimeoutMs, logs,
increments the failure metric, and the consumer-action processing
continues without crash.
case 6 - flag-off path: emit flag flipped to false; sendLeaderStepDownStamp
is not invoked. Confirms the early-return guard.
Testing Done
- testVeniceWriterInProcessConsumerAction passes locally (2.3s).
sixpluszero left a comment
Posting a batch of comments from a local review pass. The producer-side shape is well aligned with the existing DoL-stamp family; flagging 5 issues that look worth addressing before merge — most are one-liners. Detail inline below.
(Review generated with assistance from Claude Code.)
cleanup-on-early-return
Addresses 5 review comments on the Leader Step-Down Stamp emit side:
1. getLeaderStepDownStampKME now sets LeaderMetadata.upstreamMessageTimestamp
to DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP (-1). Avro-generated primitives
initialize long to 0 rather than the schema default, so without this
assignment a downstream consumer would see 0 and misinterpret the stamp
as having a real upstream timestamp. Mirrors the defensive setting that
already exists in getDoLStampKME.
2. StoreIngestionTask.validateMessage now includes LEADER_STEPDOWN_STAMP in
the DIV bypass list alongside HEART_BEAT and DOL_STAMP. The stamp uses a
dedicated type-3 producer GUID with segmentNumber=0, sequenceNumber=0
and does not participate in the per-segment DIV chain.
3. LEADER_TO_STANDBY now wraps the body in try/finally so
clearCurrentLeaderTermId() runs on the already-STANDBY early-return path,
not just the normal demotion flow. The PCS-null and session-invalid
early returns intentionally skip the clear (PCS is unavailable or a
newer transition is already in flight).
4. Updated the stale "graceful-leadership-handoff EndOfSegment" comment in
the STANDBY -> LEADER setCurrentLeaderTermId path to describe the new
step-down stamp design.
5. Updated PartitionConsumptionState.currentLeaderTermId Javadoc to
describe stamp-based usage (no longer "graceful-EOS emit path"). Also
documents the wall-clock termId source and its acceptable-for-fast-path
caveat (full termId-based fencing tracked separately).
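Item 3 (clearing the term on the already-STANDBY early return) comes down to the control flow sketched below. This is a simplified model rather than the real LEADER_TO_STANDBY handler: DemotionHandlerSketch and its fields are hypothetical, and "close partition + emit stamp" is collapsed into a counter.

```java
// Hypothetical, simplified model of the LEADER_TO_STANDBY handler's try/finally shape.
final class DemotionHandlerSketch {
  enum State { LEADER, STANDBY }

  static final long UNSET_TERM_ID = -1L;

  State state;
  long currentLeaderTermId;
  int stampsEmitted = 0;

  DemotionHandlerSketch(State state, long currentLeaderTermId) {
    this.state = state;
    this.currentLeaderTermId = currentLeaderTermId;
  }

  void onLeaderToStandby() {
    // Early returns that must NOT clear the term (missing PCS, stale session) would sit here,
    // before the try block.
    try {
      if (state == State.STANDBY) {
        return; // already STANDBY: nothing to close or emit, but finally still runs
      }
      stampsEmitted++; // stands in for closing the partition and emitting the stamp
      state = State.STANDBY;
    } finally {
      currentLeaderTermId = UNSET_TERM_ID; // runs on both the early-return and normal paths
    }
  }
}
```

Placing the PCS-null and session-invalid checks outside the try is what lets them skip the clear while the already-STANDBY path still performs it.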
Testing Done
- :internal:venice-common:test VeniceWriterUnitTest passes.
- :clients:da-vinci-client:test PartitionConsumptionStateTest,
LeaderFollowerStoreIngestionTaskTest,
LeaderFollowerStoreIngestionTaskPauseTransitionTest all pass.
- :internal:venice-common:spotbugsTest clean.
# Conflicts:
# clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
# Conflicts:
# clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
# internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
…#2794
# Conflicts:
# clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
# clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
rolling-upgrade hazard
Two related changes addressing rolling-upgrade safety + Helix-transition
latency:
1. Default SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP to false. A leader on
this binary must not emit LEADER_STEPDOWN_STAMP records onto VT until
every reader in the fleet recognizes the new KafkaKey and skips it in
StoreIngestionTask.validateMessage. Old readers without the DIV-bypass
list update would validate the stamp's fixed (segment=0, sequence=0)
GUID as a regular segment control record and fail DIV. Operators flip
this flag fleet-wide only after every binary in the read path has
shipped the change.
2. Default SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS from
5000 ms to 1000 ms. The ack wait is synchronous inside the Helix
state-transition handler, so this value bounds the worst-case additional
Helix transition latency this feature can add. A 1-second bound keeps
the worst case manageable while still landing the ack in steady state
(typical local-VT ack is well under 1 s). Missing the ack is safe - the
new leader falls back to the legacy 5-minute wait.
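The rolling-upgrade hazard in item 1 can be illustrated with a toy model of the DIV-bypass decision. The real check lives in StoreIngestionTask.validateMessage and keys off KafkaKey sentinels; the Kind enum, the two bypass sets, and the runsDiv method below are hypothetical stand-ins for an "old binary" and a "new binary" reader.

```java
import java.util.EnumSet;
import java.util.Set;

// Toy model of the bypass list; names are assumptions, not Venice code.
final class DivBypassSketch {
  enum Kind { REGULAR, HEART_BEAT, DOL_STAMP, LEADER_STEPDOWN_STAMP }

  // Reader on a binary that predates this change: it does not know the new stamp.
  static final Set<Kind> OLD_READER_BYPASS = EnumSet.of(Kind.HEART_BEAT, Kind.DOL_STAMP);

  // Reader on a binary that includes this change.
  static final Set<Kind> NEW_READER_BYPASS =
      EnumSet.of(Kind.HEART_BEAT, Kind.DOL_STAMP, Kind.LEADER_STEPDOWN_STAMP);

  // True when the reader would run regular DIV validation on the record.
  static boolean runsDiv(Set<Kind> bypass, Kind kind) {
    return !bypass.contains(kind);
  }
}
```

In this model an old reader runs DIV on the stamp and a new reader skips it, which is why the emit flag stays off until every reader has the updated bypass list.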
Testing Done
- Compile + test classes clean.
- Existing testVeniceWriterInProcessConsumerAction (which stubs the
timeout) and PartitionConsumptionStateTest unchanged.
# Conflicts:
# internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
Pushes diffCoverage on da-vinci-client from 41.67% to 50.0%, above the 45%
threshold the CI gate enforces.
Adds three cases to testVeniceWriterInProcessConsumerAction:
case 7 - ExecutionException during ack-wait: future completes
exceptionally; emit helper logs, increments failure metric, and demotion
proceeds (closePartition still fires).
case 8 - Synchronous produce-side exception: sendLeaderStepDownStamp
throws; emit helper logs via the outer try/catch and demotion proceeds.
case 9 - Already-STANDBY early-return path through the try/finally:
when PCS reports state == STANDBY at the top of LEADER_TO_STANDBY,
closePartition and stamp emit are skipped, but currentLeaderTermId is
still cleared via the finally block.
Testing Done
- testVeniceWriterInProcessConsumerAction passes (3.5 s).
- :clients:da-vinci-client:diffCoverage reports 50% branch coverage on
the diff, which clears the 45% threshold.
…read time
Addresses Copilot review comment: a misconfigured negative ack-timeout would make CompletableFuture.get(long, TimeUnit) return immediately or, on some JDKs, throw IllegalArgumentException - neither is what we want in the demotion handler. Clamp the value to Math.max(0, raw) at config-read time so the emit path can call get() unconditionally without a defensive guard at every call site. Zero still works (no-wait poll), which is the operator's choice.
Testing Done
- testVeniceWriterInProcessConsumerAction passes (3.1 s).
- No behavior change for the default 1000 ms value.
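A minimal sketch of the clamp, assuming the raw value has already been parsed to a long. AckTimeoutConfigSketch and clampAckTimeoutMs are hypothetical names; in the PR the clamp sits wherever VeniceServerConfig reads the key.

```java
// Hypothetical config-read helper; not the actual VeniceServerConfig code.
final class AckTimeoutConfigSketch {
  // Default taken from the PR description (1000 ms).
  static final long DEFAULT_ACK_TIMEOUT_MS = 1000L;

  // Negative values are treated as "do not wait" instead of being passed through to get().
  static long clampAckTimeoutMs(long raw) {
    return Math.max(0L, raw);
  }
}
```

Clamping once at read time keeps the call site simple: the emit path can pass the stored value straight to the timed get().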
Summary
Adds the emit side of an interim, cooperative fast-path for the STANDBY -> LEADER transition wait. When a leader receives a Helix LEADER -> STANDBY (or LEADER -> OFFLINE) transition, it emits a self-contained Leader Step-Down Stamp to the local Version Topic. The stamp is a DoL-style control message identified on the wire by a dedicated KafkaKey.LEADER_STEPDOWN_STAMP with its own type-3 producer GUID and fixed (segmentNumber=0, sequenceNumber=0). The demoting leader's term goes into LeaderMetadata.termId.
A follow-up PR (#2794) adds the consume side: a new leader observes the stamp at the tail of local VT and, behind a separate consume-side flag, skips the legacy 5-minute inactivity wait in canSwitchToLeaderTopicLegacy.
This PR is behavior-preserving in isolation - the consume side is in #2794, and the emit-side flag defaults off for rolling-upgrade safety.
Schema
No schema bump. The stamp reuses the existing KafkaMessageEnvelope v14 schema and the heartbeat ControlMessage payload; the discriminator is the new KafkaKey.LEADER_STEPDOWN_STAMP sentinel (same pattern as KafkaKey.DOL_STAMP).
VeniceWriter
- sendLeaderStepDownStamp(partition, callback, leadershipTerm, localPubSubClusterId) mirroring sendDoLStamp. Always emittable, regardless of the leader's segment state.
- getLeaderStepDownStampKME explicitly sets leaderMetadataFooter.upstreamMessageTimestamp = DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP to avoid the Avro-primitive vs schema-default mismatch (same defensive setting as getDoLStampKME).
KafkaKey
- LEADER_STEPDOWN_STAMP singleton, modeled exactly on DOL_STAMP. Dedicated type-3 GUID from LeaderStepDownStampGuidGenerator.
StoreIngestionTask
- validateMessage adds LEADER_STEPDOWN_STAMP to the DIV-bypass list alongside HEART_BEAT and DOL_STAMP. The three stamps share the invariant that they use dedicated type-3 producer GUIDs with segmentNumber=0, sequenceNumber=0 and do not participate in the per-segment DIV chain.
Server integration (LeaderFollowerStoreIngestionTask)
- PartitionConsumptionState gains currentLeaderTermId, set on STANDBY -> LEADER from checker.getLeadershipTerm(). Cleared on LEADER -> STANDBY via try { ... } finally { clearCurrentLeaderTermId() } so the already-STANDBY early-return path also clears it.
- LEADER_TO_STANDBY now calls emitLeaderStepDownStampIfEnabled(...) after closePartition. The helper splits the produce-call try/catch from the ack-wait try/catch so the produce-side Exception is captured at its own scope, and the ack-wait explicitly handles InterruptedException (restoring interrupt status), TimeoutException, and ExecutionException.
Config keys
- server.leader.handover.emit.stepdown.stamp (default false for rolling-upgrade safety)
- server.leader.handover.consume.stepdown.stamp (default false, consumed by #2794, "[server][da-vinci] Leader handover: consume the graceful-EOS marker to skip the 5-minute wait")
- server.leader.handover.emit.stepdown.stamp.ack.timeout.ms (default 1000, bounds the worst-case Helix transition latency this feature can add)
Metrics (HostLevelIngestionStats)
- leader_stepdown_stamp_emit_success
- leader_stepdown_stamp_emit_failure
Testing Done
- VeniceWriterUnitTest:
testSendLeaderStepDownStampEmitsExpectedFields - verifies the stamp lands with KafkaKey.LEADER_STEPDOWN_STAMP, the dedicated step-down GUID, segmentNumber=0, sequenceNumber=0, and the passed-in termId / upstreamKafkaClusterId on the footer.
testSendLeaderStepDownStampDoesNotRequireOpenSegment - verifies the stamp is emitted even with no prior put() (no open segment on the leader's main producer).
- LeaderFollowerStoreIngestionTaskTest.testVeniceWriterInProcessConsumerAction extended with success / ack-timeout / flag-off cases for emitLeaderStepDownStampIfEnabled.
- PartitionConsumptionStateTest.testCurrentLeaderTermIdLifecycle - default -1, set to a positive term, clear returns to -1.
- LeaderFollowerStoreIngestionTaskTest, LeaderFollowerStoreIngestionTaskPauseTransitionTest, DolStampTest, AvroProtocolDefinitionTest, TestOptimizedKafkaValueSerializer, full VeniceWriterUnitTest, full PartitionConsumptionStateTest - all pass.
- :internal:venice-common:spotbugsTest clean.