Skip to content

Commit

Permalink
GH-646: Transaction Improvements
Browse files Browse the repository at this point in the history
Resolves #646

- add protection to the transactional producer cache to avoid multiple cache inserts
- add try/catch around `abortTransaction()`
- detect (and reject) an illegal call to `template.send()` when there is no existing transaction
- from the send callback remove the `ThreadLocal` check (will always be null) and only close the producer if it's not transactional
- remove autoFlush from template tests
  • Loading branch information
garyrussell authored and artembilan committed Apr 13, 2018
1 parent e6985e8 commit 1b53c4b
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 15 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -302,8 +302,8 @@ public void abortTransaction() throws ProducerFencedException {
}

@Override
public void close() {
if (this.cache != null) {
public synchronized void close() {
if (this.cache != null && !this.cache.contains(this)) {
this.cache.offer(this);
}
}
Expand Down
Expand Up @@ -265,10 +265,14 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
result = callback.doInOperations(this);
}
catch (Exception e) {
producer.abortTransaction();
this.producers.remove();
closeProducer(producer, false);
producer = null;
try {
producer.abortTransaction();
}
finally {
this.producers.remove();
closeProducer(producer, false);
producer = null;
}
}
if (producer != null) {
try {
Expand Down Expand Up @@ -328,12 +332,19 @@ protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
* @return a Future for the {@link RecordMetadata}.
*/
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
if (this.transactional) {
Assert.state(inTransaction(),
"No transaction is in process; "
+ "possible solutions: run the template operation within the scope of a "
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
+ "before invoking the template method, "
+ "run in a transaction started by a listener container when consuming a record");
}
final Producer<K, V> producer = getTheProducer();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sending: " + producerRecord);
}
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
final boolean inLocalTx = inTransaction();
producer.send(producerRecord, new Callback() {

@Override
Expand Down Expand Up @@ -365,8 +376,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
}
}
finally {
if (KafkaTemplate.this.producers.get() == null) {
closeProducer(producer, inLocalTx);
if (!KafkaTemplate.this.transactional) {
closeProducer(producer, false);
}
}
}
Expand All @@ -381,8 +392,11 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
return future;
}


protected boolean inTransaction() {
return this.producers.get() != null || TransactionSynchronizationManager.isActualTransactionActive();
return this.transactional && (this.producers.get() != null
|| TransactionSynchronizationManager.getResource(this.producerFactory) != null
|| TransactionSynchronizationManager.isActualTransactionActive());
}

private Producer<K, V> getTheProducer() {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,7 +41,7 @@ public class KafkaAdminBadContextTests {
@Test
public void testContextNotLoaded() {
try {
new AnnotationConfigApplicationContext(BadConfig.class);
new AnnotationConfigApplicationContext(BadConfig.class).close();
fail("Expected Exception");
}
catch (IllegalStateException e) {
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void testLocalTransaction() throws Exception {
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
pf.setKeySerializer(new StringSerializer());
pf.setTransactionIdPrefix("my.transaction.");
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down Expand Up @@ -119,7 +120,7 @@ public void testGlobalTransaction() throws Exception {
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
pf.setKeySerializer(new StringSerializer());
pf.setTransactionIdPrefix("my.transaction.");
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down Expand Up @@ -194,6 +195,20 @@ public void testOverrideProducerIdempotentConfig() throws Exception {
.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)).isEqualTo(false);
}

@Test
public void testNoTx() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
pf.setKeySerializer(new StringSerializer());
pf.setTransactionIdPrefix("my.transaction.");
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(STRING_KEY_TOPIC);
assertThatThrownBy(() -> template.send("foo", "bar"))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("No transaction is in process;");
}

@Configuration
@EnableTransactionManagement
public static class DeclarativeConfig {
Expand Down

0 comments on commit 1b53c4b

Please sign in to comment.