diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index c4b19a14e7..9fd3eef1df 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -342,7 +342,7 @@ protected Consumer createKafkaConsumer(Map configProps) { } } for (ConsumerPostProcessor pp : this.postProcessors) { - pp.apply(kafkaConsumer); + kafkaConsumer = pp.apply(kafkaConsumer); } return kafkaConsumer; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 8adf37de9c..5f92932f9a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -739,9 +739,11 @@ private CloseSafeProducer doCreateTxProducer(String prefix, String suffix, } protected Producer createRawProducer(Map rawConfigs) { - KafkaProducer kafkaProducer = + Producer kafkaProducer = new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get()); - this.postProcessors.forEach(pp -> pp.apply(kafkaProducer)); + for (ProducerPostProcessor pp : this.postProcessors) { + kafkaProducer = pp.apply(kafkaProducer); + } return kafkaProducer; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index cf484951e0..045c5ed066 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -29,7 +29,7 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -42,6 +42,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Test; +import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.support.AopUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -335,10 +336,14 @@ public void testNestedTxProducerIsCached() throws Exception { KafkaTemplate templateTx = new KafkaTemplate<>(pfTx); Map consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); - AtomicBoolean ppCalled = new AtomicBoolean(); + AtomicReference> wrapped = new AtomicReference<>(); cf.addPostProcessor(consumer -> { - ppCalled.set(true); - return consumer; + ProxyFactory prox = new ProxyFactory(); + prox.setTarget(consumer); + @SuppressWarnings("unchecked") + Consumer proxy = (Consumer) prox.getProxy(); + wrapped.set(proxy); + return proxy; }); ContainerProperties containerProps = new ContainerProperties("txCache1"); CountDownLatch latch = new CountDownLatch(1); @@ -360,13 +365,13 @@ public void testNestedTxProducerIsCached() throws Exception { assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1); assertThat(pfTx.getCache()).hasSize(1); + assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get()); } finally { container.stop(); pf.destroy(); pfTx.destroy(); } - assertThat(ppCalled.get()).isTrue(); } @SuppressWarnings("unchecked") diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index 6a20a280f0..d31c5653ea 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +38,6 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -64,6 +63,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.aop.framework.ProxyFactory; import org.springframework.kafka.KafkaException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.CompositeProducerListener; @@ -120,10 +120,14 @@ public static void tearDown() { void testTemplate() { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); - AtomicBoolean ppCalled = new AtomicBoolean(); + AtomicReference> wrapped = new AtomicReference<>(); pf.addPostProcessor(prod -> { - ppCalled.set(true); - return prod; + ProxyFactory prox = new ProxyFactory(); + prox.setTarget(prod); + @SuppressWarnings("unchecked") + Producer proxy = (Producer) prox.getProxy(); + wrapped.set(proxy); + return proxy; }); KafkaTemplate template = new KafkaTemplate<>(pf, true); @@ -166,8 +170,8 @@ void testTemplate() { List partitions = template.partitionsFor(INT_KEY_TOPIC); assertThat(partitions).isNotNull(); assertThat(partitions).hasSize(2); + assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get()); pf.destroy(); - assertThat(ppCalled.get()).isTrue(); } @Test