Skip to content

Commit

Permalink
GH-753: Close transactional producer on error
Browse files Browse the repository at this point in the history
Fixes #753

Improve exception handling for producer transaction commit / rollback

* Close the producer if an exception is thrown while committing / rollbacking
  a transaction when synchronizing the Kafka transaction with another
  TransactionManager.
* Don't reuse transactional producers if an exception is thrown when committing
  / rollbacking a transaction. Some of the exceptions are fatal and mean
  the producer cannot be reused.
* Close the transactional producer if an exception occurs when calling
  beginTransaction when not using DefaultKafkaProducerFactory.
  • Loading branch information
bgK authored and garyrussell committed Jul 26, 2018
1 parent 63472ce commit ac34b28
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 22 deletions.
Expand Up @@ -324,14 +324,6 @@ public void beginTransaction() throws ProducerFencedException {
}
catch (RuntimeException e) {
this.txFailed = true;
logger.error("Illegal transaction state; producer removed from cache; possible cause: "
+ "broker restarted during transaction", e);
try {
this.delegate.close();
}
catch (Exception ee) {
// empty
}
throw e;
}
}
Expand All @@ -344,20 +336,40 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs

@Override
public void commitTransaction() throws ProducerFencedException {
this.delegate.commitTransaction();
try {
this.delegate.commitTransaction();
}
catch (RuntimeException e) {
this.txFailed = true;
throw e;
}
}

@Override
public void abortTransaction() throws ProducerFencedException {
this.delegate.abortTransaction();
try {
this.delegate.abortTransaction();
}
catch (RuntimeException e) {
this.txFailed = true;
throw e;
}
}

@Override
public void close() {
if (this.cache != null && !this.txFailed) {
synchronized (this) {
if (!this.cache.contains(this)) {
this.cache.offer(this);
if (this.cache != null) {
if (this.txFailed) {
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
+ "broker restarted during transaction");

this.delegate.close();
}
else {
synchronized (this) {
if (!this.cache.contains(this)) {
this.cache.offer(this);
}
}
}
}
Expand Down
Expand Up @@ -259,7 +259,15 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
Producer<K, V> producer = this.producers.get();
Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
producer = this.producerFactory.createProducer();
producer.beginTransaction();

try {
producer.beginTransaction();
}
catch (Exception e) {
closeProducer(producer, false);
throw e;
}

this.producers.set(producer);
T result = null;
try {
Expand Down
Expand Up @@ -57,7 +57,15 @@ public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
.getResource(producerFactory);
if (resourceHolder == null) {
Producer<K, V> producer = producerFactory.createProducer();
producer.beginTransaction();

try {
producer.beginTransaction();
}
catch (RuntimeException e) {
producer.close();
throw e;
}

resourceHolder = new KafkaResourceHolder<K, V>(producer);
bindResourceToTransaction(resourceHolder, producerFactory);
}
Expand Down Expand Up @@ -128,14 +136,17 @@ protected boolean shouldReleaseBeforeCompletion() {

@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
this.resourceHolder.commit();
try {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
this.resourceHolder.commit();
}
else {
this.resourceHolder.rollback();
}
}
else {
this.resourceHolder.rollback();
finally {
super.afterCompletion(status);
}

super.afterCompletion(status);
}

@Override
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -50,6 +51,7 @@
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.transaction.ResourcelessTransactionManager;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
Expand Down Expand Up @@ -212,6 +214,58 @@ public void testNoTx() {
.hasMessageContaining("No transaction is in process;");
}

@Test
public void testTransactionSynchronization() {
MockProducer<String, String> producer = new MockProducer<>();
producer.initTransactions();

@SuppressWarnings("unchecked")
ProducerFactory<String, String> pf = mock(ProducerFactory.class);
given(pf.transactionCapable()).willReturn(true);
given(pf.createProducer()).willReturn(producer);

KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);

ResourcelessTransactionManager tm = new ResourcelessTransactionManager();

new TransactionTemplate(tm).execute(s -> {
template.sendDefault("foo", "bar");
return null;
});

assertThat(producer.history()).containsExactly(new ProducerRecord<>(STRING_KEY_TOPIC, "foo", "bar"));
assertThat(producer.transactionCommitted()).isTrue();
assertThat(producer.closed()).isTrue();
}

@Test
public void testTransactionSynchronizationExceptionOnCommit() {
MockProducer<String, String> producer = new MockProducer<>();
producer.initTransactions();

@SuppressWarnings("unchecked")
ProducerFactory<String, String> pf = mock(ProducerFactory.class);
given(pf.transactionCapable()).willReturn(true);
given(pf.createProducer()).willReturn(producer);

KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);

ResourcelessTransactionManager tm = new ResourcelessTransactionManager();

new TransactionTemplate(tm).execute(s -> {
template.sendDefault("foo", "bar");

// Mark the mock producer as fenced so it throws when committing the transaction
producer.fenceProducer();
return null;
});

assertThat(producer.transactionCommitted()).isFalse();
assertThat(producer.closed()).isTrue();
}

@Configuration
@EnableTransactionManagement
public static class DeclarativeConfig {
Expand Down
@@ -0,0 +1,105 @@
/*
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.transaction;

import java.util.ArrayList;
import java.util.List;

import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@SuppressWarnings("serial")
public class ResourcelessTransactionManager extends AbstractPlatformTransactionManager {

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
((ResourcelessTransaction) transaction).begin();
}

@Override
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
if (logger.isDebugEnabled()) {
logger.debug("Committing resourceless transaction on [" + status.getTransaction() + "]");
}
}

@Override
protected Object doGetTransaction() throws TransactionException {
Object transaction = new ResourcelessTransaction();
List<Object> resources;
if (!TransactionSynchronizationManager.hasResource(this)) {
resources = new ArrayList<>();
TransactionSynchronizationManager.bindResource(this, resources);
}
else {
@SuppressWarnings("unchecked")
List<Object> stack = (List<Object>) TransactionSynchronizationManager.getResource(this);
resources = stack;
}
resources.add(transaction);
return transaction;
}

@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
if (logger.isDebugEnabled()) {
logger.debug("Rolling back resourceless transaction on [" + status.getTransaction() + "]");
}
}

@Override
protected boolean isExistingTransaction(Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.hasResource(this)) {
List<?> stack = (List<?>) TransactionSynchronizationManager.getResource(this);
return stack.size() > 1;
}
return ((ResourcelessTransaction) transaction).isActive();
}

@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
}

@Override
protected void doCleanupAfterCompletion(Object transaction) {
List<?> resources = (List<?>) TransactionSynchronizationManager.getResource(this);
resources.clear();
TransactionSynchronizationManager.unbindResource(this);
((ResourcelessTransaction) transaction).clear();
}

private static class ResourcelessTransaction {

private boolean active = false;

public boolean isActive() {
return active;
}

public void begin() {
active = true;
}

public void clear() {
active = false;
}

}
}

0 comments on commit ac34b28

Please sign in to comment.