Skip to content

Commit

Permalink
GH-1283: Unique client.id for each producer
Browse files Browse the repository at this point in the history
Resolves #1283

Avoid `InstanceAlreadyExistsException` s when the user supplies a custom
`client.id`.

**cherry-pick to all supported branches**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
#	spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
  • Loading branch information
garyrussell authored and artembilan committed Oct 28, 2019
1 parent 70c2258 commit 169f118
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Expand Up @@ -102,6 +102,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,

private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();

private final AtomicInteger clientIdCounter = new AtomicInteger();

private volatile CloseSafeProducer<K, V> producer;

private Serializer<K> keySerializer;
Expand All @@ -116,6 +118,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,

private boolean producerPerConsumerPartition = true;

private String clientIdPrefix;
/**
* Construct a factory with the provided configuration.
* @param configs the configuration.
Expand All @@ -133,9 +136,13 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
public DefaultKafkaProducerFactory(Map<String, Object> configs,
@Nullable Serializer<K> keySerializer,
@Nullable Serializer<V> valueSerializer) {

this.configs = new HashMap<>(configs);
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
if (configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {
this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
}
}

@Override
Expand Down Expand Up @@ -322,7 +329,15 @@ public Producer<K, V> createProducer() {
* @return the producer.
*/
protected Producer<K, V> createKafkaProducer() {
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
if (this.clientIdPrefix == null) {
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
}
else {
Map<String, Object> newConfigs = new HashMap<>(this.configs);
newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
return new KafkaProducer<>(newConfigs, this.keySerializer, this.valueSerializer);
}
}

Producer<K, V> createTransactionalProducerForPartition() {
Expand Down Expand Up @@ -376,6 +391,10 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<Close
Producer<K, V> newProducer;
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
if (this.clientIdPrefix != null) {
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
}
newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
newProducer.initTransactions();
return new CloseSafeProducer<K, V>(newProducer, this.cache, remover,
Expand Down
Expand Up @@ -105,6 +105,7 @@ public class KafkaTemplateTransactionTests {
public void testLocalTransaction() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
senderProps.put(ProducerConfig.CLIENT_ID_CONFIG, "customClientId");
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
pf.setKeySerializer(new StringSerializer());
pf.setTransactionIdPrefix("my.transaction.");
Expand Down

0 comments on commit 169f118

Please sign in to comment.