Skip to content

Commit

Permalink
GH-917: Specify the port with @embeddedkafka
Browse files Browse the repository at this point in the history
Fixes #917

Added ports attribute to EmbeddedKafka annotation.
It has a default value of 0 to be consistent with the count attribute.

* #917: Added Tests and update adoc files

Created a unit test similarly to the EmbeddedKafkaBroker test for validating the system properties created with the EmbeddedKafka bean creation.

Updated copyright and author/since in relevant files.

Updated .adoc files to reflect the new attribute.

* #917 Corrected since annotations and Unit tests

- Corrected since annotations from last commit
- Moved unit test of EmbeddedKafka annotation from spring-kafka to spring-kafka-test. Refactored unit test to avoid bean initialization and broker creation.

* #917 Corrected white space trailling errors

* #917 Corrected style for spring-kafka-test

* #917 Corrected checkstyle
  • Loading branch information
SLourenco authored and artembilan committed Jan 15, 2019
1 parent 25151c0 commit 4c6d6f5
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 7 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2018 the original author or authors.
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,7 @@
* @author Elliot Metsger
* @author Zach Olauson
* @author Gary Russell
* @author Sergio Lourenco
*
* @since 1.3
*
Expand Down Expand Up @@ -81,6 +82,15 @@
*/
boolean controlledShutdown() default false;

/**
* Set explicit ports on which the kafka brokers will listen. Useful when running an
* embedded broker that you want to access from other processes.
* A port must be provided for each instance, which means the number of ports must match the value of the count attribute.
* @return ports for brokers.
* @since 2.2.4
*/
int[] ports() default {0};

/**
* @return partitions per topic
*/
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2018 the original author or authors.
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,7 @@
* @author Elliot Metsger
* @author Zach Olauson
* @author Oleg Artyomov
* @author Sergio Lourenco
*
* @since 1.3
*/
Expand Down Expand Up @@ -70,6 +71,8 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
this.embeddedKafka.partitions(),
topics);

embeddedKafkaBroker.kafkaPorts(this.embeddedKafka.ports());

Properties properties = new Properties();

for (String pair : this.embeddedKafka.brokerProperties()) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,25 +17,32 @@
package org.springframework.kafka.test.context;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.mock;

import org.junit.Before;
import org.junit.Test;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.test.EmbeddedKafkaBroker;



/**
* @author Oleg Artyomov
* @author Sergio Lourenco
* @since 1.3
*/

public class EmbeddedKafkaContextCustomizerTests {

private EmbeddedKafka annotationFromFirstClass;
private EmbeddedKafka annotationFromSecondClass;


@Before
public void beforeEachTest() {
annotationFromFirstClass = AnnotationUtils.findAnnotation(TestWithEmbeddedKafka.class, EmbeddedKafka.class);
Expand All @@ -56,6 +63,19 @@ public void testEquals() {
assertThat(new EmbeddedKafkaContextCustomizer(annotationFromFirstClass)).isNotEqualTo(new Object());
}

@Test
public void testPorts() {
EmbeddedKafka annotationWithPorts = AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaPorts.class, EmbeddedKafka.class);
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
ConfigurableApplicationContext context = mock(ConfigurableApplicationContext.class);
BeanFactoryStub factoryStub = new BeanFactoryStub();
given(context.getBeanFactory()).willReturn(factoryStub);
given(context.getEnvironment()).willReturn(mock(ConfigurableEnvironment.class));
customizer.customizeContext(context, null);

assertThat(factoryStub.getBroker().getBrokersAsString()).isEqualTo("127.0.0.1:" + annotationWithPorts.ports()[0]);
}


@EmbeddedKafka
private class TestWithEmbeddedKafka {
Expand All @@ -67,4 +87,32 @@ private class SecondTestWithEmbeddedKafka {

}

@EmbeddedKafka(ports = 8085)
private class TestWithEmbeddedKafkaPorts {

}

private class BeanFactoryStub extends DefaultListableBeanFactory {
private Object bean;

public EmbeddedKafkaBroker getBroker() {
return (EmbeddedKafkaBroker) bean;
}

@Override
public Object initializeBean(Object existingBean, String beanName) throws BeansException {
this.bean = existingBean;
return bean;
}

@Override
public void registerSingleton(String beanName, Object singletonObject) {

}

@Override
public void registerDisposableBean(String beanName, DisposableBean bean) {

}
}
}
Expand Up @@ -46,7 +46,6 @@
/**
* @author Gary Russell
* @since 1.0.6
*
*/
@EmbeddedKafka(topics = { "txCache1", "txCache2", "txCacheSendFromListener" },
brokerProperties = {
Expand Down
3 changes: 2 additions & 1 deletion src/reference/asciidoc/testing.adoc
Expand Up @@ -224,8 +224,9 @@ public class KafkaStreamsTests {
}
----
Starting with version 2.2.4, the `@EmbeddedKafka` annotation can also be used to specify the kafka ports property.

The `topics`, `brokerProperties`, and `brokerPropertiesLocation` attributes of `@EmbeddedKafka` support property placeholder resolutions:
The `topics`, `brokerProperties` and `brokerPropertiesLocation` attributes of `@EmbeddedKafka` support property placeholder resolutions:
[source, java]
----
@TestPropertySource(locations = "classpath:/test.properties")
Expand Down
1 change: 1 addition & 0 deletions src/reference/asciidoc/whats-new.adoc
Expand Up @@ -72,6 +72,7 @@ See <<headers>> for more information.
The `KafkaEmbedded` class and its `KafkaRule` interface have need deprecated in favor of the `EmbeddedKafkaBroker` and its JUnit 4 `EmbeddedKafkaRule` wrapper.
The `@EmbeddedKafka` annotation now populates an `EmbeddedKafkaBroker` bean instead of the deprecated `KafkaEmbedded`.
This allows the use of `@EmbeddedKafka` in JUnit 5 tests.
The `@EmbeddedKafka` annotation now has the attribute `ports` to specify the port which will populate the `EmbeddedKafkaBroker`.

See <<testing>> for more information.

Expand Down

0 comments on commit 4c6d6f5

Please sign in to comment.