The spring-kafka-test
jar contains some useful utilities to assist with testing your applications.
o.s.kafka.test.utils.KafkaTestUtils
provides some static methods to set up producer and consumer properties.
The following listing shows those method signatures:
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
Note
|
Starting with version 2.5, the consumerProps method sets the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest .
This is because, in most cases, you want the consumer to consume any messages sent in a test case.
The ConsumerConfig default is latest which means that messages already sent by a test, before the consumer starts, will not receive those records.
To revert to the previous behavior, set the property to latest after calling the method.
|
A JUnit 4 @Rule
wrapper for the EmbeddedKafkaBroker
is provided to create an embedded Kafka and an embedded Zookeeper server.
(See @EmbeddedKafka Annotation for information about using @EmbeddedKafka
with JUnit 5).
The following listing shows the signatures of those methods:
/**
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
/**
*
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
The EmbeddedKafkaBroker
class has a utility method that lets you consume for all the topics it created.
The following example shows how to use it:
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
The KafkaTestUtils
has some utility methods to fetch results from the consumer.
The following listing shows those method signatures:
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
The following example shows how to use KafkaTestUtils
:
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
When the embedded Kafka and embedded Zookeeper server are started by the EmbeddedKafkaBroker
, a system property named spring.embedded.kafka.brokers
is set to the address of the Kafka brokers and a system property named spring.embedded.zookeeper.connect
is set to the address of Zookeeper.
Convenient constants (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS
and EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT
) are provided for this property.
With the EmbeddedKafkaBroker.brokerProperties(Map<String, String>)
, you can provide additional properties for the Kafka servers.
See Kafka Config for more information about possible broker properties.
The following example configuration creates topics called cat
and hat
with five partitions, a topic called thing1
with 10 partitions, and a topic called thing2
with 15 partitions:
public class MyTests {
@ClassRule
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");
@Test
public void test() {
embeddedKafkaRule.getEmbeddedKafka()
.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
By default, addTopics
will throw an exception when problems arise (such as adding a topic that already exists).
Version 2.6 added a new version of that method that returns a Map<String, Exception>
; the key is the topic name and the value is null
for success, or an Exception
for a failure.
There is no built-in support for doing so, but you can use the same broker for multiple test classes with something similar to the following:
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false);
private static boolean started;
public static EmbeddedKafkaRule getEmbeddedKafka() {
if (!started) {
try {
embeddedKafka.before();
}
catch (Exception e) {
throw new KafkaException(e);
}
started = true;
}
return embeddedKafka;
}
private EmbeddedKafkaHolder() {
super();
}
}
Then, in each test class, you can use something similar to the following:
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics(topic1, topic2);
}
private static EmbeddedKafkaRule embeddedKafka = EmbeddedKafkaHolder.getEmbeddedKafka();
Important
|
The preceding example provides no mechanism for shutting down the brokers when all tests are complete.
This could be a problem if, say, you run your tests in a Gradle daemon.
You should not use this technique in such a situation, or you should use something to call destroy() on the EmbeddedKafkaBroker when your tests are complete.
|
We generally recommend that you use the rule as a @ClassRule
to avoid starting and stopping the broker between tests (and use a different topic for each test).
Starting with version 2.0, if you use Spring’s test application context caching, you can also declare a EmbeddedKafkaBroker
bean, so a single broker can be used across multiple test classes.
For convenience, we provide a test class-level annotation called @EmbeddedKafka
to register the EmbeddedKafkaBroker
bean.
The following example shows how to use it:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class KafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
Starting with version 2.2.4, you can also use the @EmbeddedKafka
annotation to specify the Kafka ports property.
The following example sets the topics
, brokerProperties
, and brokerPropertiesLocation
attributes of @EmbeddedKafka
support property placeholder resolutions:
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
In the preceding example, the property placeholders ${kafka.topics.another-topic}
, ${kafka.broker.logs-dir}
, and ${kafka.broker.port}
are resolved from the Spring Environment
.
In addition, the broker properties are loaded from the broker.properties
classpath resource specified by the brokerPropertiesLocation
.
Property placeholders are resolved for the brokerPropertiesLocation
URL and for any property placeholders found in the resource.
Properties defined by brokerProperties
override properties found in brokerPropertiesLocation
.
You can use the @EmbeddedKafka
annotation with JUnit 4 or JUnit 5.
Starting with version 2.3, there are two ways to use the @EmbeddedKafka
annotation with JUnit5.
When used with the @SpringJunitConfig
annotation, the embedded broker is added to the test application context.
You can auto wire the broker into your test, at the class or method level, to get the broker address list.
When not using the spring test context, the EmbdeddedKafkaCondition
creates a broker; the condition includes a parameter resolver so you can access the broker in your test method…
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
A stand-alone (not Spring test context) broker will be created if the class annotated with @EmbeddedBroker
is not also annotated (or meta annotated) with ExtendedWith(SpringExtension.class)
.
@SpringJunitConfig
and @SpringBootTest
are so meta annotated and the context-based broker will be used when either of those annotations are also present.
Important
|
When there is a Spring test application context available, the topics and broker properties can contain property placeholders, which will be resolved as long as the property is defined somewhere. If there is no Spring context available, these placeholders won’t be resolved. |
Spring Initializr now automatically adds the spring-kafka-test
dependency in test scope to the project configuration.
Important
|
If your application uses the Kafka binder in @RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
+ "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
...
} |
There are several ways to use an embedded broker in a Spring Boot application test.
They include:
The following example shows how to use a JUnit4 class rule to create an embedded broker:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1,
false, "someTopic")
.brokerListProperty("spring.kafka.bootstrap-servers");
}
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
Notice that, since this is a Spring Boot application, we override the broker list property to set Boot’s property.
The following example shows how to use an @EmbeddedKafka
Annotation to create an embedded broker:
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
The o.s.kafka.test.hamcrest.KafkaMatchers
provides the following matchers:
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
You can use the following AssertJ conditions:
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
The following example brings together most of the topics covered in this chapter:
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<Integer, String>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
The preceding example uses the Hamcrest matchers.
With AssertJ
, the final part looks like the following code:
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));