Skip to content

Commit

Permalink
GH-661: Close Producer after beginTransaction fail
Browse files Browse the repository at this point in the history
Fixes #661

I could not reproduce the problem that I reported in the issue; so, if any exception occurs
on `beginTransaction()`, throw the exception to the caller after closing the
producer and prevent its return to the cache.

Also, reading the javadocs for `ProducerFencedException`, we should have been closing
the producer if that exception occurred anyway.

```
/**
 * This fatal exception indicates that another producer with the same <code>transactional.id</code> has been
 * started. It is only possible to have one producer instance with a <code>transactional.id</code> at any
 * given time, and the latest one to be started "fences" the previous instances so that they can no longer
 * make transactional requests. When you encounter this exception, you must close the producer instance.
 */
```

Cherry-pick to master, 2.0.x, 1.3.x.

(cherry picked from commit f48ad87)
  • Loading branch information
garyrussell authored and artembilan committed Apr 23, 2018
1 parent 83e3a45 commit bfccd92
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 4 deletions.
Expand Up @@ -216,10 +216,21 @@ public Producer<K, V> createProducer() {
return this.producer;
}

/**
* Subclasses must return a raw producer which will be wrapped in a
* {@link CloseSafeProducer}.
* @return the producer.
*/
protected Producer<K, V> createKafkaProducer() {
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
}

/**
* Subclasses must return a producer from the {@link #getCache()} or a
* new raw producer wrapped in a {@link CloseSafeProducer}.
* @return the producer - cannot be null.
* @since 1.3
*/
protected Producer<K, V> createTransactionalProducer() {
Producer<K, V> producer = this.cache.poll();
if (producer == null) {
Expand All @@ -235,14 +246,28 @@ protected Producer<K, V> createTransactionalProducer() {
}
}

private static class CloseSafeProducer<K, V> implements Producer<K, V> {
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
return this.cache;
}

/**
* A wrapper class for the delegate.
*
* @param <K> the key type.
* @param <V> the value type.
*
*/
protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

private final Producer<K, V> delegate;

private final BlockingQueue<CloseSafeProducer<K, V>> cache;

private volatile boolean txFailed;

CloseSafeProducer(Producer<K, V> delegate) {
this(delegate, null);
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
}

CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache) {
Expand Down Expand Up @@ -282,7 +307,21 @@ public void initTransactions() {

@Override
public void beginTransaction() throws ProducerFencedException {
this.delegate.beginTransaction();
try {
this.delegate.beginTransaction();
}
catch (RuntimeException e) {
this.txFailed = true;
logger.error("Illegal transaction state; producer removed from cache; possible cause: "
+ "broker restarted during transaction", e);
try {
this.delegate.close();
}
catch (Exception ee) {
// empty
}
throw e;
}
}

@Override
Expand All @@ -303,7 +342,7 @@ public void abortTransaction() throws ProducerFencedException {

@Override
public void close() {
if (this.cache != null) {
if (this.cache != null && !this.txFailed) {
synchronized (this) {
if (!this.cache.contains(this)) {
this.cache.offer(this);
Expand Down
Expand Up @@ -258,8 +258,8 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
Producer<K, V> producer = this.producers.get();
Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
producer = this.producerFactory.createProducer();
this.producers.set(producer);
producer.beginTransaction();
this.producers.set(producer);
T result = null;
try {
result = callback.doInOperations(this);
Expand Down
@@ -0,0 +1,101 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.support.TransactionTemplate;

/**
* @author Gary Russell
* @since 1.3.5
*
*/
public class DefaultKafkaProducerFactoryTests {

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerClosedAfterBadTransition() throws Exception {
final Producer producer = mock(Producer.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
protected Producer createTransactionalProducer() {
producer.initTransactions();
BlockingQueue<Producer> cache = getCache();
Producer cached = cache.poll();
return cached == null ? new CloseSafeProducer(producer, cache) : cached;
}

};
pf.setTransactionIdPrefix("foo");

final AtomicInteger flag = new AtomicInteger();
willAnswer(i -> {
if (flag.incrementAndGet() == 2) {
throw new KafkaException("Invalid transition ...");
}
return null;
}).given(producer).beginTransaction();

final KafkaTemplate kafkaTemplate = new KafkaTemplate(pf);
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
TransactionTemplate transactionTemplate = new TransactionTemplate(tm);
transactionTemplate.execute(s -> {
kafkaTemplate.send("foo", "bar");
return null;
});
BlockingQueue cache = KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class);
assertThat(cache).hasSize(1);
try {
transactionTemplate.execute(s -> {
return null;
});
}
catch (CannotCreateTransactionException e) {
assertThat(e.getCause().getMessage()).contains("Invalid transition");
}
assertThat(cache).hasSize(0);

InOrder inOrder = inOrder(producer);
inOrder.verify(producer).initTransactions();
inOrder.verify(producer).beginTransaction();
inOrder.verify(producer).send(any(), any());
inOrder.verify(producer).commitTransaction();
inOrder.verify(producer).beginTransaction();
inOrder.verify(producer).close();
inOrder.verifyNoMoreInteractions();
pf.destroy();
}

}

0 comments on commit bfccd92

Please sign in to comment.