From 81af20aabfeb35c1485175d25b79ef72376b13c7 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 12 Jun 2023 16:01:40 -0400 Subject: [PATCH] Improve Delayer DSL (#8645) * Improve Delayer DSL Move `groupId` option from a `delay()` method arg to the `DelayerEndpointSpec` to make it cleaner from code reading perspective * Expose new DSL method based on just a `DelayerEndpointSpec` for Kotlin &v Groovy * Deprecate multi-arg `delay()` methods in favor of `Consumer`-based * * Fix language and code style --- .../dsl/BaseIntegrationFlowDefinition.java | 21 +++++++++- .../integration/dsl/DelayerEndpointSpec.java | 18 +++++++- .../integration/handler/DelayHandler.java | 41 +++++++++++++++---- .../dsl/KotlinIntegrationFlowDefinition.kt | 23 +++++++++-- .../dsl/flows/IntegrationFlowTests.java | 3 +- .../integration/dsl/KotlinDslTests.kt | 5 ++- .../event/dsl/IntegrationFlowEventsTests.java | 3 +- .../GroovyIntegrationFlowDefinition.groovy | 19 ++++++++- .../groovy/dsl/test/GroovyDslTests.groovy | 5 ++- src/reference/asciidoc/delayer.adoc | 6 ++- src/reference/asciidoc/groovy-dsl.adoc | 5 ++- 11 files changed, 128 insertions(+), 21 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index d2caa74932e..084420c74ee 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -1178,12 +1178,18 @@ public B bridge(@Nullable Consumer> endpointC /** * Populate a {@link DelayHandler} to the current integration flow position * with default options. + * Shortcut for: + *
+	 * {@code
+	 *  .delay(delayer -> delayer.messageGroupId(groupId))
+	 * }
+	 * 
* @param groupId the {@code groupId} for delayed messages in the * {@link org.springframework.integration.store.MessageGroupStore}. * @return the current {@link BaseIntegrationFlowDefinition}. */ public B delay(String groupId) { - return delay(groupId, null); + return delay(delayer -> delayer.messageGroupId(groupId)); } /** @@ -1192,12 +1198,25 @@ public B delay(String groupId) { * {@link org.springframework.integration.store.MessageGroupStore}. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @return the current {@link BaseIntegrationFlowDefinition}. + * @deprecated since 6.2 in favor of {@link #delay(Consumer)} * @see DelayerEndpointSpec */ + @Deprecated(since = "6.2", forRemoval = true) public B delay(String groupId, @Nullable Consumer endpointConfigurer) { return register(new DelayerEndpointSpec(new DelayHandler(groupId)), endpointConfigurer); } + /** + * Populate a {@link DelayHandler} to the current integration flow position. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @return the current {@link BaseIntegrationFlowDefinition}. + * @since 6.2 + * @see DelayerEndpointSpec + */ + public B delay(Consumer endpointConfigurer) { + return register(new DelayerEndpointSpec(), endpointConfigurer); + } + /** * Populate a {@link org.springframework.integration.transformer.ContentEnricher} * to the current integration flow position diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java index fbe1e92307c..8fea979ac81 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +46,10 @@ public class DelayerEndpointSpec extends ConsumerEndpointSpec delayedAdvice = new LinkedList<>(); + protected DelayerEndpointSpec() { + this(new DelayHandler()); + } + protected DelayerEndpointSpec(DelayHandler delayHandler) { super(delayHandler); Assert.notNull(delayHandler, "'delayHandler' must not be null."); @@ -225,4 +229,16 @@ public

DelayerEndpointSpec delayFunction(Function, Object> delayF return this; } + /** + * Set a group id to manage delayed messages by this handler. + * @param messageGroupId the group id for delayed messages. + * @return the endpoint spec. + * @since 6.2 + * @see DelayHandler#setMessageGroupId(String) + */ + public DelayerEndpointSpec messageGroupId(String messageGroupId) { + this.handler.setMessageGroupId(messageGroupId); + return this; + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java index 8d515f270f9..a1553615dad 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import org.aopalliance.aop.Advice; @@ -98,10 +100,12 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement public static final long DEFAULT_RETRY_DELAY = 1_000; - private final String messageGroupId; - private final ConcurrentMap deliveries = new ConcurrentHashMap<>(); + private final Lock removeReleasedMessageLock = new ReentrantLock(); + + private String messageGroupId; + private long defaultDelay; private Expression delayExpression; @@ -126,6 +130,14 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement private long retryDelay = DEFAULT_RETRY_DELAY; + /** + * Construct an instance with default options. + * The {@link #messageGroupId}must then be provided via the setter. + * @since 6.2 + */ + public DelayHandler() { + } + /** * Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for * {@link MessageGroup} to store delayed Messages in the {@link MessageGroupStore}. @@ -151,6 +163,15 @@ public DelayHandler(String messageGroupId, TaskScheduler taskScheduler) { setTaskScheduler(taskScheduler); } + /** + * Set a group id to manage delayed messages by this handler. + * @param messageGroupId the group id for delayed messages. + * @since 6.2 + */ + public void setMessageGroupId(String messageGroupId) { + this.messageGroupId = messageGroupId; + } + /** * Set the default delay in milliseconds. If no {@code delayExpression} property has * been provided, the default delay will be applied to all Messages. If a delay should @@ -187,10 +208,10 @@ public void setDelayExpressionString(String delayExpression) { /** * Specify whether {@code Exceptions} thrown by {@link #delayExpression} evaluation - * should be ignored (only logged). In this case case the delayer will fall back to - * the to the {@link #defaultDelay}. If this property is specified as {@code false}, + * should be ignored (only logged). In this case the delayer will fall back to + * the {@link #defaultDelay}. If this property is specified as {@code false}, * any {@link #delayExpression} evaluation {@code Exception} will be thrown to the - * caller without falling back to the to the {@link #defaultDelay}. Default is + * caller without falling back to the {@link #defaultDelay}. Default is * {@code true}. * @param ignoreExpressionFailures true if expression evaluation failures should be * ignored. @@ -297,6 +318,8 @@ public IntegrationPatternType getIntegrationPatternType() { @Override protected void doInit() { + Assert.notNull(this.messageGroupId, "A 'messageGroupId' must be provided"); + if (this.messageStore == null) { this.messageStore = new SimpleMessageStore(); } @@ -552,7 +575,8 @@ private void doReleaseMessage(Message message) { private boolean removeDelayedMessageFromMessageStore(Message message) { if (this.messageStore instanceof SimpleMessageStore) { - synchronized (this.messageGroupId) { + this.removeReleasedMessageLock.lock(); + try { Collection> messages = this.messageStore.getMessageGroup(this.messageGroupId).getMessages(); if (messages.contains(message)) { this.messageStore.removeMessagesFromGroup(this.messageGroupId, message); @@ -562,6 +586,9 @@ private boolean removeDelayedMessageFromMessageStore(Message message) { return false; } } + finally { + this.removeReleasedMessageLock.unlock(); + } } else { return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null; diff --git a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt index 5d77d4280b6..a6d74051d56 100644 --- a/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt +++ b/spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt @@ -561,10 +561,24 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ /** * Populate a [DelayHandler] to the current integration flow position. */ + @Deprecated("since 6.2", + ReplaceWith(""" + delay { + messageGroupId(groupId) + }""")) + @Suppress("DEPRECATION") fun delay(groupId: String, endpointConfigurer: DelayerEndpointSpec.() -> Unit = {}) { this.delegate.delay(groupId, endpointConfigurer) } + /** + * Populate a [DelayHandler] to the current integration flow position. + * @since 6.2 + */ + fun delay(endpointConfigurer: DelayerEndpointSpec.() -> Unit) { + this.delegate.delay(endpointConfigurer) + } + /** * Populate a [org.springframework.integration.transformer.ContentEnricher] * to the current integration flow position @@ -713,10 +727,11 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ * Provide the [HeaderFilter] to the current [IntegrationFlow]. */ @Deprecated("since 6.2", - ReplaceWith("""headerFilter { - patternMatch() - headersToRemove() - }""")) + ReplaceWith(""" + headerFilter { + patternMatch() + headersToRemove() + }""")) fun headerFilter(headersToRemove: String, patternMatch: Boolean = true) { headerFilter { patternMatch(patternMatch) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java index 7260d897066..79bb09afe96 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java @@ -801,7 +801,8 @@ public IntegrationFlow bridgeFlow2() { return IntegrationFlow.from("bridgeFlow2Input") .bridge(c -> c.autoStartup(false).id("bridge")) .fixedSubscriberChannel() - .delay("delayer", d -> d + .delay(d -> d + .messageGroupId("delayer") .delayExpression("200") .advice(this.delayedAdvice) .messageStore(this.messageStore())) diff --git a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt index 0918cb1cb80..94dc725e6c5 100644 --- a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt +++ b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt @@ -320,7 +320,10 @@ class KotlinDslTests { wireTap { channel { queue("wireTapChannel") } } - delay("delayGroup") { defaultDelay(100) } + delay { + messageGroupId("delayGroup") + defaultDelay(100) + } transform { it.uppercase() } } diff --git a/spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java b/spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java index 5a3a7973fc8..0c49b636781 100644 --- a/spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java +++ b/spring-integration-event/src/test/java/org/springframework/integration/event/dsl/IntegrationFlowEventsTests.java @@ -184,7 +184,8 @@ public IntegrationFlow eventProducerFlow() { @Bean public IntegrationFlow delayFlow() { return flow -> flow - .delay(GROUP_ID, e -> e + .delay(e -> e + .messageGroupId(GROUP_ID) .messageStore(messageGroupStore) .id("delayer")) .channel(MessageChannels.queue("delayedResults")); diff --git a/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy b/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy index 648b6cabaed..423f82a2cd2 100644 --- a/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy +++ b/spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy @@ -618,7 +618,9 @@ class GroovyIntegrationFlowDefinition { * {@link org.springframework.integration.store.MessageGroupStore}. * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. * @see org.springframework.integration.dsl.DelayerEndpointSpec + * @deprecated since 6.2 in favor of {@link #delay(groovy.lang.Closure)} */ + @Deprecated(since = "6.2", forRemoval = true) GroovyIntegrationFlowDefinition delay( String groupId, @DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST) @@ -629,6 +631,21 @@ class GroovyIntegrationFlowDefinition { this } + /** + * Populate a {@link org.springframework.integration.handler.DelayHandler} to the current integration flow position. + * @param endpointConfigurer the {@link Consumer} to provide integration endpoint options. + * @see org.springframework.integration.dsl.DelayerEndpointSpec + * @since 6.2 + */ + GroovyIntegrationFlowDefinition delay( + @DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.DelayerEndpointSpec') + Closure endpointConfigurer) { + + this.delegate.delay createConfigurerIfAny(endpointConfigurer) + this + } + /** * Populate a {@link org.springframework.integration.transformer.ContentEnricher} * to the current integration flow position @@ -657,7 +674,7 @@ class GroovyIntegrationFlowDefinition { GroovyIntegrationFlowDefinition enrichHeaders( @DelegatesTo(value = HeaderEnricherSpec, strategy = Closure.DELEGATE_FIRST) @ClosureParams(value = SimpleType.class, options = 'org.springframework.integration.dsl.HeaderEnricherSpec') - Closure enricherConfigurer = null) { + Closure enricherConfigurer) { this.delegate.enrichHeaders createConfigurerIfAny(enricherConfigurer) this diff --git a/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy b/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy index 799fb08d522..9b3c671c581 100644 --- a/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy +++ b/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy @@ -314,7 +314,10 @@ class GroovyDslTests { wireTap integrationFlow { channel { queue 'wireTapChannel' } } - delay 'delayGroup', { defaultDelay 100 } + delay { + messageGroupId 'delayGroup' + defaultDelay 100 + } transform String, { it.toUpperCase() } } } diff --git a/src/reference/asciidoc/delayer.adoc b/src/reference/asciidoc/delayer.adoc index 437d21d35b4..a9aa8288665 100644 --- a/src/reference/asciidoc/delayer.adoc +++ b/src/reference/asciidoc/delayer.adoc @@ -33,7 +33,8 @@ If you need to determine the delay for each message, you can also provide the Sp @Bean public IntegrationFlow flow() { return IntegrationFlow.from("input") - .delay("delayer.messageGroupId", d -> d + .delay(d -> d + .messageGroupId("delayer.messageGroupId") .defaultDelay(3_000L) .delayExpression("headers['delay']")) .channel("output") @@ -46,7 +47,8 @@ public IntegrationFlow flow() { @Bean fun flow() = integrationFlow("input") { - delay("delayer.messageGroupId") { + delay { + messageGroupId("delayer.messageGroupId") defaultDelay(3000L) delayExpression("headers['delay']") } diff --git a/src/reference/asciidoc/groovy-dsl.adoc b/src/reference/asciidoc/groovy-dsl.adoc index 26ce8d73163..c9a9b57d28e 100644 --- a/src/reference/asciidoc/groovy-dsl.adoc +++ b/src/reference/asciidoc/groovy-dsl.adoc @@ -35,7 +35,10 @@ flowLambda() { wireTap integrationFlow { channel { queue 'wireTapChannel' } } - delay 'delayGroup', { defaultDelay 100 } + delay { + messageGroupId 'delayGroup' + defaultDelay 100 + } transform String, { it.toUpperCase() } } }