Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[davinci] PartitionTracker code cleanup #996

Merged
merged 2 commits into from
May 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@
* This class maintains state about all the upstream producers for a given partition.
* It keeps track of the last segment, last sequence number and incrementally computed
* checksum for each producer (identified by a producer GUID).
*
* <p>
* This class is thread safe. Locking is at the granularity of producers. Multiple
* threads can process records from the same partition concurrently.
*
* <p>
* This class also encapsulates the capability to clear expired state, in the functions
* which take in the maxAgeInMs parameter:
*
* <p>
* - {@link #clearExpiredStateAndUpdateOffsetRecord(OffsetRecord, long)}
* - {@link #setPartitionState(OffsetRecord, long)}
*/
Expand Down Expand Up @@ -156,7 +156,7 @@ private void updateOffsetRecord(GUID guid, Segment segment, OffsetRecord offsetR
* The aggregates and debugInfo being stored in the {@link ProducerPartitionState} will add a bit
* of overhead when we checkpoint this metadata to disk, so we should be careful not to add a very
* large number of elements to these arbitrary collections.
*
* <p>
* In the case of the debugInfo, it is expected (at the time of writing this comment) that all
* partitions produced by the same producer GUID would have the same debug values (though nothing
* precludes us from having per-partition debug values in the future if there is a use case for
Expand Down Expand Up @@ -190,16 +190,15 @@ public void updateOffsetRecord(OffsetRecord offsetRecord) {
}

/**
* Ensures the integrity of the data by maintaining state about all of the data produced by a specific
* Ensures the integrity of the data by maintaining state about all the data produced by a specific
* upstream producer:
*
* <p>
* 1. Segment, which should be equal to or greater than the previous segment.
* 2. Sequence number, which should be exactly one greater than the previous sequence number.
* 3. Checksum, which is computed incrementally until the end of a segment.
*
*
* @param consumerRecord
* @throws DataValidationException
* @param consumerRecord the incoming Kafka message.
* @throws DataValidationException if the DIV check failed.
*/
public void validateMessage(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
Expand All @@ -218,14 +217,14 @@ public void validateMessage(
/**
* This function ensures that the segment number is either equal or greater than the previous segment
* seen for this specific partition.
*
* This function has the side-effect of initializing a new {@link Segment} if:
* <p>
* This function has the side effect of initializing a new {@link Segment} if:
* 1. The previous segment does not exist, or
* 2. The incoming segment is exactly one greater than the previous one, and the previous segment is ended.
*
* @see #initializeNewSegment(PubSubMessage, boolean, boolean)
*
* @param consumerRecord
* @param consumerRecord the incoming Kafka message.
* @throws DuplicateDataException if the incoming segment is lower than the previously seen segment.
*/
private Segment trackSegment(
Expand All @@ -240,35 +239,36 @@ private Segment trackSegment(
"track new segment with non-zero incomingSegment=" + incomingSegmentNumber,
consumerRecord,
null,
endOfPushReceived);
endOfPushReceived,
true);
}
Segment newSegment = initializeNewSegment(consumerRecord, endOfPushReceived, true);
return newSegment;
} else {
int previousSegmentNumber = previousSegment.getSegmentNumber();
if (incomingSegmentNumber == previousSegmentNumber) {
return previousSegment;
} else if (incomingSegmentNumber == previousSegmentNumber + 1 && previousSegment.isEnded()) {
/** tolerateAnyMessageType should always be false in this scenario, regardless of {@param endOfPushReceived} */
return initializeNewSegment(consumerRecord, endOfPushReceived, false);
} else if (incomingSegmentNumber > previousSegmentNumber) {
if (tolerateMissingMsgs.get()) {
return initializeNewSegment(consumerRecord, endOfPushReceived, true);
} else {
throw DataFaultType.MISSING.getNewException(previousSegment, consumerRecord);
}
} else if (incomingSegmentNumber < previousSegmentNumber) {
throw DataFaultType.DUPLICATE.getNewException(previousSegment, consumerRecord);
} else {
// Defensive code.
throw new IllegalStateException(
"This condition should never happen. " + getClass().getSimpleName() + " may have a regression.");
return initializeNewSegment(consumerRecord, endOfPushReceived, true);
}
int previousSegmentNumber = previousSegment.getSegmentNumber();
if (incomingSegmentNumber == previousSegmentNumber) {
return previousSegment;
}
if (incomingSegmentNumber == previousSegmentNumber + 1 && previousSegment.isEnded()) {
/** tolerateAnyMessageType should always be false in this scenario, regardless of {@param endOfPushReceived} */
return initializeNewSegment(consumerRecord, endOfPushReceived, false);
}
if (incomingSegmentNumber > previousSegmentNumber) {
if (tolerateMissingMsgs.get()) {
return initializeNewSegment(consumerRecord, endOfPushReceived, true);
}
throw DataFaultType.MISSING.getNewException(previousSegment, consumerRecord);
}
// incomingSegmentNumber < previousSegmentNumber
throw DataFaultType.DUPLICATE.getNewException(previousSegment, consumerRecord);
}

/**
* @param consumerRecord
* Initialize a new segment for the incoming message. It is expected that the incoming message is a control message
* of type {@link ControlMessageType#START_OF_SEGMENT}. If the incoming message is not a control message, or if it is
* a control message of a different type, an exception will be thrown if tolerateAnyMessageType is false or end of push
* has not been received.
*
* @param consumerRecord the incoming Kafka message.
* @param tolerateAnyMessageType if true, we will tolerate initializing the Segment on any message
* if false, we will only tolerate initializing on a {@link ControlMessageType#START_OF_SEGMENT}
* @return the newly initialized {@link Segment}
Expand Down Expand Up @@ -323,14 +323,6 @@ private Segment initializeNewSegment(
return newSegment;
}

/**
* Convenience overload of {@code handleUnregisteredProducer} that forwards all four of its arguments,
* unchanged, to the five-argument variant and passes {@code true} as the trailing boolean argument.
* NOTE(review): the meaning of that trailing flag is not visible in this excerpt; confirm it against
* the five-argument overload before relying on it.
*
* @param scenario forwarded as-is to the five-argument overload.
* @param consumerRecord forwarded as-is to the five-argument overload.
* @param segment forwarded as-is to the five-argument overload.
* @param endOfPushReceived forwarded as-is to the five-argument overload.
*/
private void handleUnregisteredProducer(
String scenario,
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
Segment segment,
boolean endOfPushReceived) {
handleUnregisteredProducer(scenario, consumerRecord, segment, endOfPushReceived, true);
}

/**
* Found an unregistered producer when creating a segment.
* @param endOfPushReceived Whether end of push is received for this partition.
Expand Down Expand Up @@ -360,11 +352,11 @@ private void handleUnregisteredProducer(
/**
* This function ensures that the sequence number is strictly one greater than the previous incoming
* message for this specific partition.
*
* This function has the side-effect of altering the sequence number stored in the {@link Segment}.
* <p>
* This function has the side effect of altering the sequence number stored in the {@link Segment}.
*
* @param segment the segment to which the incoming message belongs
* @param consumerRecord
* @param consumerRecord the incoming Kafka message.
* @param endOfPushReceived whether endOfPush is received
* @param tolerateMissingMsgs whether log compaction could potentially happen to this record
* @param hasPreviousSegment whether previous segment exists
Expand All @@ -379,19 +371,20 @@ private void trackSequenceNumber(
boolean hasPreviousSegment) throws MissingDataException, DuplicateDataException {

int previousSequenceNumber = segment.getSequenceNumber();
int incomingSequenceNumber = consumerRecord.getValue().producerMetadata.messageSequenceNumber;
ProducerMetadata recordMetadata = consumerRecord.getValue().getProducerMetadata();
int incomingSequenceNumber = recordMetadata.getMessageSequenceNumber();

if (!segment.isStarted()) {
segment.start();
segment.setLastRecordProducerTimestamp(consumerRecord.getValue().producerMetadata.messageTimestamp);
segment.setLastRecordProducerTimestamp(recordMetadata.getMessageTimestamp());
return;
}

if (incomingSequenceNumber == previousSequenceNumber) {
if (!segment.isNewSegment()) {
throw DataFaultType.DUPLICATE.getNewException(segment, consumerRecord);
}
segment.setLastRecordProducerTimestamp(consumerRecord.getValue().producerMetadata.messageTimestamp);
segment.setLastRecordProducerTimestamp(recordMetadata.getMessageTimestamp());
/**
* In any other cases for newly constructed segment, don't check sequence number anymore,
* because "previousSequenceNumber" is going to be equal to "incomingSequenceNumber", and thus this message
Expand All @@ -403,7 +396,7 @@ private void trackSequenceNumber(
if (incomingSequenceNumber == previousSequenceNumber + 1) {
// Expected case, in steady state
segment.getAndIncrementSequenceNumber();
segment.setLastRecordProducerTimestamp(consumerRecord.getValue().producerMetadata.messageTimestamp);
segment.setLastRecordProducerTimestamp(recordMetadata.getMessageTimestamp());
return;
}

Expand All @@ -413,7 +406,7 @@ private void trackSequenceNumber(
// never
// see the record coming from samza producer before it is promoted to leader. This check prevents the first
// message to be considered as "duplicated" and skipped.
segment.setLastRecordProducerTimestamp(consumerRecord.getValue().producerMetadata.messageTimestamp);
segment.setLastRecordProducerTimestamp(recordMetadata.getMessageTimestamp());
return;
}
// This is a duplicate message, which we can safely ignore.
Expand All @@ -436,12 +429,12 @@ private void trackSequenceNumber(
*/
if ((endOfPushReceived && !segment.isRegistered()) || tolerateMissingMsgs.get()) {
/**
* In this branch of the if, we need to adjust the sequence number, otherwise,
* In this branch, we need to adjust the sequence number, otherwise,
* this will cause spurious missing data metrics on further events...
* and the partition won't become 'ONLINE' if it is not 'ONLINE' yet.
*/
segment.setSequenceNumber(incomingSequenceNumber);
segment.setLastRecordProducerTimestamp(consumerRecord.getValue().producerMetadata.messageTimestamp);
segment.setLastRecordProducerTimestamp(recordMetadata.getMessageTimestamp());
return;
}

Expand All @@ -454,8 +447,8 @@ private void trackSequenceNumber(

/**
* This function maintains a running checksum of the data seen so far for this specific partition.
*
* This function has the side-effect of marking the {@link Segment} as ended when it encounters a
* <p>
* This function has the side effect of marking the {@link Segment} as ended when it encounters a
* {@link ControlMessageType#END_OF_SEGMENT} message and the incrementally computed checksum matches
* the expected one.
*
Expand Down Expand Up @@ -505,15 +498,15 @@ private void trackCheckSum(
*/
if ((endOfPushReceived && !segment.isRegistered()) || tolerateMissingMsgs.get()) {
segment.end(incomingEndOfSegment.finalSegment);
} else if (endOfPushReceived) {
/**
* If EOP is received, we will still end the segment and then throw exceptions.
* Ending the segment so that next SOS message wouldn't get misleading
* missing exceptions in {@link #trackSegment(ConsumerRecord, boolean)}
*/
segment.end(incomingEndOfSegment.finalSegment);
throw dataCorruptException;
} else {
if (endOfPushReceived) {
/**
* If EOP is received, we will still end the segment and then throw exceptions.
* Ending the segment so that next SOS message wouldn't get misleading
* missing exceptions in {@link #trackSegment(ConsumerRecord, boolean)}
*/
segment.end(incomingEndOfSegment.finalSegment);
}
throw dataCorruptException;
}
}
Expand All @@ -526,10 +519,10 @@ private void trackCheckSum(
* delay threshold, it indicates that log compaction might have already taken place, so a missing message is
* expected and will be tolerated;
* ii. if the data are fresh and a missing message is detected, an error will be thrown.
*
* <p>
* If "logCompactionDelayInMs" is not a positive number, it indicates there is no delay for Kafka log compaction,
* Kafka may compact messages at any time for hybrid stores, so missing messages are expected; no error will be thrown.
*
* <p>
* If "errorMetricCallback" is present, the callback will be triggered before throwing MISSING_MESSAGE exception;
* users can register their own callback to emit metrics, produce Kafka events, etc.
*/
Expand All @@ -545,16 +538,22 @@ private void validateSequenceNumber(
segment.start();
segment.setSequenceNumber(incomingSequenceNumber);
segment.setLastRecordTimestamp(consumerRecord.getPubSubMessageTime());
} else if (incomingSequenceNumber == previousSequenceNumber + 1) {
return;
}
if (incomingSequenceNumber == previousSequenceNumber + 1) {
// Expected case, in steady state
segment.getAndIncrementSequenceNumber();
segment.setLastRecordTimestamp(consumerRecord.getPubSubMessageTime());
} else if (incomingSequenceNumber <= previousSequenceNumber) {
return;
}
if (incomingSequenceNumber <= previousSequenceNumber) {
/**
* Duplicate message is acceptable, there is no data loss.
*/
segment.setLastRecordTimestamp(consumerRecord.getPubSubMessageTime());
} else if (incomingSequenceNumber > previousSequenceNumber + 1) {
return;
}
if (incomingSequenceNumber > previousSequenceNumber + 1) {
/**
* A gap is detected in sequence number. If the data are fresh, data are within the Kafka log compaction
* delay threshold, it indicates a clear data loss signal; if the broker timestamp of the data are older
Expand All @@ -574,10 +573,10 @@ private void validateSequenceNumber(
}
segment.setSequenceNumber(incomingSequenceNumber);
segment.setLastRecordTimestamp(consumerRecord.getPubSubMessageTime());
} else {
// Defensive coding, to prevent regressions in the above code from causing silent failures
throw new IllegalStateException("Unreachable code!");
return;
}
// Defensive coding, to prevent regressions in the above code from causing silent failures
throw new IllegalStateException("Unreachable code!");
}

public void checkMissingMessage(
Expand Down Expand Up @@ -655,35 +654,35 @@ enum DataFaultType {
* A given producer sent a message with a sequence number smaller than or equal to the previously received
* sequence number, rather than being exactly one greater than the previous.
*/
DUPLICATE(msg -> new DuplicateDataException(msg)),
DUPLICATE(DuplicateDataException::new),

/**
* A given producer sent a message with a sequence number more than one greater than the previously
* received sequence number, rather than being exactly one greater than the previous.
*
* <p>
* N.B.: Out-of-order data can manifest as missing data, since the Venice Transport Protocol only
* keeps track of a high-water mark (the sequence number). Dealing gracefully with out-of-order
* data and reconstructing the proper order would require buffering arbitrarily large amounts
* of data and would be more complex, which is why it is not supported at this time.
*/
MISSING(msg -> new MissingDataException(msg)),
MISSING(MissingDataException::new),

/**
* A given producer sent a {@link ControlMessageType#END_OF_SEGMENT} which included a checksum that
* did not match the data received from the same producer.
*/
CORRUPT(msg -> new CorruptDataException(msg)),
CORRUPT(CorruptDataException::new),

/**
* Received a message from a given producer without first receiving a {@link
* ControlMessageType#START_OF_SEGMENT}.
*
* <p>
* N.B.: This used to show up as {@link DataFaultType#MISSING} data. This new fault type was
* introduced in order to disambiguate these two cases, because in some cases, the upstream
* code may want to be more lenient with this specific type of failure (such as when a {@link
* ControlMessageType#TOPIC_SWITCH} was received).
*/
UNREGISTERED_PRODUCER(msg -> new ImproperlyStartedSegmentException(msg));
UNREGISTERED_PRODUCER(ImproperlyStartedSegmentException::new);

final Function<String, DataValidationException> exceptionSupplier;

Expand Down
Loading