[DO NOT REVIEW] Handle non-numeric PubSubPosition in leader produce and checkpoint paths#2605
haoxu07 wants to merge 5 commits into linkedin:main
… and checkpoint paths

When RT topics migrate from Kafka to Northguard, PubSubPosition instances become NGRangePosition, which throws UnsupportedOperationException on getNumericOffset(). This change adds graceful handling at all crash sites in the leader produce and offset checkpoint paths:
- VeniceWriter: wrap 3 sites (data PUT/DELETE, heartbeat, non-default leader metadata) with a getNumericOffsetOrDefault() helper that returns -1 for non-numeric positions
- PubSubUtil.deserializePositionWithOffsetFallback: split the symbolic position check from the numeric comparison; catch UnsupportedOperationException and return the deserialized NG position as-is
- OffsetRecord.checkpointRtPosition: catch UnsupportedOperationException and store -1 in the legacy upstreamOffsetMap (the wire-format bytes in upstreamRealTimeTopicPubSubPositionMap are authoritative)
- InMemoryPubSubPosition: add ofNonNumeric() factory + getInternalOffset() to enable NG position simulation in tests
- MockInMemoryConsumerAdapter.advancePosition: use getInternalOffset() to avoid a crash with non-numeric test positions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…minology Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Fixes crashes caused by non-Kafka PubSubPosition implementations (e.g., Northguard NGRangePosition) throwing UnsupportedOperationException on getNumericOffset() as RT topics migrate off Kafka, ensuring leader produce paths and follower checkpointing can safely handle non-numeric positions while preserving authoritative wire-format bytes.
Changes:
- Add safe numeric-offset extraction in VeniceWriter to avoid crashes when upstream positions are non-numeric.
- Update PubSubUtil.deserializePositionWithOffsetFallback and OffsetRecord.checkpointRtPosition to tolerate non-numeric positions and keep wire-format bytes authoritative.
- Extend test infrastructure and unit tests to simulate/validate non-numeric position behavior.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Use a helper to default numeric offsets to -1 when unsupported for leader metadata fields. |
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/PubSubUtil.java | Avoid numeric-offset comparisons for symbolic or non-numeric positions during deserialization fallback. |
| internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java | Catch UnsupportedOperationException when checkpointing RT positions and persist -1 in legacy offset map. |
| internal/venice-test-common/src/main/java/com/linkedin/venice/pubsub/mock/InMemoryPubSubPosition.java | Add ofNonNumeric() to simulate positions that don’t support numeric offsets; propagate behavior across next/prev helpers. |
| internal/venice-test-common/src/main/java/com/linkedin/venice/pubsub/mock/adapter/consumer/MockInMemoryConsumerAdapter.java | Adjust advancePosition to use getInternalOffset() for in-memory positions. |
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Add unit tests covering numeric-offset extraction helper for numeric and non-numeric positions. |
| internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubUtilTest.java | Add unit test asserting non-numeric deserialized positions are returned as-is. |
| internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java | Add unit test ensuring checkpointing non-numeric positions stores -1 in legacy offset map. |
```java
long startOffset = (startInclusive instanceof InMemoryPubSubPosition)
    ? ((InMemoryPubSubPosition) startInclusive).getInternalOffset()
    : startInclusive.getNumericOffset();
long targetOffset = Math.addExact(startOffset, n);
return InMemoryPubSubPosition.of(targetOffset);
```
advancePosition() loses the non-numeric simulation: if startInclusive is an InMemoryPubSubPosition created via ofNonNumeric(), this method computes offsets via getInternalOffset() but always returns InMemoryPubSubPosition.of(targetOffset), which re-enables numeric offsets. Consider preserving the numericOffsetSupported flag (e.g., return ofNonNumeric(targetOffset) when the input was non-numeric) so callers can advance positions without silently changing position semantics.
Suggested change:
```diff
-long startOffset = (startInclusive instanceof InMemoryPubSubPosition)
-    ? ((InMemoryPubSubPosition) startInclusive).getInternalOffset()
-    : startInclusive.getNumericOffset();
-long targetOffset = Math.addExact(startOffset, n);
-return InMemoryPubSubPosition.of(targetOffset);
+long startOffset;
+boolean numericOffsetSupported = true;
+if (startInclusive instanceof InMemoryPubSubPosition) {
+  InMemoryPubSubPosition inMemoryPosition = (InMemoryPubSubPosition) startInclusive;
+  startOffset = inMemoryPosition.getInternalOffset();
+  numericOffsetSupported = inMemoryPosition.isNumericOffsetSupported();
+} else {
+  startOffset = startInclusive.getNumericOffset();
+}
+long targetOffset = Math.addExact(startOffset, n);
+return numericOffsetSupported
+    ? InMemoryPubSubPosition.of(targetOffset)
+    : InMemoryPubSubPosition.ofNonNumeric(targetOffset);
```
```java
/**
 * Creates a position that simulates a pub sub system whose positions do not support numeric offsets.
 * The internal offset is still used for mock broker indexing, but {@link #getNumericOffset()}
 * throws {@link UnsupportedOperationException}.
 */
public static InMemoryPubSubPosition ofNonNumeric(long offset) {
  return new InMemoryPubSubPosition(offset, false);
}
```
ofNonNumeric() currently only affects runtime behavior (getNumericOffset throws) but does not survive wire-format round-tripping: getPositionWireFormat()/toWireFormatBuffer() don’t encode numericOffsetSupported, so deserializing the bytes will produce a regular (numeric) InMemoryPubSubPosition. If this factory is intended for end-to-end simulation of non-Kafka positions, consider encoding the flag (or using a distinct typeId/factory) so the non-numeric behavior is preserved across serialization/deserialization.
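One way to picture the suggestion above is a wire format that carries the flag alongside the offset. The following is only a minimal sketch under assumptions: `FlaggedPositionSketch`, its nested `Position` class, and the 9-byte layout (8-byte offset plus 1-byte flag) are hypothetical stand-ins and do not reflect the actual InMemoryPubSubPosition wire format.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a mock position; NOT the real InMemoryPubSubPosition.
public class FlaggedPositionSketch {
  static final class Position {
    final long internalOffset;
    final boolean numericOffsetSupported;

    Position(long internalOffset, boolean numericOffsetSupported) {
      this.internalOffset = internalOffset;
      this.numericOffsetSupported = numericOffsetSupported;
    }

    long getNumericOffset() {
      if (!numericOffsetSupported) {
        throw new UnsupportedOperationException("numeric offset not supported");
      }
      return internalOffset;
    }

    boolean isNumericOffsetSupported() {
      return numericOffsetSupported;
    }

    // Assumed layout: 8-byte offset followed by a 1-byte flag.
    ByteBuffer toWireFormat() {
      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + 1);
      buffer.putLong(internalOffset);
      buffer.put((byte) (numericOffsetSupported ? 1 : 0));
      buffer.flip();
      return buffer;
    }

    static Position fromWireFormat(ByteBuffer buffer) {
      // Read from a duplicate so the caller's buffer position is left untouched.
      ByteBuffer view = buffer.duplicate();
      long offset = view.getLong();
      boolean numeric = view.get() != 0;
      return new Position(offset, numeric);
    }
  }

  public static void main(String[] args) {
    Position original = new Position(42L, false);
    Position roundTripped = Position.fromWireFormat(original.toWireFormat());
    // The flag survives the round trip, so the copy still rejects getNumericOffset().
    System.out.println(roundTripped.isNumericOffsetSupported());
  }
}
```

A distinct typeId or factory, as the comment also suggests, would achieve the same effect without changing the byte layout of numeric positions.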
```java
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
```
Import ordering in this test file is inconsistent with other venice-common tests (static imports are typically grouped together at the top). Here, static Mockito imports appear after non-static imports, which may fail checkstyle/spotless. Please regroup imports so all static imports are together and separated from non-static imports.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
```java
 * Extract the numeric offset from a PubSubPosition, returning -1 if the position type
 * does not support numeric offsets.
 */
static long getNumericOffsetOrDefault(PubSubPosition position) {
  try {
    return position.getNumericOffset();
  } catch (UnsupportedOperationException e) {
    return -1;
```
getNumericOffsetOrDefault() hard-codes -1 as the fallback. Since this value is semantically tied to EARLIEST, consider returning PubSubSymbolicPosition.EARLIEST.getNumericOffset() instead of a magic number to keep the meaning centralized (and consistent if the sentinel ever changes).
Suggested change:
```diff
- * Extract the numeric offset from a PubSubPosition, returning -1 if the position type
- * does not support numeric offsets.
- */
-static long getNumericOffsetOrDefault(PubSubPosition position) {
-  try {
-    return position.getNumericOffset();
-  } catch (UnsupportedOperationException e) {
-    return -1;
+ * Extract the numeric offset from a PubSubPosition, returning the numeric offset of
+ * {@link PubSubSymbolicPosition#EARLIEST} if the position type does not support
+ * numeric offsets.
+ */
+static long getNumericOffsetOrDefault(PubSubPosition position) {
+  try {
+    return position.getNumericOffset();
+  } catch (UnsupportedOperationException e) {
+    return PubSubSymbolicPosition.EARLIEST.getNumericOffset();
```
```java
// Store -1 as legacy fallback; the wire-format bytes above are the authoritative position.
numericOffset = -1;
```
checkpointRtPosition() uses a hard-coded -1 sentinel when getNumericOffset() is unsupported. Consider using PubSubSymbolicPosition.EARLIEST.getNumericOffset() (or a shared constant) instead of repeating the magic number, so the legacy fallback semantics remain explicit and consistent.
Suggested change:
```diff
-// Store -1 as legacy fallback; the wire-format bytes above are the authoritative position.
-numericOffset = -1;
+// Store EARLIEST's numeric offset as legacy fallback; the wire-format bytes above are the authoritative position.
+numericOffset = PubSubSymbolicPosition.EARLIEST.getNumericOffset();
```
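Both review comments point toward a single shared sentinel used by the writer helper and the checkpoint path. The sketch below illustrates that idea in isolation; `LegacyOffsetSketch`, its `Position` interface, the `Checkpoint` class, and the constant name are hypothetical and are not Venice APIs. The only facts taken from this PR are that -1 is the fallback value and that the wire-format bytes are treated as authoritative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; none of these types are the real Venice classes.
public class LegacyOffsetSketch {
  /** Assumed shared sentinel; the PR description says -1 matches EARLIEST's numeric offset. */
  static final long NON_NUMERIC_OFFSET_SENTINEL = -1L;

  interface Position {
    long getNumericOffset(); // may throw UnsupportedOperationException
    byte[] toWireFormat();
  }

  /** Returns the numeric offset, or the shared sentinel when the position has none. */
  static long numericOffsetOrSentinel(Position position) {
    try {
      return position.getNumericOffset();
    } catch (UnsupportedOperationException e) {
      return NON_NUMERIC_OFFSET_SENTINEL;
    }
  }

  static final class Checkpoint {
    final Map<String, byte[]> positionBytes = new HashMap<>(); // authoritative
    final Map<String, Long> legacyOffsets = new HashMap<>();   // legacy fallback only

    void checkpoint(String sourceKey, Position position) {
      // Always persist the wire-format bytes; the legacy map gets a best-effort value.
      positionBytes.put(sourceKey, position.toWireFormat());
      legacyOffsets.put(sourceKey, numericOffsetOrSentinel(position));
    }
  }

  public static void main(String[] args) {
    Position nonNumeric = new Position() {
      public long getNumericOffset() {
        throw new UnsupportedOperationException("no numeric offset");
      }

      public byte[] toWireFormat() {
        return new byte[] { 1, 2, 3 };
      }
    };
    Checkpoint checkpoint = new Checkpoint();
    checkpoint.checkpoint("source-a", nonNumeric);
    System.out.println(checkpoint.legacyOffsets.get("source-a"));
  }
}
```

Keeping the fallback in one place means a future change to the sentinel touches a single constant rather than every call site.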
Simplify test assertion to verify checkpointRtPosition doesn't throw with a non-numeric position, rather than inspecting internal state. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
sushantmane left a comment:
We need to handle this in internal code and not in OSS.
Summary
When RT topics migrate from Kafka to Northguard, PubSubPosition instances become NGRangePosition, which throws UnsupportedOperationException on getNumericOffset(). This causes crashes in the leader produce and offset checkpoint paths on followers consuming VT messages with NG position bytes.

Crash sites fixed (5 production, 2 test infrastructure):
| # | File | Method | Fix |
|---|---|---|---|
| 1 | VeniceWriter.java | getKafkaMessageEnvelopeProvider (L1182) | getNumericOffsetOrDefault() helper |
| 2 | VeniceWriter.java | getHeartbeatKME (L2290) | getNumericOffsetOrDefault() helper |
| 3 | VeniceWriter.java | getKafkaValue (L2423) | getNumericOffsetOrDefault() helper |
| 4 | PubSubUtil.java | deserializePositionWithOffsetFallback (L387) | Catch UnsupportedOperationException |
| 5 | OffsetRecord.java | checkpointRtPosition (L316) | Catch UnsupportedOperationException, store -1 in legacy map |
| 6 | InMemoryPubSubPosition.java | getNumericOffset | ofNonNumeric() factory for NG simulation |
| 7 | MockInMemoryConsumerAdapter.java | advancePosition | getInternalOffset() for InMemoryPubSubPosition |

Key design decisions:
- The upstreamOffset long field in LeaderMetadata and upstreamOffsetMap in PartitionState are legacy fallbacks; the wire-format bytes in upstreamPubSubPosition / upstreamRealTimeTopicPubSubPositionMap are authoritative
- -1 for non-numeric positions is consistent with PubSubSymbolicPosition.EARLIEST.getNumericOffset()
- Fix #5 (OffsetRecord.checkpointRtPosition) is a latent crash unmasked by fix #4: fixing deserializePositionWithOffsetFallback allows NG positions to flow downstream to checkpointRtPosition, where they previously fell back to EARLIEST

Crash chain (before this fix):
After PubSubUtil fix only (without OffsetRecord fix):
After both fixes:
Test plan
- VeniceWriterUnitTest.testGetNumericOffsetOrDefaultWithKafkaPosition: verifies Kafka position passthrough
- VeniceWriterUnitTest.testGetNumericOffsetOrDefaultWithUnsupportedPosition: verifies -1 fallback for NG position
- PubSubUtilTest.testDeserializePositionWithOffsetFallbackNonNumericPosition: verifies NG position returned as-is
- TestOffsetRecord.testCheckpointRtPositionWithNonNumericPosition: verifies legacy offset map gets -1
- InMemoryPubSubPosition.ofNonNumeric() enables future integration tests with NG position simulation

🤖 Generated with Claude Code