Skip to content

Commit

Permalink
AKCORE-186: Added Integration tests for Read and Write share group st…
Browse files Browse the repository at this point in the history
…ate RPCs (apache#1352)

* AKCORE-186: Added Integration tests for Read and Write share group state RPCs

* AKCORE-186: Used waitForCondition in testShareGroupStateTopicCreation
  • Loading branch information
chirag-wadhwa5 committed Jul 2, 2024
1 parent 8e8c431 commit 235d43e
Showing 1 changed file with 266 additions and 0 deletions.
266 changes: 266 additions & 0 deletions core/src/test/java/kafka/test/api/ShareConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -73,6 +76,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.kafka.common.internals.Topic.SHARE_GROUP_STATE_TOPIC_NAME;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -108,6 +112,7 @@ public void createCluster() throws Exception {
.setConfigProp("transaction.state.log.min.isr", "1")
.setConfigProp("transaction.state.log.replication.factor", "1")
.setConfigProp("unstable.api.versions.enable", "true")
.setConfigProp("group.share.persister.class.name", "org.apache.kafka.server.group.share.DefaultStatePersister")
.build();
cluster.format();
cluster.startup();
Expand Down Expand Up @@ -1459,6 +1464,243 @@ public void testLsoMovementByRecordsDeletion() {
producer.close();
}

@Test
public void testShareGroupStateTopicCreation() throws InterruptedException {
Set<String> topics = null;
try {
topics = listTopics().names().get();
} catch (Exception e) {
fail("Failed to list topics: " + e);
}
// The __share_group_state topic is created on the first FIND_COORDINATOR RPC call for any share group topic partition.
// Since that does not happen automatically on cluster creation, we expect that the cluster will not have
// __share_group_state topic until yet.
assertFalse(topics.contains(SHARE_GROUP_STATE_TOPIC_NAME));

KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));

shareConsumer.poll(Duration.ofMillis(2000));
TestUtils.waitForCondition(
() -> {
boolean ans = false;
try {
ans = listTopics().names().get().contains(SHARE_GROUP_STATE_TOPIC_NAME);
} catch (Exception e) {
fail("Failed to list topics: " + e);
}
return ans;
},
45000L,
3000L,
() -> "Failed to create share group topic");

// The above condition only checks for the existence of the __share_group_state topic, but that happens when
// FIND_COORDINATOR is called internally. We need to wait for the ReadShareGroupState to finish completely
Thread.sleep(2000L);

shareConsumer.close();
}

@Test
public void testWriteShareGroupState() throws Exception {
Set<String> topics = listTopics().names().get();
// The __share_group_state topic is created on the first FIND_COORDINATOR RPC call for any share group topic partition.
// Since that does not happen automatically on cluster creation, we expect that the cluster will not have
// __share_group_state topic until yet.
assertFalse(topics.contains(SHARE_GROUP_STATE_TOPIC_NAME));

KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());

// consumer to consume records produced to the custom topic
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));

// consumer to consume records produced to the internal topic __share_group_state
KafkaConsumer<byte[], byte[]> internalConsumer = createConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2");
internalConsumer.subscribe(Collections.singleton(SHARE_GROUP_STATE_TOPIC_NAME));

try {
producer.send(record).get();
} catch (Exception e) {
fail("Failed to send records: " + e);
}

int numRecordsConsumed = 0;

while (numRecordsConsumed == 0) {
// First poll will initialise the share partition, which will internally try to read the state from the persister.
// Since this is the first persister related call, it will result in a lot of events happening in the background
// including creation of __share_group_state topic and loading of coordinator shards' in-memory states.
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(2000));
numRecordsConsumed += records.count();
}

assertEquals(1, numRecordsConsumed);

long duration = 1000; // 1 seconds
long startTime = System.currentTimeMillis();

// Poll for 1 second to ensure acknowledgement is sent for the fetched record
while (System.currentTimeMillis() - startTime < duration) {
shareConsumer.poll(Duration.ofMillis(2000));
}

topics = listTopics().names().get();
assertTrue(topics.contains(SHARE_GROUP_STATE_TOPIC_NAME));

int numInternalRecordsConsumed = 0;

while (numInternalRecordsConsumed == 0) {
ConsumerRecords<byte[], byte[]> records = internalConsumer.poll(Duration.ofMillis(2000));
numInternalRecordsConsumed += records.count();
}

assertEquals(1, numInternalRecordsConsumed);

try {
producer.send(record).get();
} catch (Exception e) {
fail("Failed to send records: " + e);
}

numRecordsConsumed = 0;

while (numRecordsConsumed == 0) {
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(2000));
numRecordsConsumed += records.count();
}

assertEquals(1, numRecordsConsumed);

startTime = System.currentTimeMillis();

// Poll for 1 second to ensure acknowledgement is sent for the fetched record
while (System.currentTimeMillis() - startTime < duration) {
shareConsumer.poll(Duration.ofMillis(2000));
}

