Skip to content

Commit

Permalink
Improve Delayer DSL (#8645)
Browse files Browse the repository at this point in the history
* Improve Delayer DSL

Move `groupId` option from a `delay()` method arg to the `DelayerEndpointSpec`
to make it cleaner from code reading perspective
* Expose new DSL method based on just a `DelayerEndpointSpec` for Kotlin &v Groovy
* Deprecate multi-arg `delay()` methods in favor of `Consumer<DelayerEndpointSpec>`-based

* * Fix language and code style
  • Loading branch information
artembilan committed Jun 12, 2023
1 parent c4ee551 commit 81af20a
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 21 deletions.
Expand Up @@ -1178,12 +1178,18 @@ public B bridge(@Nullable Consumer<GenericEndpointSpec<BridgeHandler>> endpointC
/**
* Populate a {@link DelayHandler} to the current integration flow position
* with default options.
* Shortcut for:
* <pre class="code">
* {@code
* .delay(delayer -> delayer.messageGroupId(groupId))
* }
* </pre>
* @param groupId the {@code groupId} for delayed messages in the
* {@link org.springframework.integration.store.MessageGroupStore}.
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
public B delay(String groupId) {
return delay(groupId, null);
return delay(delayer -> delayer.messageGroupId(groupId));
}

/**
Expand All @@ -1192,12 +1198,25 @@ public B delay(String groupId) {
* {@link org.springframework.integration.store.MessageGroupStore}.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @deprecated since 6.2 in favor of {@link #delay(Consumer)}
* @see DelayerEndpointSpec
*/
@Deprecated(since = "6.2", forRemoval = true)
public B delay(String groupId, @Nullable Consumer<DelayerEndpointSpec> endpointConfigurer) {
return register(new DelayerEndpointSpec(new DelayHandler(groupId)), endpointConfigurer);
}

/**
* Populate a {@link DelayHandler} to the current integration flow position.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @since 6.2
* @see DelayerEndpointSpec
*/
public B delay(Consumer<DelayerEndpointSpec> endpointConfigurer) {
return register(new DelayerEndpointSpec(), endpointConfigurer);
}

/**
* Populate a {@link org.springframework.integration.transformer.ContentEnricher}
* to the current integration flow position
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,10 @@ public class DelayerEndpointSpec extends ConsumerEndpointSpec<DelayerEndpointSpe

private final List<Advice> delayedAdvice = new LinkedList<>();

protected DelayerEndpointSpec() {
this(new DelayHandler());
}

protected DelayerEndpointSpec(DelayHandler delayHandler) {
super(delayHandler);
Assert.notNull(delayHandler, "'delayHandler' must not be null.");
Expand Down Expand Up @@ -225,4 +229,16 @@ public <P> DelayerEndpointSpec delayFunction(Function<Message<P>, Object> delayF
return this;
}

/**
* Set a group id to manage delayed messages by this handler.
* @param messageGroupId the group id for delayed messages.
* @return the endpoint spec.
* @since 6.2
* @see DelayHandler#setMessageGroupId(String)
*/
public DelayerEndpointSpec messageGroupId(String messageGroupId) {
this.handler.setMessageGroupId(messageGroupId);
return this;
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import org.aopalliance.aop.Advice;
Expand Down Expand Up @@ -98,10 +100,12 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement

public static final long DEFAULT_RETRY_DELAY = 1_000;

private final String messageGroupId;

private final ConcurrentMap<String, AtomicInteger> deliveries = new ConcurrentHashMap<>();

private final Lock removeReleasedMessageLock = new ReentrantLock();

private String messageGroupId;

private long defaultDelay;

private Expression delayExpression;
Expand All @@ -126,6 +130,14 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement

private long retryDelay = DEFAULT_RETRY_DELAY;

/**
* Construct an instance with default options.
* The {@link #messageGroupId}must then be provided via the setter.
* @since 6.2
*/
public DelayHandler() {
}

/**
* Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for
* {@link MessageGroup} to store delayed Messages in the {@link MessageGroupStore}.
Expand All @@ -151,6 +163,15 @@ public DelayHandler(String messageGroupId, TaskScheduler taskScheduler) {
setTaskScheduler(taskScheduler);
}

/**
* Set a group id to manage delayed messages by this handler.
* @param messageGroupId the group id for delayed messages.
* @since 6.2
*/
public void setMessageGroupId(String messageGroupId) {
this.messageGroupId = messageGroupId;
}

/**
* Set the default delay in milliseconds. If no {@code delayExpression} property has
* been provided, the default delay will be applied to all Messages. If a delay should
Expand Down Expand Up @@ -187,10 +208,10 @@ public void setDelayExpressionString(String delayExpression) {

/**
* Specify whether {@code Exceptions} thrown by {@link #delayExpression} evaluation
* should be ignored (only logged). In this case case the delayer will fall back to
* the to the {@link #defaultDelay}. If this property is specified as {@code false},
* should be ignored (only logged). In this case the delayer will fall back to
* the {@link #defaultDelay}. If this property is specified as {@code false},
* any {@link #delayExpression} evaluation {@code Exception} will be thrown to the
* caller without falling back to the to the {@link #defaultDelay}. Default is
* caller without falling back to the {@link #defaultDelay}. Default is
* {@code true}.
* @param ignoreExpressionFailures true if expression evaluation failures should be
* ignored.
Expand Down Expand Up @@ -297,6 +318,8 @@ public IntegrationPatternType getIntegrationPatternType() {

@Override
protected void doInit() {
Assert.notNull(this.messageGroupId, "A 'messageGroupId' must be provided");

if (this.messageStore == null) {
this.messageStore = new SimpleMessageStore();
}
Expand Down Expand Up @@ -552,7 +575,8 @@ private void doReleaseMessage(Message<?> message) {

private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore) {
synchronized (this.messageGroupId) {
this.removeReleasedMessageLock.lock();
try {
Collection<Message<?>> messages = this.messageStore.getMessageGroup(this.messageGroupId).getMessages();
if (messages.contains(message)) {
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
Expand All @@ -562,6 +586,9 @@ private boolean removeDelayedMessageFromMessageStore(Message<?> message) {
return false;
}
}
finally {
this.removeReleasedMessageLock.unlock();
}
}
else {
return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null;
Expand Down
Expand Up @@ -561,10 +561,24 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
/**
* Populate a [DelayHandler] to the current integration flow position.
*/
@Deprecated("since 6.2",
ReplaceWith("""
delay {
messageGroupId(groupId)
}"""))
@Suppress("DEPRECATION")
fun delay(groupId: String, endpointConfigurer: DelayerEndpointSpec.() -> Unit = {}) {
this.delegate.delay(groupId, endpointConfigurer)
}

/**
* Populate a [DelayHandler] to the current integration flow position.
* @since 6.2
*/
fun delay(endpointConfigurer: DelayerEndpointSpec.() -> Unit) {
this.delegate.delay(endpointConfigurer)
}

/**
* Populate a [org.springframework.integration.transformer.ContentEnricher]
* to the current integration flow position
Expand Down Expand Up @@ -713,10 +727,11 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
* Provide the [HeaderFilter] to the current [IntegrationFlow].
*/
@Deprecated("since 6.2",
ReplaceWith("""headerFilter {
patternMatch()
headersToRemove()
}"""))
ReplaceWith("""
headerFilter {
patternMatch()
headersToRemove()
}"""))
fun headerFilter(headersToRemove: String, patternMatch: Boolean = true) {
headerFilter {
patternMatch(patternMatch)
Expand Down
Expand Up @@ -801,7 +801,8 @@ public IntegrationFlow bridgeFlow2() {
return IntegrationFlow.from("bridgeFlow2Input")
.bridge(c -> c.autoStartup(false).id("bridge"))
.fixedSubscriberChannel()
.delay("delayer", d -> d
.delay(d -> d
.messageGroupId("delayer")
.delayExpression("200")
.advice(this.delayedAdvice)
.messageStore(this.messageStore()))
Expand Down
Expand Up @@ -320,7 +320,10 @@ class KotlinDslTests {
wireTap {
channel { queue("wireTapChannel") }
}
delay("delayGroup") { defaultDelay(100) }
delay {
messageGroupId("delayGroup")
defaultDelay(100)
}
transform<String> { it.uppercase() }
}

Expand Down
Expand Up @@ -184,7 +184,8 @@ public IntegrationFlow eventProducerFlow() {
@Bean
public IntegrationFlow delayFlow() {
return flow -> flow
.delay(GROUP_ID, e -> e
.delay(e -> e
.messageGroupId(GROUP_ID)
.messageStore(messageGroupStore)
.id("delayer"))
.channel(MessageChannels.queue("delayedResults"));
Expand Down
Expand Up @@ -618,7 +618,9 @@ class GroovyIntegrationFlowDefinition {
* {@link org.springframework.integration.store.MessageGroupStore}.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @see org.springframework.integration.dsl.DelayerEndpointSpec
* @deprecated since 6.2 in favor of {@link #delay(groovy.lang.Closure)}
*/
@Deprecated(since = "6.2", forRemoval = true)
GroovyIntegrationFlowDefinition delay(
String groupId,
@DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST)
Expand All @@ -629,6 +631,21 @@ class GroovyIntegrationFlowDefinition {
this
}

/**
* Populate a {@link org.springframework.integration.handler.DelayHandler} to the current integration flow position.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @see org.springframework.integration.dsl.DelayerEndpointSpec
* @since 6.2
*/
GroovyIntegrationFlowDefinition delay(
@DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.DelayerEndpointSpec')
Closure<?> endpointConfigurer) {

this.delegate.delay createConfigurerIfAny(endpointConfigurer)
this
}

/**
* Populate a {@link org.springframework.integration.transformer.ContentEnricher}
* to the current integration flow position
Expand Down Expand Up @@ -657,7 +674,7 @@ class GroovyIntegrationFlowDefinition {
GroovyIntegrationFlowDefinition enrichHeaders(
@DelegatesTo(value = HeaderEnricherSpec, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.HeaderEnricherSpec')
Closure<?> enricherConfigurer = null) {
Closure<?> enricherConfigurer) {

this.delegate.enrichHeaders createConfigurerIfAny(enricherConfigurer)
this
Expand Down
Expand Up @@ -314,7 +314,10 @@ class GroovyDslTests {
wireTap integrationFlow {
channel { queue 'wireTapChannel' }
}
delay 'delayGroup', { defaultDelay 100 }
delay {
messageGroupId 'delayGroup'
defaultDelay 100
}
transform String, { it.toUpperCase() }
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/reference/asciidoc/delayer.adoc
Expand Up @@ -33,7 +33,8 @@ If you need to determine the delay for each message, you can also provide the Sp
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay("delayer.messageGroupId", d -> d
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
Expand All @@ -46,7 +47,8 @@ public IntegrationFlow flow() {
@Bean
fun flow() =
integrationFlow("input") {
delay("delayer.messageGroupId") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
Expand Down
5 changes: 4 additions & 1 deletion src/reference/asciidoc/groovy-dsl.adoc
Expand Up @@ -35,7 +35,10 @@ flowLambda() {
wireTap integrationFlow {
channel { queue 'wireTapChannel' }
}
delay 'delayGroup', { defaultDelay 100 }
delay {
messageGroupId 'delayGroup'
defaultDelay 100
}
transform String, { it.toUpperCase() }
}
}
Expand Down

0 comments on commit 81af20a

Please sign in to comment.