Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't get KafkaEmbedded's port when application context loads #70

Closed
YoannBuch opened this issue May 12, 2016 · 7 comments
Closed

Can't get KafkaEmbedded's port when application context loads #70

YoannBuch opened this issue May 12, 2016 · 7 comments

Comments

@YoannBuch
Copy link

As part of a Spring Integration test I want to use the EmbeddedKafka:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(...);

The problem is that I can't figure out how to get the random port assigned to the broker and use it from the Java configuration:

@Configuration
public class KafkaConsumerConfiguration {

    @Value("${kafka.broker.port}")
    private String kafkaBrokerPort;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort);
        ...
        return new DefaultKafkaConsumerFactory<>(props);
    }

The example given in the documentation (http://docs.spring.io/spring-kafka/docs/1.0.0.M2/reference/htmlsingle/#kafka) doesn't use an application context but rather instantiates everything from the test.

And in the following test the configuration was embedded in the test class so the configuration could easily access the instance of EmbeddedKafka: https://github.com/spring-projects/spring-kafka/blob/48845e1de043ec9caf6493c7e2320966e7986a9f/spring-kafka%2Fsrc%2Ftest%2Fjava%2Forg%2Fspringframework%2Fkafka%2Fannotation%2FEnableKafkaIntegrationTests.java. But we can't really do that in a "real" application.

My problem is similar to what this person has with the property server.port: http://stackoverflow.com/questions/31058489/override-default-spring-boot-application-properties-settings-in-junit-test-with

I can't think of a good solution right now. I'm not really familiar with the execution order of the @ClassRule and the test context. In any case the test context can be cached across test suites so I don't see how we could inject a dynamic port.

The only solution I see is giving an option to assign a static port to the EmbeddedKafka. I don't think it's too bad in a test environment.

What do you think?

@garyrussell
Copy link
Contributor

garyrussell commented May 12, 2016

Here's one solution...

/*
 * Copyright 2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.core;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.Ordered;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestContext;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.TestExecutionListeners.MergeMode;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.AbstractTestExecutionListener;

/**
 * @author Gary Russell
 * @since 1.0
 *
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@TestExecutionListeners(mergeMode = MergeMode.MERGE_WITH_DEFAULTS,
    listeners = SpringWithEmbeddedTests.KafkaEmbeddedTestExecutionListener.class)
public class SpringWithEmbeddedTests {

    private static final String KAFKA_BROKER_ADDRESSES = "kafka.broker.addresses";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "foo");

    @Autowired
    private ConsumerFactory<?, ?> consumerFactory;

    @Test
    public void test() {
        DirectFieldAccessor accessor = new DirectFieldAccessor(this.consumerFactory);
        @SuppressWarnings("unchecked")
        Map<String, Object> configs = (Map<String, Object>) accessor.getPropertyValue("configs");
        assertThat(configs.get("bootstrap.servers")).isEqualTo(embeddedKafka.getBrokersAsString());
    }

    public static class KafkaEmbeddedTestExecutionListener extends AbstractTestExecutionListener {

        @Override
        public void beforeTestClass(TestContext testContext) throws Exception {
            System.setProperty(KAFKA_BROKER_ADDRESSES, embeddedKafka.getBrokersAsString());
        }

        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }


    }

    @Configuration
    public static class Config {

        @Value("${" + KAFKA_BROKER_ADDRESSES + "}")
        private String brokers;

        @Bean
        public static PropertySourcesPlaceholderConfigurer configurer() {
            return new PropertySourcesPlaceholderConfigurer();
        }

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            // ...
            return new DefaultKafkaConsumerFactory<>(props);
        }

    }

}

@garyrussell
Copy link
Contributor

Using a fixed port is probably ok locally but could be a problem on a CI build server.

@artembilan
Copy link
Member

M-m-m. The sample from existing test-case:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, ...);
...
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testAnnot", "true", embeddedKafka);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return consumerProps;
}

where KafkaTestUtils.consumerProps() does exactly what you ask:

return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
...
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

@garyrussell
Copy link
Contributor

garyrussell commented May 12, 2016

@artembilan I believe the use case is for testing a production @Configuration class that is not embedded within the test case - i.e. doesn't (and must not) have visibility to the embeddedKafka static field.

@artembilan
Copy link
Member

Oh! Sorry, my fault do not read the question.

Interesting, this works for me well:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, ...);

@BeforeClass
public static void setup() {
    System.setProperty("kafka.brokers", embeddedKafka.getBrokersAsString());
}
...
@Configuraiton
....
    @Value("${kafka.brokers}")
    private String brokers;

@YoannBuch
Copy link
Author

Thank you Gary!! Pretty cool trick :)

@garyrussell
Copy link
Contributor

@artembilan 's solution is certainly simpler 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants