Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,34 +16,20 @@

package org.springframework.integration.aop;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;

/**
* Advice for a {@link MessageSource#receive()} method to decide whether a poll
* should be ignored and/or take action after the receive.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.2
*
* @deprecated since 5.3 in favor of {@link MessageSourceMutator}.
*/
public abstract class AbstractMessageSourceAdvice implements MethodInterceptor, MessageSourceMutator {

@Override
public final Object invoke(MethodInvocation invocation) throws Throwable {
Object target = invocation.getThis();
if (!(target instanceof MessageSource)) {
return invocation.proceed();
}

Message<?> result = null;
if (beforeReceive((MessageSource<?>) target)) {
result = (Message<?>) invocation.proceed();
}
return afterReceive(result, (MessageSource<?>) target);
}
@Deprecated
public abstract class AbstractMessageSourceAdvice implements MessageSourceMutator {

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.springframework.integration.core.MessageSource;
import org.springframework.integration.util.CompoundTrigger;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.scheduling.Trigger;
import org.springframework.util.Assert;
Expand All @@ -35,7 +36,10 @@
* @since 4.3
*
*/
public class CompoundTriggerAdvice extends AbstractMessageSourceAdvice {
@SuppressWarnings("deprecation")
public class CompoundTriggerAdvice
extends AbstractMessageSourceAdvice
implements ReceiveMessageAdvice {

private final CompoundTrigger compoundTrigger;

Expand All @@ -47,8 +51,21 @@ public CompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTr
this.override = overrideTrigger;
}

/**
* @param result the received message.
* @param source the message source.
* @return the message or null
* @deprecated since 5.3 in favor of {@link #afterReceive(Message, Object)}
*/
@Override
@Deprecated
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
return afterReceive(result, (Object) source);
}

@Override
@Nullable
public Message<?> afterReceive(@Nullable Message<?> result, Object source) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't something like Advisable<T> be better than Object?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, in most cases it is really about both MessageSource and PollableChannel, so even with generic argument it still going to be an Object 😄

I thought about a Supplier for both those interfaces, but this is not what we are going to use with the receive(timeout).
Therefore I'm OK to stick with an Object for a source argument.

if (result == null) {
this.compoundTrigger.setOverride(this.override);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,19 +17,31 @@
package org.springframework.integration.aop;

import org.springframework.integration.core.MessageSource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

/**
* An object that can mutate a {@link MessageSource} before and/or after
* A {@link ReceiveMessageAdvice} extension that can mutate a {@link MessageSource} before and/or after
* {@link MessageSource#receive()} is called.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 5.0.7.
*
* @since 5.0.7
*/
@FunctionalInterface
public interface MessageSourceMutator {
public interface MessageSourceMutator extends ReceiveMessageAdvice {

@Override
default boolean beforeReceive(Object source) {
if (source instanceof MessageSource<?>) {
return beforeReceive((MessageSource<?>) source);
}
else {
throw new IllegalArgumentException(
"The 'MessageSourceMutator' supports only a 'MessageSource' in the before/after hooks: " + source);
}
}

/**
* Subclasses can decide whether to proceed with this poll.
Expand All @@ -40,13 +52,26 @@ default boolean beforeReceive(MessageSource<?> source) {
return true;
}

@Override
@Nullable
default Message<?> afterReceive(@Nullable Message<?> result, Object source) {
if (source instanceof MessageSource<?>) {
return afterReceive(result, (MessageSource<?>) source);
}
else {
throw new IllegalArgumentException(
"The 'MessageSourceMutator' supports only a 'MessageSource' in the before/after hooks: " + source);
}
}

/**
* Subclasses can take actions based on the result of the poll; e.g.
* adjust the {@code trigger}. The message can also be replaced with a new one.
* @param result the received message.
* @param source the message source.
* @return a message to continue to process the result, null to discard whatever the poll returned.
*/
Message<?> afterReceive(Message<?> result, MessageSource<?> source);
@Nullable
Message<?> afterReceive(@Nullable Message<?> result, MessageSource<?> source);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.aop;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

import org.springframework.integration.core.MessageSource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

/**
* An AOP advice to perform hooks before and/or after a {@code receive()} contract is called.
*
* @author Artem Bilan
*
* @since 5.3
*/
@FunctionalInterface
public interface ReceiveMessageAdvice extends MethodInterceptor {

/**
* Subclasses can decide whether to {@link MethodInvocation#proceed()} or not.
* @param source the source of the message to receive.
* @return true to proceed (default).
*/
default boolean beforeReceive(Object source) {
return true;
}

@Override
@Nullable
default Object invoke(MethodInvocation invocation) throws Throwable {
Object target = invocation.getThis();
if (!(target instanceof MessageSource) && !(target instanceof PollableChannel)) {
return invocation.proceed();
}

Message<?> result = null;
if (beforeReceive(target)) {
result = (Message<?>) invocation.proceed();
}
return afterReceive(result, target);
}

/**
* Subclasses can take actions based on the result of the {@link MethodInvocation#proceed()}; e.g.
* adjust the {@code trigger}. The message can also be replaced with a new one.
* @param result the received message.
* @param source the source of the message to receive.
* @return a message to continue to process the result, null to discard whatever
* the {@link MethodInvocation#proceed()} returned.
*/
@Nullable
Message<?> afterReceive(@Nullable Message<?> result, Object source);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,7 +31,11 @@
* @since 4.2
*
* @see DynamicPeriodicTrigger
*
* @deprecated since 5.3 in favor of {@link SimpleActiveIdleReceiveMessageAdvice} with the same
* (but more common) functionality.
*/
@Deprecated
public class SimpleActiveIdleMessageSourceAdvice extends AbstractMessageSourceAdvice {

private final DynamicPeriodicTrigger trigger;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.aop;

import java.time.Duration;

import org.springframework.integration.util.DynamicPeriodicTrigger;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
A simple advice that polls at one rate when messages exist and another when
* there are no messages.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 5.3
*
* @see DynamicPeriodicTrigger
*/
public class SimpleActiveIdleReceiveMessageAdvice implements ReceiveMessageAdvice {

private final DynamicPeriodicTrigger trigger;

private volatile Duration idlePollPeriod;

private volatile Duration activePollPeriod;

public SimpleActiveIdleReceiveMessageAdvice(DynamicPeriodicTrigger trigger) {
Assert.notNull(trigger, "'trigger' must not be null");
this.trigger = trigger;
this.idlePollPeriod = trigger.getDuration();
this.activePollPeriod = trigger.getDuration();
}

/**
* Set the poll period when messages are not returned. Defaults to the
* trigger's period.
* @param idlePollPeriod the period in milliseconds.
*/
public void setIdlePollPeriod(long idlePollPeriod) {
this.idlePollPeriod = Duration.ofMillis(idlePollPeriod);
}

/**
* Set the poll period when messages are returned. Defaults to the
* trigger's period.
* @param activePollPeriod the period in milliseconds.
*/
public void setActivePollPeriod(long activePollPeriod) {
this.activePollPeriod = Duration.ofMillis(activePollPeriod);
}

@Override
public Message<?> afterReceive(Message<?> result, Object source) {
if (result == null) {
this.trigger.setDuration(this.idlePollPeriod);
}
else {
this.trigger.setDuration(this.activePollPeriod);
}
return result;
}

}
Loading