shareConsumer.close();

topics = listTopics().names().get();
assertTrue(topics.contains(SHARE_GROUP_STATE_TOPIC_NAME));

numInternalRecordsConsumed = 0;

while (numInternalRecordsConsumed == 0) {
ConsumerRecords<byte[], byte[]> records = internalConsumer.poll(Duration.ofMillis(2000));
numInternalRecordsConsumed += records.count();
}
assertEquals(1, numInternalRecordsConsumed);

internalConsumer.close();
producer.close();
}

@Test
public void testReadShareGroupState() throws Exception {
Set<String> topics = listTopics().names().get();
// The __share_group_state topic is created on the first FIND_COORDINATOR RPC call for any share group topic partition.
// Since that does not happen automatically on cluster creation, we expect that the cluster will not have
// __share_group_state topic until yet.
assertFalse(topics.contains(SHARE_GROUP_STATE_TOPIC_NAME));

String key1 = "key1";
String key2 = "key2";
String value1 = "value1";
String value2 = "value2";

ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), 0, null, key1.getBytes(), value1.getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), 0, null, key2.getBytes(), value2.getBytes());

KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());

// consumer to consume records produced to the custom topic
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));

try {
producer.send(record1).get();
} catch (Exception e) {
fail("Failed to send records: " + e);
}

int numRecordsConsumed = 0;
ConsumerRecords<byte[], byte[]> consumedRecords = null;

while (numRecordsConsumed == 0) {
// First poll will initialise the share partition, which will internally try to read the state from the persister.
// Since this is the first persister related call, it will result in a lot of events happening in the background
// including creation of __share_group_state topic and loading of coordinator shards' in-memory states.
consumedRecords = shareConsumer.poll(Duration.ofMillis(2000));
numRecordsConsumed += consumedRecords.count();
}

assertEquals(1, numRecordsConsumed);
ConsumerRecord<byte[], byte[]> consumedRecord = consumedRecords.records(tp).iterator().next();
assertEquals(new String(consumedRecord.key()), key1);
assertEquals(new String(consumedRecord.value()), value1);

long duration = 1000; // 1 seconds

long startTime = System.currentTimeMillis();

// Poll for 1 second to ensure acknowledgement is sent for the fetched record
while (System.currentTimeMillis() - startTime < duration) {
shareConsumer.poll(Duration.ofMillis(2000));
}

topics = listTopics().names().get();
assertTrue(topics.contains(SHARE_GROUP_STATE_TOPIC_NAME));

shareConsumer.close();
producer.close();

// closing the cluster to verify state persistence happens across broker restarts
destroyCluster();

// restarting the broker
createCluster();

producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());

shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));

try {
producer.send(record2).get();
} catch (Exception e) {
fail("Failed to send records: " + e);
}

numRecordsConsumed = 0;

while (numRecordsConsumed == 0) {
consumedRecords = shareConsumer.poll(Duration.ofMillis(2000));
numRecordsConsumed += consumedRecords.count();
}

// This time, only the second record should be consumed, even when the broker was shut down in between. This is because
// the share state was stored in the __share_group_state topic. When the cluster is spin up again
// and the consumer starts consuming again, it should read the state from the __share_group_state topic and continue
// only from the next not consumed offset.
assertEquals(1, numRecordsConsumed);
consumedRecord = consumedRecords.records(tp).iterator().next();
assertEquals(new String(consumedRecord.key()), key2);
assertEquals(new String(consumedRecord.value()), value2);

startTime = System.currentTimeMillis();

// Poll for 1 second to ensure acknowledgement is sent for the fetched record
while (System.currentTimeMillis() - startTime < duration) {
shareConsumer.poll(Duration.ofMillis(2000));
}

shareConsumer.close();
producer.close();
}

private CompletableFuture<Integer> produceMessages(int messageCount) {
CompletableFuture<Integer> future = new CompletableFuture<>();
Future<?>[] recordFutures = new Future<?>[messageCount];
Expand Down Expand Up @@ -1566,6 +1808,18 @@ private void deleteTopic(String topicName) {
}
}

private ListTopicsResult listTopics() {
ListTopicsResult topics = null;
try (Admin admin = createAdminClient()) {
ListTopicsOptions listTopicsOptions = new ListTopicsOptions().listInternal(true);
topics = admin.listTopics(listTopicsOptions);
} catch (Exception e) {
fail("Failed to list topics");
}
return topics;
}


private Admin createAdminClient() {
Properties props = cluster.clientProperties();
return Admin.create(props);
Expand Down Expand Up @@ -1602,4 +1856,16 @@ private <K, V> KafkaShareConsumer<K, V> createShareConsumer(Deserializer<K> keyD
props.putAll(additionalProperties);
return new KafkaShareConsumer<>(props, keyDeserializer, valueDeserializer);
}

private <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String groupId) {
Properties props = cluster.clientProperties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer");
return new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
}
}

0 comments on commit 235d43e

Please sign in to comment.