Skip to content

Commit

Permalink
KAFKA-15213: provide the exact offset to QuorumController.replay (apa…
Browse files Browse the repository at this point in the history
…che#13643)

Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
where this is useful, such as logging, implementing metadata transactions, or handling broker
registration records.

In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact
record offset from the batch base offset and the record index.

The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
choose a batch base offset later than the one we expect, if someone else is also adding records.
While the QC is the only entity submitting data records, control records may be added at any time.
In the current implementation, these are really only used for leadership elections. However, this
could change with the addition of quorum reconfiguration or similar features.

Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
would have resulted in a batch base offset other than what was expected. This in turn will trigger a
controller failover. In the future, if automatically added control records become more common, we
may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
for now, this will allow us to rely on the offset as correct.

In order that the active QC can learn what offset to start writing at, the PR also adds a new
RaftClient#endOffset function.

At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown
when we request a base offset that doesn't match the one the Raft layer would have given us.
Although this exception should cause a failover, it should not be considered a fault. This
complicated the exception handling a bit and motivated splitting more of it out into the new
EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
bit better.

Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
  • Loading branch information
cmccabe committed Jul 28, 2023
1 parent e5861ee commit 10bcd4f
Show file tree
Hide file tree
Showing 18 changed files with 775 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.errors.EventHandlerExceptionInfo;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
Expand Down Expand Up @@ -457,44 +458,39 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
}

private Throwable handleEventException(String name,
OptionalLong startProcessingTimeNs,
Throwable exception) {
if (!startProcessingTimeNs.isPresent() &&
ControllerExceptions.isTimeoutException(exception)) {
// If the event never started, and the exception is a timeout, increment the timed
// out metric.
controllerMetrics.incrementOperationsTimedOut();
private Throwable handleEventException(
String name,
OptionalLong startProcessingTimeNs,
Throwable exception
) {
OptionalLong deltaUs;
if (startProcessingTimeNs.isPresent()) {
long endProcessingTime = time.nanoseconds();
long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS));
} else {
deltaUs = OptionalLong.empty();
}
Throwable externalException =
ControllerExceptions.toExternalException(exception, () -> latestController());
if (!startProcessingTimeNs.isPresent()) {
log.error("{}: unable to start processing because of {}. Reason: {}", name,
exception.getClass().getSimpleName(), exception.getMessage());
return externalException;
EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
fromInternal(exception, () -> latestController());
int epoch = curClaimEpoch;
if (epoch == -1) {
epoch = lastCommittedEpoch;
}
long endProcessingTime = time.nanoseconds();
long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
if (ControllerExceptions.isExpected(exception)) {
log.info("{}: failed with {} in {} us. Reason: {}", name,
exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
return externalException;
String failureMessage = info.failureMessage(epoch, deltaUs,
isActiveController(), lastCommittedOffset);
if (info.isTimeoutException() && (!deltaUs.isPresent())) {
controllerMetrics.incrementOperationsTimedOut();
}
if (isActiveController()) {
nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " +
"exception %s at epoch %d in %d us. Renouncing leadership and reverting " +
"to the last committed offset %d.",
name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
lastCommittedOffset), exception);
renounce();
if (info.isFault()) {
nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception);
} else {
nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " +
"exception %s in %d us. The controller is already in standby mode.",
name, exception.getClass().getSimpleName(), deltaUs),
exception);
log.info("{}: {}", name, failureMessage);
}
if (info.causesFailover() && isActiveController()) {
renounce();
}
return externalException;
return info.effectiveExternalException();
}

