Skip to content

Commit

Permalink
GH-1778: Properly Apply Client Post Processors
Browse files Browse the repository at this point in the history
Resolves #1778

`Consumer`/`Producer` post processors are `Function`s and the result should
be used; it was previously discarded.

**cherry-pick to 2.6.x, 2.5.x**
  • Loading branch information
garyrussell authored and artembilan committed Apr 21, 2021
1 parent 813e104 commit ee82876
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
}
}
for (ConsumerPostProcessor<K, V> pp : this.postProcessors) {
pp.apply(kafkaConsumer);
kafkaConsumer = pp.apply(kafkaConsumer);
}
return kafkaConsumer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,9 +739,11 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
}

protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
KafkaProducer<K, V> kafkaProducer =
Producer<K, V> kafkaProducer =
new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
this.postProcessors.forEach(pp -> pp.apply(kafkaProducer));
for (ProducerPostProcessor<K, V> pp : this.postProcessors) {
kafkaProducer = pp.apply(kafkaProducer);
}
return kafkaProducer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -42,6 +42,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -335,10 +336,14 @@ public void testNestedTxProducerIsCached() throws Exception {
KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
AtomicBoolean ppCalled = new AtomicBoolean();
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
cf.addPostProcessor(consumer -> {
ppCalled.set(true);
return consumer;
ProxyFactory prox = new ProxyFactory();
prox.setTarget(consumer);
@SuppressWarnings("unchecked")
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
wrapped.set(proxy);
return proxy;
});
ContainerProperties containerProps = new ContainerProperties("txCache1");
CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -360,13 +365,13 @@ public void testNestedTxProducerIsCached() throws Exception {
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
assertThat(pfTx.getCache()).hasSize(1);
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get());
}
finally {
container.stop();
pf.destroy();
pfTx.destroy();
}
assertThat(ppCalled.get()).isTrue();
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,7 +38,6 @@
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand All @@ -64,6 +63,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.CompositeProducerListener;
Expand Down Expand Up @@ -120,10 +120,14 @@ public static void tearDown() {
void testTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
AtomicBoolean ppCalled = new AtomicBoolean();
AtomicReference<Producer<Integer, String>> wrapped = new AtomicReference<>();
pf.addPostProcessor(prod -> {
ppCalled.set(true);
return prod;
ProxyFactory prox = new ProxyFactory();
prox.setTarget(prod);
@SuppressWarnings("unchecked")
Producer<Integer, String> proxy = (Producer<Integer, String>) prox.getProxy();
wrapped.set(proxy);
return proxy;
});
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);

Expand Down Expand Up @@ -166,8 +170,8 @@ void testTemplate() {
List<PartitionInfo> partitions = template.partitionsFor(INT_KEY_TOPIC);
assertThat(partitions).isNotNull();
assertThat(partitions).hasSize(2);
assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get());
pf.destroy();
assertThat(ppCalled.get()).isTrue();
}

@Test
Expand Down

0 comments on commit ee82876

Please sign in to comment.