Skip to content

Commit

Permalink
Add BeanNameAware to Producer/Consumer Factories
Browse files Browse the repository at this point in the history
Resolves spring-cloud/spring-cloud-stream-binder-kafka#930

Re-enable Hoxton to be used with Spring Boot 2.2.x.
  • Loading branch information
garyrussell committed Jul 7, 2020
1 parent db306bf commit 0ddff6d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -57,7 +58,7 @@
* @author Artem Bilan
* @author Chris Gilbert
*/
public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V> {
public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V>, BeanNameAware {

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaConsumerFactory.class));

Expand Down Expand Up @@ -104,6 +105,10 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
this.valueDeserializerSupplier = valueDeserializerSupplier == null ? () -> null : valueDeserializerSupplier;
}

@Override
public void setBeanName(String name) {
}

public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
this.keyDeserializerSupplier = () -> keyDeserializer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.kafka.common.serialization.Serializer;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand Down Expand Up @@ -107,7 +108,7 @@
* @author Chris Gilbert
*/
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, ApplicationContextAware,
ApplicationListener<ContextStoppedEvent>, DisposableBean {
BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {

/**
* The default close timeout duration as 30 seconds.
Expand Down Expand Up @@ -208,6 +209,9 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void setBeanName(String name) {
}

public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
this.keySerializerSupplier = () -> keySerializer;
Expand Down

0 comments on commit 0ddff6d

Please sign in to comment.