private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
Expand Down Expand Up @@ -755,22 +751,28 @@ public Long apply(List<ApiMessageAndVersion> records) {
// Start by trying to apply the record to our in-memory state. This should always
// succeed; if it does not, that's a fatal error. It is important to do this before
// scheduling the record for Raft replication.
int i = 1;
int recordIndex = 0;
for (ApiMessageAndVersion message : records) {
long recordOffset = prevEndOffset + 1 + recordIndex;
try {
replay(message.message(), Optional.empty(), prevEndOffset + records.size());
replay(message.message(), Optional.empty(), recordOffset);
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record, which was " +
"%d of %d record(s) in the batch following last write offset %d.",
message.message().getClass().getSimpleName(), i, records.size(),
prevEndOffset);
String failureMessage = String.format("Unable to apply %s " +
"record at offset %d on active controller, from the " +
"batch with baseOffset %d",
message.message().getClass().getSimpleName(),
recordOffset, prevEndOffset + 1);
throw fatalFaultHandler.handleFault(failureMessage, e);
}
i++;
recordIndex++;
}
prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records);
snapshotRegistry.getOrCreateSnapshot(prevEndOffset);
return prevEndOffset;
long nextEndOffset = prevEndOffset + recordIndex;
raftClient.scheduleAtomicAppend(controllerEpoch,
OptionalLong.of(prevEndOffset + 1),
records);
snapshotRegistry.getOrCreateSnapshot(nextEndOffset);
prevEndOffset = nextEndOffset;
return nextEndOffset;
}
});
op.processBatchEndOffset(offset);
Expand Down Expand Up @@ -988,18 +990,20 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
log.debug("Replaying commits from the active node up to " +
"offset {} and epoch {}.", offset, epoch);
}
int i = 1;
int recordIndex = 0;
for (ApiMessageAndVersion message : messages) {
long recordOffset = batch.baseOffset() + recordIndex;
try {
replay(message.message(), Optional.empty(), offset);
replay(message.message(), Optional.empty(), recordOffset);
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record on standby " +
"controller, which was %d of %d record(s) in the batch with baseOffset %d.",
message.message().getClass().getSimpleName(), i, messages.size(),
batch.baseOffset());
String failureMessage = String.format("Unable to apply %s " +
"record at offset %d on standby controller, from the " +
"batch with baseOffset %d",
message.message().getClass().getSimpleName(),
recordOffset, batch.baseOffset());
throw fatalFaultHandler.handleFault(failureMessage, e);
}
i++;
recordIndex++;
}
}

Expand All @@ -1008,13 +1012,6 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
epoch,
batch.appendTimestamp()
);

if (offset >= raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) {
oldestNonSnapshottedTimestamp = Math.min(
oldestNonSnapshottedTimestamp,
batch.appendTimestamp()
);
}
}
} finally {
reader.close();
Expand Down Expand Up @@ -1094,10 +1091,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
renounce();
}
} else if (newLeader.isLeader(nodeId)) {
log.info("Becoming the active controller at epoch {}, committed offset {}, " +
"committed epoch {}", newLeader.epoch(), lastCommittedOffset,
lastCommittedEpoch);
claim(newLeader.epoch());
long newLastWriteOffset = raftClient.logEndOffset() - 1;
log.info("Becoming the active controller at epoch {}, last write offset {}.",
newLeader.epoch(), newLastWriteOffset);
claim(newLeader.epoch(), newLastWriteOffset);
} else {
log.info("In the new epoch {}, the leader is {}.",
newLeader.epoch(), newLeaderName);
Expand Down Expand Up @@ -1168,15 +1165,15 @@ private void updateWriteOffset(long offset) {
}
}

