Skip to content

Commit

Permalink
GH-1441: Fix Sonar Issues
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Apr 13, 2020
1 parent d69c82f commit bc9fd8d
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -502,15 +502,15 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,

Producer<K, V> newProducer;
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, prefix + suffix);
String txId = prefix + suffix;
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
if (this.clientIdPrefix != null) {
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
}
newProducer = createRawProducer(newProducerConfigs);
newProducer.initTransactions();
return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout);
return new CloseSafeProducer<>(newProducer, getCache(prefix), remover, txId, this.physicalCloseTimeout);
}

protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
Expand Down Expand Up @@ -601,7 +601,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
this(delegate, cache, removeConsumerProducer, null, closeTimeout);
}

CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
CloseSafeProducer(Producer<K, V> delegate, @Nullable BlockingQueue<CloseSafeProducer<K, V>> cache,
@Nullable Consumer<CloseSafeProducer<K, V>> removeProducer, @Nullable String txId,
Duration closeTimeout) {

Expand Down

0 comments on commit bc9fd8d

Please sign in to comment.