Skip to content

Commit

Permalink
KAFKA-15263 Check KRaftMigrationDriver state in each event (apache#14115
Browse files Browse the repository at this point in the history
)

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
mumrah committed Jul 28, 2023
1 parent 811ae01 commit 32c39c8
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 20 deletions.
Expand Up @@ -163,7 +163,10 @@ class ZkMigrationIntegrationTest {
readyFuture.get(30, TimeUnit.SECONDS)

val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

def inDualWrite(): Boolean = {
val migrationState = kraftCluster.controllers().get(3000).migrationSupport.get.migrationDriver.migrationState().get(10, TimeUnit.SECONDS)
Expand Down Expand Up @@ -286,7 +289,10 @@ class ZkMigrationIntegrationTest {

// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

// Alter the metadata
log.info("Updating metadata with AdminClient")
Expand Down Expand Up @@ -358,7 +364,10 @@ class ZkMigrationIntegrationTest {

// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

// Alter the metadata
log.info("Updating metadata with AdminClient")
Expand Down Expand Up @@ -422,7 +431,10 @@ class ZkMigrationIntegrationTest {

// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

// Alter the metadata
log.info("Updating metadata with AdminClient")
Expand Down Expand Up @@ -481,7 +493,10 @@ class ZkMigrationIntegrationTest {

// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

// Alter the metadata
log.info("Create new topic with AdminClient")
Expand Down
Expand Up @@ -297,6 +297,16 @@ private boolean isValidStateChange(MigrationDriverState newState) {
}
}

private boolean checkDriverState(MigrationDriverState expectedState) {
if (migrationState.equals(expectedState)) {
return true;
} else {
log.info("Expected driver state {} but found {}. Not running this event {}.",
expectedState, migrationState, this.getClass().getSimpleName());
return false;
}
}

private void transitionTo(MigrationDriverState newState) {
if (!isValidStateChange(newState)) {
throw new IllegalStateException(
Expand Down Expand Up @@ -460,11 +470,21 @@ public void run() throws Exception {
KRaftMigrationDriver.this.image = image;
String metadataType = isSnapshot ? "snapshot" : "delta";

if (migrationState.equals(MigrationDriverState.INACTIVE)) {
// No need to log anything if this node is not the active controller
completionHandler.accept(null);
return;
}

if (!migrationState.allowDualWrite()) {
log.trace("Received metadata {}, but the controller is not in dual-write " +
"mode. Ignoring the change to be replicated to Zookeeper", metadataType);
completionHandler.accept(null);
wakeup();
// If the driver is active and dual-write is not yet enabled, then the migration has not yet begun.
// Only wake up the thread if the broker registrations have changed
if (delta.clusterDelta() != null) {
wakeup();
}
return;
}

Expand Down Expand Up @@ -525,7 +545,7 @@ class WaitForControllerQuorumEvent extends MigrationEvent {

@Override
public void run() throws Exception {
if (migrationState.equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
if (checkDriverState(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
if (!firstPublish) {
log.trace("Waiting until we have received metadata before proceeding with migration");
return;
Expand Down Expand Up @@ -571,24 +591,19 @@ public void run() throws Exception {
class WaitForZkBrokersEvent extends MigrationEvent {
@Override
public void run() throws Exception {
switch (migrationState) {
case WAIT_FOR_BROKERS:
if (areZkBrokersReadyForMigration()) {
log.info("Zk brokers are registered and ready for migration");
transitionTo(MigrationDriverState.BECOME_CONTROLLER);
}
break;
default:
// Ignore the event as we're not in the appropriate state anymore.
break;
if (checkDriverState(MigrationDriverState.WAIT_FOR_BROKERS)) {
if (areZkBrokersReadyForMigration()) {
log.info("Zk brokers are registered and ready for migration");
transitionTo(MigrationDriverState.BECOME_CONTROLLER);
}
}
}
}

class BecomeZkControllerEvent extends MigrationEvent {
@Override
public void run() throws Exception {
if (migrationState == MigrationDriverState.BECOME_CONTROLLER) {
if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER)) {
applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
Expand All @@ -606,6 +621,9 @@ public void run() throws Exception {
class MigrateMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
if (!checkDriverState(MigrationDriverState.ZK_MIGRATION)) {
return;
}
Set<Integer> brokersInMetadata = new HashSet<>();
log.info("Starting ZK migration");
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
Expand Down Expand Up @@ -662,7 +680,7 @@ public void run() throws Exception {
class SyncKRaftMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
if (checkDriverState(MigrationDriverState.SYNC_KRAFT_TO_ZK)) {
log.info("Performing a full metadata sync from KRaft to ZK.");
Map<String, Integer> dualWriteCounts = new TreeMap<>();
long startTime = time.nanoseconds();
Expand All @@ -681,7 +699,7 @@ class SendRPCsToBrokersEvent extends MigrationEvent {
@Override
public void run() throws Exception {
// Ignore sending RPCs to the brokers since we're no longer in the state.
if (migrationState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
if (checkDriverState(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM)) {
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
log.trace("Sending RPCs to broker before moving to dual-write mode using " +
"at offset and epoch {}", image.highestOffsetAndEpoch());
Expand Down
Expand Up @@ -69,6 +69,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -634,4 +635,49 @@ public void testControllerFailover() throws Exception {
assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
});
}

@Test
public void testBeginMigrationOnce() throws Exception {
AtomicInteger migrationBeginCalls = new AtomicInteger(0);
NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
@Override
public void beginMigration() {
migrationBeginCalls.incrementAndGet();
}
};
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1, 2, 3).build();
MockFaultHandler faultHandler = new MockFaultHandler("testTwoMigrateMetadataEvents");
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setZkRecordConsumer(recordConsumer)
.setPropagator(metadataPropagator)
.setFaultHandler(faultHandler);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

driver.start();
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
delta.replay(zkBrokerRecord(3));
MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
image = delta.apply(provenance);

driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));

// Call onMetadataUpdate twice. The first call will trigger the migration to begin (due to presence of brokers)
// Both calls will "wakeup" the driver and cause a PollEvent to be run. Calling these back-to-back effectively
// causes two MigrateMetadataEvents to be enqueued. Ensure only one is actually run.
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));

TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
assertEquals(1, migrationBeginCalls.get());
}
}
}

0 comments on commit 32c39c8

Please sign in to comment.