private void claim(int epoch) {
private void claim(int epoch, long newLastWriteOffset) {
try {
if (curClaimEpoch != -1) {
throw new RuntimeException("Cannot claim leadership because we are already the " +
"active controller.");
}
curClaimEpoch = epoch;
controllerMetrics.setActive(true);
updateWriteOffset(lastCommittedOffset);
updateWriteOffset(newLastWriteOffset);
clusterControl.activate();

// Before switching to active, create an in-memory snapshot at the last committed
Expand Down Expand Up @@ -1518,25 +1515,24 @@ private void handleFeatureControlChange() {
*
* @param message The metadata record
* @param snapshotId The snapshotId if this record is from a snapshot
* @param batchLastOffset The offset of the last record in the log batch, or the lastContainedLogOffset
* if this record is from a snapshot, this is used along with RegisterBrokerRecord
* @param offset The offset of the record
*/
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
if (log.isTraceEnabled()) {
if (snapshotId.isPresent()) {
log.trace("Replaying snapshot {} record {}",
Snapshots.filenameFromSnapshotId(snapshotId.get()),
recordRedactor.toLoggableString(message));
} else {
log.trace("Replaying log record {} with batchLastOffset {}",
recordRedactor.toLoggableString(message), batchLastOffset);
log.trace("Replaying log record {} with offset {}",
recordRedactor.toLoggableString(message), offset);
}
}
logReplayTracker.replay(message);
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset);
clusterControl.replay((RegisterBrokerRecord) message, offset);
break;
case UNREGISTER_BROKER_RECORD:
clusterControl.replay((UnregisterBrokerRecord) message);
Expand Down Expand Up @@ -1769,11 +1765,6 @@ private void resetToEmptyState() {
*/
private long writeOffset;

/**
* Timestamp for the oldest record that was committed but not included in a snapshot.
*/
private long oldestNonSnapshottedTimestamp = Long.MAX_VALUE;

/**
* How long to delay partition leader balancing operations.
*/
Expand Down Expand Up @@ -2330,4 +2321,11 @@ Time time() {
QuorumControllerMetrics controllerMetrics() {
return controllerMetrics;
}

// VisibleForTesting
void setWriteOffset(long newWriteOffset) {
appendControlEvent("setWriteOffset", () -> {
this.writeOffset = newWriteOffset;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@

package org.apache.kafka.controller.errors;

import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.mutable.BoundedListTooLongException;

import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;


public class ControllerExceptions {
Expand Down Expand Up @@ -93,58 +86,4 @@ public static NotControllerException newWrongControllerException(OptionalInt con
return new NotControllerException("No controller appears to be active.");
}
}

/**
* Determine if an exception is expected. Unexpected exceptions trigger controller failovers
* when they are raised.
*
* @param exception The exception.
* @return True if the exception is expected.
*/
public static boolean isExpected(Throwable exception) {
if (exception instanceof ApiException) {
// ApiExceptions indicate errors that should be returned to the user.
return true;
} else if (exception instanceof NotLeaderException) {
// NotLeaderException is thrown if we try to append records, but are not the leader.
return true;
} else if (exception instanceof RejectedExecutionException) {
// This can happen when the controller is shutting down.
return true;
} else if (exception instanceof BoundedListTooLongException) {
// This can happen if we tried to create too many records.
return true;
} else if (exception instanceof InterruptedException) {
// Interrupted exceptions are not expected. They might happen during junit tests if
// the test gets stuck and must be terminated by sending IE to all the threads.
return false;
}
// Other exceptions are unexpected.
return false;
}

/**
* Translate an internal controller exception to its external equivalent.
*
* @param exception The internal exception.
* @return Its external equivalent.
*/
public static Throwable toExternalException(
Throwable exception,
Supplier<OptionalInt> latestControllerSupplier
) {
if (exception instanceof ApiException) {
return exception;
} else if (exception instanceof NotLeaderException) {
return newWrongControllerException(latestControllerSupplier.get());
} else if (exception instanceof RejectedExecutionException) {
return new TimeoutException("The controller is shutting down.", exception);
} else if (exception instanceof BoundedListTooLongException) {
return new PolicyViolationException("Unable to perform excessively large batch " +
"operation.");
} else if (exception instanceof InterruptedException) {
return new UnknownServerException("The controller was interrupted.");
}
return new UnknownServerException(exception);
}
}
Loading

0 comments on commit 10bcd4f

Please sign in to comment.