Skip to content

Commit

Permalink
Embedded Kafka SeekToBeginning if needed
Browse files Browse the repository at this point in the history
If the initial `poll()` used to force the subscription returns
any records, seek those partitions to beginning instead of
discarding.
  • Loading branch information
garyrussell authored and artembilan committed Feb 14, 2018
1 parent 74e5956 commit 655cbee
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
Expand Up @@ -43,6 +43,7 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.junit.rules.ExternalResource;
Expand Down Expand Up @@ -410,7 +411,18 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}

});
consumer.poll(0); // force assignment
ConsumerRecords<?, ?> records = consumer.poll(0); // force assignment
if (records.count() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Records received on initial poll for assignment; re-seeking to beginning; "
+ records.partitions().stream()
.flatMap(p -> records.records(p).stream())
// map to same format as send metadata toString()
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
.collect(Collectors.toList()));
}
consumer.seekToBeginning(records.partitions());
}
assertThat(consumerLatch.await(30, TimeUnit.SECONDS))
.as("Failed to be assigned partitions from the embedded topics")
.isTrue();
Expand Down
Expand Up @@ -20,15 +20,23 @@

import java.io.IOException;
import java.net.ServerSocket;
import java.util.Map;

import javax.net.ServerSocketFactory;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

/**
Expand All @@ -41,6 +49,8 @@
@RunWith(SpringRunner.class)
public class AddressableEmbeddedBrokerTests {

private static final String TEST_EMBEDDED = "testEmbedded";

@Autowired
private Config config;

Expand All @@ -56,14 +66,36 @@ public void testKafkaEmbedded() {
.isEqualTo(System.getProperty(KafkaEmbedded.SPRING_EMBEDDED_ZOOKEEPER_CONNECT));
}

@Test
public void testLateStartedConsumer() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
this.broker.consumeFromAnEmbeddedTopic(consumer, TEST_EMBEDDED);

Producer<String, Object> producer = new KafkaProducer<>(KafkaTestUtils.producerProps(this.broker));
producer.send(new ProducerRecord<String, Object>(TEST_EMBEDDED, "foo"));
producer.close();
KafkaTestUtils.getSingleRecord(consumer, TEST_EMBEDDED);

consumerProps = KafkaTestUtils.consumerProps("another" + TEST_EMBEDDED, "false", this.broker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Integer, String> consumer2 = new KafkaConsumer<>(consumerProps);
this.broker.consumeFromAnEmbeddedTopic(consumer2, TEST_EMBEDDED);
KafkaTestUtils.getSingleRecord(consumer2, TEST_EMBEDDED);

consumer.close();
consumer2.close();
}

@Configuration
public static class Config {

private int port;

@Bean
public KafkaEmbedded broker() throws IOException {
KafkaEmbedded broker = new KafkaEmbedded(1);
KafkaEmbedded broker = new KafkaEmbedded(1, true, TEST_EMBEDDED);
ServerSocket ss = ServerSocketFactory.getDefault().createServerSocket(0);
this.port = ss.getLocalPort();
ss.close();
Expand Down

0 comments on commit 655cbee

Please sign in to comment.