Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
KMessageSource getter for consumer properties
Browse files Browse the repository at this point in the history
- allow customization after creation - e.g. SCSt `MessageSourceCustomizer`
- also fix deprecation in `ReplyingKafkaTemplate`
  • Loading branch information
garyrussell committed Sep 25, 2019
1 parent cb80467 commit 2f473bd
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.kafka.dsl;

import java.time.Duration;

This comment has been minimized.

Copy link
@garyrussell

garyrussell Sep 25, 2019

Author Contributor

@artembilan I just pushed this to master by mistake - please review.

This comment has been minimized.

Copy link
@artembilan

artembilan Sep 25, 2019

Contributor

👍

Nothing to complain about!

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
Expand Down Expand Up @@ -119,14 +120,32 @@ public static class ReplyingKafkaTemplateSpec<K, V, R> extends KafkaTemplateSpec
}

@SuppressWarnings("unchecked")
ReplyingKafkaTemplateSpec<K, V, R> taskScheduler(TaskScheduler scheduler) {
public ReplyingKafkaTemplateSpec<K, V, R> taskScheduler(TaskScheduler scheduler) {
((ReplyingKafkaTemplate<K, V, R>) this.target).setTaskScheduler(scheduler);
return this;
}

/**
* Default reply timeout.
* @param replyTimeout the timeout.
* @return the spec.
* @deprecated in favor of {@link #defaultReplyTimeout(Duration)}.
*/
@Deprecated
@SuppressWarnings("unchecked")
public ReplyingKafkaTemplateSpec<K, V, R> replyTimeout(long replyTimeout) {
((ReplyingKafkaTemplate<K, V, R>) this.target).setDefaultReplyTimeout(Duration.ofMillis(replyTimeout));
return this;
}

/**
* Default reply timeout.
* @param replyTimeout the timeout.
* @return the spec.
*/
@SuppressWarnings("unchecked")
ReplyingKafkaTemplateSpec<K, V, R> replyTimeout(long replyTimeout) {
((ReplyingKafkaTemplate<K, V, R>) this.target).setReplyTimeout(replyTimeout);
public ReplyingKafkaTemplateSpec<K, V, R> defaultReplyTimeout(Duration replyTimeout) {
((ReplyingKafkaTemplate<K, V, R>) this.target).setDefaultReplyTimeout(replyTimeout);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ protected void onInit() {
}
}

/**
* Get a reference to the configured consumer properties; allows further
* customization of the properties before the source is started.
* @return the properties.
* @since 3.2
*/
public ConsumerProperties getConsumerProperties() {
return this.consumerProperties;
}

protected String getGroupId() {
return this.consumerProperties.getGroupId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -354,7 +355,7 @@ public IntegrationFlow outboundGateFlow() {
return IntegrationFlows.from(Gate.class)
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.sync(true)
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.configureKafkaTemplate(t -> t.defaultReplyTimeout(Duration.ofSeconds(30))))
.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import org.springframework.messaging.support.GenericMessage
import org.springframework.retry.support.RetryTemplate
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.junit4.SpringRunner
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.stream.Stream
Expand Down Expand Up @@ -328,7 +329,7 @@ class KafkaDslKotlinTests {
fun replyingKafkaTemplate() =
ReplyingKafkaTemplate(producerFactory(), replyContainer())
.also {
it.setReplyTimeout(30000)
it.setDefaultReplyTimeout(Duration.ofSeconds(30))
}

@Bean
Expand Down

0 comments on commit 2f473bd

Please sign in to comment.