[server] Gate vtp protocol-schema header behind VtpHeaderEmissionMode#2798
Open
sushantmane wants to merge 2 commits into
…ssionMode
VeniceWriter attaches the vtp (KafkaMessageEnvelope schema) header whenever
the message is the first record of the first segment
(segmentNumber=0 && messageSequenceNumber=0). Heartbeat control messages are
constructed as START_OF_SEGMENT with the same coordinates, so every heartbeat
picks up the ~16 KB schema blob even though heartbeat consumers do not need it
for schema bootstrap.
On write paths with many partitions and frequent heartbeats this dominates the
consumer-side per-record memory footprint: a single ~155 B data record can
share a queue with thousands of ~16 KB heartbeat envelopes, and the in-flight
queue size becomes a function of vtp retention rather than payload size.
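
To make the scale concrete, here is a rough back-of-the-envelope sketch. It treats "~16 KB" as 16 KiB and uses the ~155 B data record size quoted above; the class name, method names, and the heartbeat count in the usage note are hypothetical and not taken from the PR.

```java
/** Illustrative arithmetic only; sizes are the approximations quoted in the commit message. */
final class VtpFootprintSketch {
  // "~16 KB" interpreted as 16 KiB (assumption).
  static final long VTP_HEADER_BYTES = 16L * 1024;
  // "~155 B" data record, as quoted in the commit message.
  static final long DATA_RECORD_BYTES = 155L;

  private VtpFootprintSketch() {}

  /** Bytes held by vtp headers alone for the given number of queued heartbeats. */
  static long headerBytes(long heartbeats) {
    return heartbeats * VTP_HEADER_BYTES;
  }

  /** Fraction of queued bytes attributable to vtp headers rather than data payload. */
  static double headerShare(long dataRecords, long heartbeats) {
    long header = headerBytes(heartbeats);
    long payload = dataRecords * DATA_RECORD_BYTES;
    return (double) header / (header + payload);
  }
}
```

Under these assumptions, 2,000 queued heartbeats would hold 32,768,000 bytes of header data next to a single 155-byte record, which is why the queue size tracks heartbeat count rather than payload size.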
This change introduces a writer-scoped enum, VtpHeaderEmissionMode, surfaced
via the new config key venice.writer.vtp.header.emission.mode:
- SOS_AND_HB (default): emit on every segment-start including heartbeats.
Preserves pre-existing behavior; nothing changes on upgrade.
- SOS_ONLY: emit on regular data SOS records only; skip heartbeat SOS.
Consumers must obtain the KafkaMessageEnvelope schema from an earlier
data SOS or an out-of-band schema cache.
- NONE: never emit. Use only when all consumers can resolve the schema
without the per-segment hint.
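
The three modes can be read as a small decision table. The following is a minimal sketch of that table, not the actual VeniceWriter code: only the mode names come from this commit message, while the class, method, and parameter names are hypothetical.

```java
/** Hypothetical stand-in for the gating decision described above. */
final class VtpGateSketch {
  /** Mirrors the three mode names from the commit message. */
  enum Mode { SOS_AND_HB, SOS_ONLY, NONE }

  private VtpGateSketch() {}

  /**
   * Returns true if the vtp header would be attached under the described rules.
   * The first two checks model the pre-existing gate (first message of the first
   * segment, and a non-null protocol schema header); the switch models the new mode.
   */
  static boolean shouldEmitVtp(
      Mode mode,
      boolean isFirstMessageOfFirstSegment,
      boolean isHeartbeat,
      boolean hasProtocolSchemaHeader) {
    if (!isFirstMessageOfFirstSegment || !hasProtocolSchemaHeader) {
      return false;
    }
    switch (mode) {
      case SOS_AND_HB:
        return true; // data SOS and heartbeat SOS both carry the header
      case SOS_ONLY:
        return !isHeartbeat; // heartbeat SOS is skipped
      case NONE:
        return false; // never emit
      default:
        throw new IllegalStateException("Unknown mode: " + mode);
    }
  }
}
```

For example, `shouldEmitVtp(Mode.SOS_ONLY, true, true, true)` evaluates to `false`, while the same call with `isHeartbeat` set to `false` evaluates to `true`.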
Implementation notes:
- VeniceWriter#getHeaders gains a boolean isHeartbeat parameter so the call
site can distinguish heartbeat SOS from data SOS; the existing three
invocations are updated in place.
- Unrecognized config values fall back to SOS_AND_HB with a warning log,
matching the rest of the writer's tolerant config parsing.
- The vtp header is also gated on protocolSchemaHeader != null, unchanged
from pre-existing behavior.
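
The warn-and-default parsing could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: the config key string is from this commit message, but the class name, the use of `java.util.Properties` and `java.util.logging`, and the trimming and case-folding of the value are all guesses.

```java
import java.util.Locale;
import java.util.Properties;
import java.util.logging.Logger;

/** Hypothetical sketch of tolerant config parsing with a default fallback. */
final class VtpModeParsingSketch {
  enum Mode { SOS_AND_HB, SOS_ONLY, NONE }

  // Config key as named in the commit message.
  static final String KEY = "venice.writer.vtp.header.emission.mode";

  private static final Logger LOGGER = Logger.getLogger(VtpModeParsingSketch.class.getName());

  private VtpModeParsingSketch() {}

  /** Returns the configured mode, or SOS_AND_HB when the key is absent or unrecognized. */
  static Mode parse(Properties props) {
    String raw = props.getProperty(KEY);
    if (raw == null) {
      return Mode.SOS_AND_HB; // key not set: keep pre-existing behavior
    }
    try {
      // Trimming and upper-casing are assumptions made for this sketch.
      return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      LOGGER.warning("Unrecognized value '" + raw + "' for " + KEY + "; falling back to SOS_AND_HB");
      return Mode.SOS_AND_HB;
    }
  }
}
```

Falling back instead of throwing means a typo in the config cannot prevent a writer from starting; the cost is that a misconfigured writer silently keeps emitting the header until someone reads the warning.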
Testing done:
- testHeartbeatVtpEmissionMode (DataProvider over all three modes):
confirms the vtp header is present on the heartbeat under SOS_AND_HB and
absent under SOS_ONLY / NONE.
- testDataSosWithVtpEmissionModeNone: confirms NONE drops the vtp header
on regular data segment-start records as well, not just heartbeats.
- Full VeniceWriterUnitTest class passes (regression check for the new
isHeartbeat parameter threading).
Pull request overview
This PR adds a writer-scoped configuration to control whether VeniceWriter emits the vtp (transport protocol schema) header on segment-start messages, allowing operators to avoid attaching the ~16KB KafkaMessageEnvelope schema blob to heartbeat SOS records.
Changes:
- Introduces `VtpHeaderEmissionMode` (`SOS_AND_HB` default, `SOS_ONLY`, `NONE`) and wires it into `VeniceWriter` header construction (including a new `isHeartbeat` parameter).
- Adds config key `venice.writer.vtp.header.emission.mode` to control the emission mode (tolerant parsing with warn-and-default behavior).
- Adds unit tests verifying heartbeat behavior across modes and that `NONE` suppresses vtp on data SOS as well.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Adds tests validating vtp header emission on heartbeats and data SOS under different modes. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VtpHeaderEmissionMode.java | New enum defining emission modes and documenting intended semantics. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Parses the new config and gates vtp header attachment based on mode + heartbeat vs data SOS. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Adds the new config key and Javadoc describing how to use it. |
Address Copilot review nits on linkedin#2798: the docs said "first message of every segment" but the actual gate is segmentNumber == 0 && messageSequenceNumber == 0, which on the data path only matches the very first segment-start per partition (segment 0, sequence 0). Heartbeats pin both coordinates to 0 in getHeartbeatKME, so every heartbeat matches the gate. Reworded all three locations (VtpHeaderEmissionMode Javadoc, the VeniceWriter constructor comment, and the ConfigKeys Javadoc on VENICE_WRITER_VTP_HEADER_EMISSION_MODE) to describe the gate precisely and call out the data-path vs heartbeat asymmetry. No behavior change.
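
The data-path versus heartbeat asymmetry can be sketched as follows. The gate expression is the one quoted in the commit message above; the class and method names are hypothetical, and the model assumes data segments on a partition are numbered 0, 1, 2, ... with each segment start at sequence 0.

```java
/** Hypothetical model of how often the (0, 0) gate matches on each path. */
final class VtpGateAsymmetrySketch {
  private VtpGateAsymmetrySketch() {}

  /** The gate as described: first message of the first segment. */
  static boolean matchesGate(int segmentNumber, int messageSequenceNumber) {
    return segmentNumber == 0 && messageSequenceNumber == 0;
  }

  /** Counts gate matches across data segment starts numbered 0..segments-1. */
  static int dataSegmentStartMatches(int segments) {
    int matches = 0;
    for (int segment = 0; segment < segments; segment++) {
      if (matchesGate(segment, 0)) {
        matches++;
      }
    }
    return matches;
  }

  /** Counts gate matches across heartbeats, which pin both coordinates to 0. */
  static int heartbeatMatches(int heartbeats) {
    int matches = 0;
    for (int i = 0; i < heartbeats; i++) {
      if (matchesGate(0, 0)) {
        matches++;
      }
    }
    return matches;
  }
}
```

Under this model, five data segment starts match the gate once, whereas five heartbeats match it five times.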
Summary
`VeniceWriter` attaches the `vtp` protocol-schema header (the Avro JSON for `KafkaMessageEnvelope`, ~16 KB) on the first message of the first segment (`segmentNumber == 0 && messageSequenceNumber == 0`). Heartbeat control messages are constructed as `START_OF_SEGMENT` with both numbers at zero, so under the pre-existing rule every heartbeat picks up the ~16 KB header even though heartbeat consumers never use it for schema bootstrap.

On busy ingestion paths with many partitions and frequent heartbeats this dominates the consumer-side per-record memory footprint: a small data record can share an in-flight queue with thousands of ~16 KB heartbeat envelopes, and the queue size becomes a function of `vtp` retention rather than payload size.

This PR introduces a writer-scoped enum, `VtpHeaderEmissionMode`, surfaced via the new config key `venice.writer.vtp.header.emission.mode`:
- `SOS_AND_HB` (default): emit whenever the gate matches, including on heartbeats. Preserves pre-existing behavior.
- `SOS_ONLY`: emit on data SOS records only and skip heartbeat SOS. Consumers must obtain the `KafkaMessageEnvelope` schema by some other means (earlier data SOS on the same segment, or an out-of-band schema cache).
- `NONE`: never emit. Use only when all consumers can resolve the schema without the header.

Changes
- New enum `com.linkedin.venice.writer.VtpHeaderEmissionMode` documenting the three modes.
- New config key `VENICE_WRITER_VTP_HEADER_EMISSION_MODE` (`venice.writer.vtp.header.emission.mode`) in `ConfigKeys`.
- `VeniceWriter` parses the property in its constructor; unrecognized values fall back to `SOS_AND_HB` with a warning log, matching the rest of the writer's tolerant config parsing.
- `VeniceWriter#getHeaders` gains a `boolean isHeartbeat` parameter so the call site can distinguish heartbeat SOS from data SOS. The three existing invocations (one in the regular send path, two in `sendHeartbeat`) are updated in place.
- The existing `protocolSchemaHeader != null` gate is unchanged.

Testing Done
- `testHeartbeatVtpEmissionMode` (TestNG `DataProvider` over all three modes) verifies the `vtp` header is present on the heartbeat under `SOS_AND_HB` and absent under `SOS_ONLY` / `NONE`.
- `testDataSosWithVtpEmissionModeNone` verifies that `NONE` also drops the `vtp` header on regular data segment-start records, not just heartbeats, so consumers can rely on the documented semantics.
- The full `VeniceWriterUnitTest` class passes (regression check for the new `isHeartbeat` parameter threading).

Backward Compatibility
Default is `SOS_AND_HB`, which preserves the pre-existing rule (`isFirstMessageOfFirstSegment`). Existing deployments see no behavior change unless they explicitly opt into `SOS_ONLY` or `NONE`.