From 169f11896c3403cf31af5ecf8a61e4d11f9891de Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 28 Oct 2019 13:34:51 -0400 Subject: [PATCH] GH-1283: Unique client.id for each producer Resolves https://github.com/spring-projects/spring-kafka/issues/1283 Avoid `InstanceAlreadyExistsException` s when the user supplies a custom `client.id`. **cherry-pick to all supported branches** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java --- .../core/DefaultKafkaProducerFactory.java | 21 ++++++++++++++++++- .../core/KafkaTemplateTransactionTests.java | 1 + 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index a197b0e2bb..f381b331c5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -102,6 +102,8 @@ public class DefaultKafkaProducerFactory implements ProducerFactory, private final Map> consumerProducers = new HashMap<>(); + private final AtomicInteger clientIdCounter = new AtomicInteger(); + private volatile CloseSafeProducer producer; private Serializer keySerializer; @@ -116,6 +118,7 @@ public class DefaultKafkaProducerFactory implements ProducerFactory, private boolean producerPerConsumerPartition = true; + private String clientIdPrefix; /** * Construct a factory with the provided configuration. * @param configs the configuration. @@ -133,9 +136,13 @@ public DefaultKafkaProducerFactory(Map configs) { public DefaultKafkaProducerFactory(Map configs, @Nullable Serializer keySerializer, @Nullable Serializer valueSerializer) { + this.configs = new HashMap<>(configs); this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; + if (configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) { + this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG); + } } @Override @@ -322,7 +329,15 @@ public Producer createProducer() { * @return the producer. */ protected Producer createKafkaProducer() { - return new KafkaProducer(this.configs, this.keySerializer, this.valueSerializer); + if (this.clientIdPrefix == null) { + return new KafkaProducer(this.configs, this.keySerializer, this.valueSerializer); + } + else { + Map newConfigs = new HashMap<>(this.configs); + newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, + this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet()); + return new KafkaProducer<>(newConfigs, this.keySerializer, this.valueSerializer); + } } Producer createTransactionalProducerForPartition() { @@ -376,6 +391,10 @@ private CloseSafeProducer doCreateTxProducer(String suffix, Consumer newProducer; Map newProducerConfigs = new HashMap<>(this.configs); newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix); + if (this.clientIdPrefix != null) { + newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, + this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet()); + } newProducer = new KafkaProducer(newProducerConfigs, this.keySerializer, this.valueSerializer); newProducer.initTransactions(); return new CloseSafeProducer(newProducer, this.cache, remover, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index ceda62da78..fc6815a729 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -105,6 +105,7 @@ public class KafkaTemplateTransactionTests { public void testLocalTransaction() throws Exception { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); senderProps.put(ProducerConfig.RETRIES_CONFIG, 1); + senderProps.put(ProducerConfig.CLIENT_ID_CONFIG, "customClientId"); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); pf.setKeySerializer(new StringSerializer()); pf.setTransactionIdPrefix("my.transaction.");