Skip to content

Commit

Permalink
Further Improve Kafka Test Run Time
Browse files Browse the repository at this point in the history
Configurable `timeoutBuffer`.
  • Loading branch information
garyrussell authored and artembilan committed Oct 16, 2020
1 parent a136b5b commit 75d14d3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
/**
* Buffer added to ensure our timeout is longer than Kafka's.
*/
private static final int TIMEOUT_BUFFER = 5000;
private static final int DEFAULT_TIMEOUT_BUFFER = 5000;

private final Map<String, Set<Integer>> replyTopicsAndPartitions = new HashMap<>();

Expand Down Expand Up @@ -162,6 +162,8 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
(message, topic, partition, timestamp, key, value, headers) ->
new ProducerRecord<>(topic, partition, timestamp, key, value, headers);

private int timeoutBuffer = DEFAULT_TIMEOUT_BUFFER;

private volatile byte[] singleReplyTopic;

public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
Expand All @@ -188,7 +190,7 @@ public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
determineSendTimeout();
this.deliveryTimeoutMsProperty =
this.sendTimeoutExpression.getValue(Long.class) // NOSONAR - never null after determineSendTimeout()
- TIMEOUT_BUFFER;
- this.timeoutBuffer;
}

private void determineSendTimeout() {
Expand All @@ -198,13 +200,13 @@ private void determineSendTimeout() {
dt = ProducerConfig.configDef().defaultValues().get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
}
if (dt instanceof Long) {
setSendTimeout(((Long) dt) + TIMEOUT_BUFFER);
setSendTimeout(((Long) dt) + this.timeoutBuffer);
}
else if (dt instanceof Integer) {
setSendTimeout(Long.valueOf((Integer) dt) + TIMEOUT_BUFFER);
setSendTimeout(Long.valueOf((Integer) dt) + this.timeoutBuffer);
}
else if (dt instanceof String) {
setSendTimeout(Long.parseLong((String) dt) + TIMEOUT_BUFFER);
setSendTimeout(Long.parseLong((String) dt) + this.timeoutBuffer);
}
}

Expand Down Expand Up @@ -290,10 +292,12 @@ public final void setSendTimeout(long sendTimeout) {
* Specify a SpEL expression to evaluate a timeout in milliseconds for how long this
* {@link KafkaProducerMessageHandler} should wait wait for send operation results.
* Defaults to the kafka {@code delivery.timeout.ms} property + 5 seconds. The timeout
* is applied only in {@link #sync} mode.
* is applied only in {@link #sync} mode. If this expression yields a result that is
* less than that value, the higher value is used.
* @param sendTimeoutExpression the {@link Expression} for timeout to wait for result
* for a send operation.
* @since 2.1.1
* @see #setTimeoutBuffer(int)
*/
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
Expand Down Expand Up @@ -404,6 +408,18 @@ public void setProducerRecordCreator(ProducerRecordCreator<K, V> producerRecordC
this.producerRecordCreator = producerRecordCreator;
}

/**
* Set a buffer, in milliseconds, added to the configured {@code delivery.timeout.ms}
* to determine the minimum time to wait for the send future completion when
* {@link #setSync(boolean) sync} is true.
* @param timeoutBuffer the buffer.
* @since 5.4
* @see #setSendTimeoutExpression(Expression)
*/
public void setTimeoutBuffer(int timeoutBuffer) {
this.timeoutBuffer = timeoutBuffer;
}

@Override
public String getComponentType() {
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
Expand Down Expand Up @@ -680,10 +696,10 @@ public void onFailure(Throwable ex) {
Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
if (sendTimeout != null && sendTimeout <= this.deliveryTimeoutMsProperty) {
this.logger.debug("'sendTimeout' increased to "
+ (this.deliveryTimeoutMsProperty + TIMEOUT_BUFFER)
+ (this.deliveryTimeoutMsProperty + this.timeoutBuffer)
+ "ms; it must be greater than the 'delivery.timeout.ms' Kafka producer "
+ "property to avoid false failures");
sendTimeout = this.deliveryTimeoutMsProperty + TIMEOUT_BUFFER;
sendTimeout = this.deliveryTimeoutMsProperty + this.timeoutBuffer;
}
if (sendTimeout == null || sendTimeout < 0) {
future.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void close(Duration timeout) {

handler.setSync(true);
handler.setSendTimeout(10);
handler.setTimeoutBuffer(200);
handler.setTopicExpression(new LiteralExpression("foo"));

Executors.newSingleThreadExecutor()
Expand Down

0 comments on commit 75d14d3

Please sign in to comment.