Skip to content

Commit

Permalink
GH-1059: Add SpEL support for KafkaListener.id()
Browse files Browse the repository at this point in the history
Fixes #1059

* Code style polishing in the `KafkaListenerAnnotationBeanPostProcessor` and `EnableKafkaIntegrationTests`

**Cherry-pick to 2.2.x**
  • Loading branch information
artembilan authored and garyrussell committed Apr 11, 2019
1 parent 95f7c67 commit 04f4db3
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 42 deletions.
Expand Up @@ -277,14 +277,9 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

@Override
public Set<KafkaListener> inspect(Method method) {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
}

(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
Expand Down Expand Up @@ -427,8 +422,7 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
endpoint.setTopics(resolveTopics(kafkaListener));
endpoint.setTopicPattern(resolvePattern(kafkaListener));
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(),
"clientIdPrefix"));
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
String group = kafkaListener.containerGroup();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
Expand Down Expand Up @@ -475,11 +469,14 @@ private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint,
if (propertyStrings.length > 0) {
Properties properties = new Properties();
for (String property : propertyStrings) {
try {
properties.load(new StringReader(resolveExpressionAsString(property, "property")));
}
catch (IOException e) {
this.logger.error("Failed to load property " + property + ", continuing...", e);
String value = resolveExpressionAsString(property, "property");
if (value != null) {
try {
properties.load(new StringReader(value));
}
catch (IOException e) {
this.logger.error("Failed to load property " + property + ", continuing...", e);
}
}
}
endpoint.setConsumerProperties(properties);
Expand All @@ -488,7 +485,7 @@ private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint,

private String getEndpointId(KafkaListener kafkaListener) {
if (StringUtils.hasText(kafkaListener.id())) {
return resolve(kafkaListener.id());
return resolveExpressionAsString(kafkaListener.id(), "id");
}
else {
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
Expand All @@ -514,19 +511,19 @@ private TopicPartitionInitialOffset[] resolveTopicPartitions(KafkaListener kafka
result.addAll(resolveTopicPartitionsList(topicPartition));
}
}
return result.toArray(new TopicPartitionInitialOffset[result.size()]);
return result.toArray(new TopicPartitionInitialOffset[0]);
}

private String[] resolveTopics(KafkaListener kafkaListener) {
String[] topics = kafkaListener.topics();
List<String> result = new ArrayList<>();
if (topics.length > 0) {
for (int i = 0; i < topics.length; i++) {
Object topic = resolveExpression(topics[i]);
for (String topic1 : topics) {
Object topic = resolveExpression(topic1);
resolveAsString(topic, result);
}
}
return result.toArray(new String[result.size()]);
return result.toArray(new String[0]);
}

private Pattern resolvePattern(KafkaListener kafkaListener) {
Expand Down Expand Up @@ -558,8 +555,8 @@ private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartit
Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
"At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
List<TopicPartitionInitialOffset> result = new ArrayList<>();
for (int i = 0; i < partitions.length; i++) {
resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);
for (String partition : partitions) {
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
}

for (PartitionOffset partitionOffset : partitionOffsets) {
Expand Down
Expand Up @@ -17,7 +17,7 @@
package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
Expand Down Expand Up @@ -255,7 +255,7 @@ public void testSimple() throws Exception {
"listenerConsumer.consumer"));
assertThat(
KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer, "fetcher.maxPollRecords", Integer.class))
.isEqualTo(100);
.isEqualTo(100);
assertThat(this.quxGroup).hasSize(1);
assertThat(this.quxGroup.get(0)).isSameAs(manualContainer);
List<?> containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class);
Expand Down Expand Up @@ -309,14 +309,12 @@ public void testSimple() throws Exception {
.isNotEqualTo("rebalanceListener");
String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.clientId",
String.class);
assertThat(
clientId)
.startsWith("rebal-");
assertThat(clientId).startsWith("rebal-");
assertThat(clientId.indexOf('-')).isEqualTo(clientId.lastIndexOf('-'));
}

@Test
public void testAutoStartup() throws Exception {
public void testAutoStartup() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("manualStart");
assertThat(listenerContainer).isNotNull();
assertThat(listenerContainer.isRunning()).isFalse();
Expand Down Expand Up @@ -399,7 +397,6 @@ public void testJson() throws Exception {
}

@Test
@DirtiesContext
public void testJsonHeaders() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container =
(ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer("jsonHeaders");
Expand Down Expand Up @@ -546,7 +543,7 @@ public void testValidation() throws Exception {
}

@Test
public void testReplyingListener() throws Exception {
public void testReplyingListener() {
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testReplying");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand All @@ -560,7 +557,7 @@ public void testReplyingListener() throws Exception {
}

@Test
public void testReplyingBatchListener() throws Exception {
public void testReplyingBatchListener() {
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testBatchReplying");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand All @@ -586,7 +583,7 @@ public void testReplyingBatchListener() throws Exception {
}

@Test
public void testReplyingListenerWithErrorHandler() throws Exception {
public void testReplyingListenerWithErrorHandler() {
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testErrorHandlerReplying");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand All @@ -600,7 +597,7 @@ public void testReplyingListenerWithErrorHandler() throws Exception {
}

@Test
public void testVoidListenerWithReplyingErrorHandler() throws Exception {
public void testVoidListenerWithReplyingErrorHandler() {
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testVoidWithErrorHandlerReplying");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand All @@ -614,7 +611,7 @@ public void testVoidListenerWithReplyingErrorHandler() throws Exception {
}

@Test
public void testReplyingBatchListenerWithErrorHandler() throws Exception {
public void testReplyingBatchListenerWithErrorHandler() {
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testErrorHandlerBatchReplying");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand All @@ -640,7 +637,7 @@ public void testReplyingBatchListenerWithErrorHandler() throws Exception {
}

@Test
public void testMultiReplyTo() throws Exception {
public void testMultiReplyTo() {
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand Down Expand Up @@ -709,10 +706,12 @@ public void testAddingTopics() {
assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 1);
embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1));
assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 2);
assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)))
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("exists");
assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2)))
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("replication");
assertThatIllegalArgumentException()
.isThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)))
.withMessageContaining("exists");
assertThatIllegalArgumentException()
.isThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2)))
.withMessageContaining("replication");
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand All @@ -733,7 +732,7 @@ public void testReceivePollResults() throws Exception {
@Test
public void testAutoConfigTm() {
assertThat(this.transactionalFactory.getContainerProperties().getTransactionManager())
.isInstanceOf(ChainedKafkaTransactionManager.class);
.isInstanceOf(ChainedKafkaTransactionManager.class);
}

@Test
Expand Down Expand Up @@ -1416,8 +1415,9 @@ public void listen3(ConsumerRecord<?, ?> record) {
this.latch3.countDown();
}

@KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
containerGroup = "qux#{'Group'}", properties = {
@KafkaListener(id = "#{'qux'}", topics = "annotated4",
containerFactory = "kafkaManualAckListenerContainerFactory", containerGroup = "qux#{'Group'}",
properties = {
"max.poll.interval.ms:#{'${poll.interval:60000}'}",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=#{'${poll.recs:100}'}"
})
Expand Down Expand Up @@ -1879,6 +1879,7 @@ public void setDelegate(
public Foo convert(String source) {
return delegate.convert(source);
}

}

public static class ValidatedClass {
Expand Down

0 comments on commit 04f4db3

Please sign in to comment.