Skip to content

Commit

Permalink
GH-1441: Close producer after fatal send error
Browse files Browse the repository at this point in the history
Resolves #1441

Close and remove a producer after an `OutOfOrderSequenceException` (or a subclass) is reported for a send.

**I expect conflicts so I will do the backports**
  • Loading branch information
garyrussell committed Apr 10, 2020
1 parent 5441fb2 commit ced67f1
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
Expand Down Expand Up @@ -123,6 +124,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,

private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();

private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();

private final AtomicInteger clientIdCounter = new AtomicInteger();

private Supplier<Serializer<K>> keySerializerSupplier;
Expand All @@ -139,8 +142,6 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,

private boolean producerPerThread;

private ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers;

private String clientIdPrefix;

private volatile CloseSafeProducer<K, V> producer;
Expand Down Expand Up @@ -252,7 +253,6 @@ protected String getTransactionIdPrefix() {
*/
public void setProducerPerThread(boolean producerPerThread) {
this.producerPerThread = producerPerThread;
this.threadBoundProducers = new ThreadLocal<>();
}

/**
Expand Down Expand Up @@ -305,8 +305,11 @@ public boolean transactionCapable() {
@SuppressWarnings("resource")
@Override
public void destroy() {
CloseSafeProducer<K, V> producerToClose = this.producer;
this.producer = null;
CloseSafeProducer<K, V> producerToClose;
synchronized (this) {
producerToClose = this.producer;
this.producer = null;
}
if (producerToClose != null) {
producerToClose.getDelegate().close(this.physicalCloseTimeout);
}
Expand Down Expand Up @@ -380,19 +383,19 @@ public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
if (this.producerPerThread) {
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
if (tlProducer == null) {
tlProducer = new CloseSafeProducer<>(createKafkaProducer());
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
this.physicalCloseTimeout);
this.threadBoundProducers.set(tlProducer);
}
return tlProducer;
}
if (this.producer == null) {
synchronized (this) {
if (this.producer == null) {
this.producer = new CloseSafeProducer<>(createKafkaProducer());
}
synchronized (this) {
if (this.producer == null) {
this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
this.physicalCloseTimeout);
}
return this.producer;
}
return this.producer;
}

/**
Expand Down Expand Up @@ -447,6 +450,20 @@ private void removeConsumerProducer(CloseSafeProducer<K, V> producerToRemove) {
}
}

/**
 * Remove the supplied producer from this factory. The single shared producer is
 * cleared if it is the supplied instance, and the calling thread's thread-bound
 * producer is unbound if it is the supplied instance.
 * @param producerToRemove the producer to remove.
 * @since 2.2.13
 */
protected final synchronized void removeProducer(CloseSafeProducer<K, V> producerToRemove) {
	if (producerToRemove.equals(this.producer)) {
		this.producer = null;
	}
	// Only unbind the calling thread's producer when it is the one being removed.
	// This method is reachable from the send callback (via close()), which may run
	// on a thread other than the one that owns the failed producer; that thread's
	// own, unrelated binding must not be silently discarded.
	if (producerToRemove.equals(this.threadBoundProducers.get())) {
		this.threadBoundProducers.remove();
	}
}

/**
* Subclasses must return a producer from the {@link #getCache()} or a
* new raw producer wrapped in a {@link CloseSafeProducer}.
Expand Down Expand Up @@ -482,7 +499,7 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
newProducer = createRawProducer(newProducerConfigs);
newProducer.initTransactions();
return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout);
}

protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
Expand Down Expand Up @@ -545,34 +562,43 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

private final BlockingQueue<CloseSafeProducer<K, V>> cache;

private final Consumer<CloseSafeProducer<K, V>> removeConsumerProducer;
private final Consumer<CloseSafeProducer<K, V>> removeProducer;

private final String txId;

private volatile Exception txFailed;
private final Duration closeTimeout;

private volatile Exception producerFailed;

private volatile boolean closed;

CloseSafeProducer(Producer<K, V> delegate, Consumer<CloseSafeProducer<K, V>> removeProducer,
Duration closeTimeout) {

CloseSafeProducer(Producer<K, V> delegate) {
this(delegate, null, null);
this(delegate, null, removeProducer, null, closeTimeout);
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
}

CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache) {
this(delegate, cache, null);
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
Duration closeTimeout) {
this(delegate, cache, null, closeTimeout);
}

CloseSafeProducer(Producer<K, V> delegate, @Nullable BlockingQueue<CloseSafeProducer<K, V>> cache,
@Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer) {
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
@Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer, Duration closeTimeout) {

this(delegate, cache, removeConsumerProducer, null);
this(delegate, cache, removeConsumerProducer, null, closeTimeout);
}

CloseSafeProducer(Producer<K, V> delegate, @Nullable BlockingQueue<CloseSafeProducer<K, V>> cache,
@Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer, @Nullable String txId) {
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
@Nullable Consumer<CloseSafeProducer<K, V>> removeProducer, @Nullable String txId,
Duration closeTimeout) {

this.delegate = delegate;
this.cache = cache;
this.removeConsumerProducer = removeConsumerProducer;
this.removeProducer = removeProducer;
this.txId = txId;
this.closeTimeout = closeTimeout;
LOGGER.debug(() -> "Created new Producer: " + this);
}

Expand All @@ -589,7 +615,18 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
LOGGER.trace(() -> toString() + " send(" + record + ")");
return this.delegate.send(record, callback);
return this.delegate.send(record, new Callback() {

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception instanceof OutOfOrderSequenceException) {
CloseSafeProducer.this.producerFailed = exception;
close(CloseSafeProducer.this.closeTimeout);
}
callback.onCompletion(metadata, exception);
}

});
}

@Override
Expand Down Expand Up @@ -621,7 +658,7 @@ public void beginTransaction() throws ProducerFencedException {
}
catch (RuntimeException e) {
LOGGER.error(e, () -> "beginTransaction failed: " + this);
this.txFailed = e;
this.producerFailed = e;
throw e;
}
}
Expand All @@ -642,16 +679,16 @@ public void commitTransaction() throws ProducerFencedException {
}
catch (RuntimeException e) {
LOGGER.error(e, () -> "commitTransaction failed: " + this);
this.txFailed = e;
this.producerFailed = e;
throw e;
}
}

@Override
public void abortTransaction() throws ProducerFencedException {
LOGGER.debug(() -> toString() + " abortTransaction()");
if (this.txFailed != null) {
LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.txFailed.getMessage()
if (this.producerFailed != null) {
LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.producerFailed.getMessage()
+ ": " + this);
}
else {
Expand All @@ -660,7 +697,7 @@ public void abortTransaction() throws ProducerFencedException {
}
catch (RuntimeException e) {
LOGGER.error(e, () -> "Abort failed: " + this);
this.txFailed = e;
this.producerFailed = e;
throw e;
}
}
Expand All @@ -681,25 +718,26 @@ public void close(long timeout, @Nullable TimeUnit unit) {
@Override
public void close(@Nullable Duration timeout) {
LOGGER.trace(() -> toString() + " close(" + (timeout == null ? "null" : timeout) + ")");
if (this.cache != null) {
Duration closeTimeout = this.txFailed instanceof TimeoutException
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
: timeout;
if (this.txFailed != null) {
LOGGER.warn(() -> "Error during transactional operation; producer removed from cache; "
+ "possible cause: "
+ "broker restarted during transaction: " + this);
this.delegate.close(closeTimeout);
if (this.removeConsumerProducer != null) {
this.removeConsumerProducer.accept(this);
if (!this.closed) {
if (this.producerFailed != null) {
LOGGER.warn(() -> "Error during some operation; producer removed from cache: " + this);
this.closed = true;
this.delegate.close(this.producerFailed instanceof TimeoutException
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
: timeout);
if (this.removeProducer != null) {
this.removeProducer.accept(this);
}
}
else {
if (this.removeConsumerProducer == null) { // dedicated consumer producers are not cached
if (this.cache != null && this.removeProducer == null) { // dedicated consumer producers are not cached
synchronized (this) {
if (!this.cache.contains(this)
&& !this.cache.offer(this)) {
this.delegate.close(closeTimeout);
this.closed = true;
this.delegate.close(this.producerFailed instanceof TimeoutException
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
: timeout);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willThrow;
Expand All @@ -31,11 +32,14 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

Expand Down Expand Up @@ -67,7 +71,8 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
producer.initTransactions();
BlockingQueue<Producer> cache = getCache(txIdPrefix);
Producer cached = cache.poll();
return cached == null ? new CloseSafeProducer(producer, cache) : cached;
return cached == null ? new CloseSafeProducer(producer, cache,
Duration.ofSeconds(1)) : cached;
}

};
Expand Down Expand Up @@ -148,7 +153,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
producer.initTransactions();
BlockingQueue<Producer> cache = getCache(txIdPrefix);
Producer cached = cache.poll();
return cached == null ? new CloseSafeProducer(producer, cache) : cached;
return cached == null ? new CloseSafeProducer(producer, cache, Duration.ofSeconds(1)) : cached;
}

};
Expand Down Expand Up @@ -221,4 +226,33 @@ protected Producer createRawProducer(Map configs) {
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).hasSize(0);
}

/**
 * A send that completes with an {@link UnknownProducerIdException} must cause the
 * factory's shared producer to be physically closed and discarded, so that the
 * next {@code createProducer()} call hands out a different instance.
 */
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testUnknownProducerIdException() {
	final Producer failingDelegate = mock(Producer.class);
	final Producer replacementDelegate = mock(Producer.class);
	// Every send on the first delegate completes immediately with the fatal error.
	willAnswer(invocation -> {
		Callback sendCallback = invocation.getArgument(1);
		sendCallback.onCompletion(null, new UnknownProducerIdException("test"));
		return null;
	}).given(failingDelegate).send(isNull(), any());
	ProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

		private final AtomicBoolean first = new AtomicBoolean(true);

		@Override
		protected Producer createKafkaProducer() {
			// First request gets the failing delegate; all later ones get the replacement.
			if (this.first.getAndSet(false)) {
				return failingDelegate;
			}
			return replacementDelegate;
		}

	};

	final Producer aProducer = pf.createProducer();
	assertThat(aProducer).isNotNull();

	// Trigger the failure, then perform the caller's normal close.
	aProducer.send(null, (meta, ex) -> { });
	aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT);

	// The factory no longer holds the failed producer and its delegate was closed.
	assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
	verify(failingDelegate).close(any(Duration.class));

	// A fresh wrapper is handed out afterwards.
	Producer bProducer = pf.createProducer();
	assertThat(bProducer).isNotSameAs(aProducer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ public void testQuickCloseAfterCommitTimeout() {

@Override
public Producer<String, String> createProducer(String txIdPrefixArg) {
CloseSafeProducer<String, String> closeSafeProducer = new CloseSafeProducer<>(producer, getCache());
CloseSafeProducer<String, String> closeSafeProducer = new CloseSafeProducer<>(producer, getCache(),
Duration.ofSeconds(1));
return closeSafeProducer;
}

Expand All @@ -400,19 +401,22 @@ public void testNormalCloseAfterCommitCacheFull() {
@SuppressWarnings("unchecked")
Producer<String, String> producer = mock(Producer.class);

DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<String, String>(Collections.emptyMap()) {
DefaultKafkaProducerFactory<String, String> pf =
new DefaultKafkaProducerFactory<String, String>(Collections.emptyMap()) {

@SuppressWarnings("unchecked")
@Override
public Producer<String, String> createProducer(String txIdPrefixArg) {
BlockingQueue<CloseSafeProducer<String, String>> cache = new LinkedBlockingDeque<>(1);
try {
cache.put(new CloseSafeProducer<>(mock(Producer.class)));
cache.put(new CloseSafeProducer<>(mock(Producer.class), this::removeProducer,
Duration.ofSeconds(1)));
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
CloseSafeProducer<String, String> closeSafeProducer = new CloseSafeProducer<>(producer, cache);
CloseSafeProducer<String, String> closeSafeProducer = new CloseSafeProducer<>(producer, cache,
Duration.ofSeconds(1));
return closeSafeProducer;
}

Expand Down

0 comments on commit ced67f1

Please sign in to comment.