Skip to content

Commit

Permalink
GH-2880: Handle Pausable in Control Bus (#2940)
Browse files Browse the repository at this point in the history
* GH-2880: Handle `Pausable` in Control Bus

Fixes #2880

* Refactor `ControlBusMethodFilter` to handle `Pausable` managed operations
* Optimize and internal `ControlBusMethodFilter.filter()` logic to rely
on the `MergedAnnotations`
* Modify `EnableIntegrationTests` to test new functionality and document
the feature

* * `ControlBusMethodFilter` to deal with plain `Lifecycle` impls as well
  • Loading branch information
artembilan authored and garyrussell committed May 30, 2019
1 parent e9591c6 commit 374b4b7
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package org.springframework.integration.expression;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.springframework.context.Lifecycle;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.AnnotationFilter;
import org.springframework.core.annotation.MergedAnnotations;
import org.springframework.core.annotation.RepeatableContainers;
import org.springframework.expression.MethodFilter;
import org.springframework.integration.endpoint.Pausable;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.util.CustomizableThreadCreator;
Expand All @@ -32,22 +34,23 @@
/**
* SpEL {@link MethodFilter} to restrict method invocations to:
* <ul>
* <li> {@link Lifecycle} components
* <li> {@link Pausable} or {@link Lifecycle} components
* <li> {@code get}, {@code set} and {@code shutdown} methods of {@link CustomizableThreadCreator}
* <li> methods with {@link ManagedAttribute} and {@link ManagedOperation} annotations
* </ul>
* This class isn't designed for target applications and typically is used from {@code ExpressionControlBusFactoryBean}.
*
* @author Mark Fisher
* @author Artem Bilan
* @since 4.0
*/
*
* @since 4.0
*/
public class ControlBusMethodFilter implements MethodFilter {

public List<Method> filter(List<Method> methods) {
List<Method> supportedMethods = new ArrayList<Method>();
List<Method> supportedMethods = new ArrayList<>();
for (Method method : methods) {
if (this.accept(method)) {
if (accept(method)) {
supportedMethods.add(method);
}
}
Expand All @@ -56,23 +59,25 @@ public List<Method> filter(List<Method> methods) {

private boolean accept(Method method) {
Class<?> declaringClass = method.getDeclaringClass();
if (Lifecycle.class.isAssignableFrom(declaringClass)
&& ReflectionUtils.findMethod(Lifecycle.class, method.getName(), method.getParameterTypes()) != null) {
String methodName = method.getName();
if ((Pausable.class.isAssignableFrom(declaringClass) || Lifecycle.class.isAssignableFrom(declaringClass))
&& ReflectionUtils.findMethod(Pausable.class, methodName, method.getParameterTypes()) != null) {
return true;
}

if (CustomizableThreadCreator.class.isAssignableFrom(declaringClass)
&& (method.getName().startsWith("get")
|| method.getName().startsWith("set")
|| method.getName().startsWith("shutdown"))) {
return true;
}
if (this.hasAnnotation(method, ManagedAttribute.class) || this.hasAnnotation(method, ManagedOperation.class)) {
&& (methodName.startsWith("get")
|| methodName.startsWith("set")
|| methodName.startsWith("shutdown"))) {
return true;
}
return false;
}

private boolean hasAnnotation(Method method, Class<? extends Annotation> annotationType) {
return AnnotationUtils.findAnnotation(method, annotationType) != null;
MergedAnnotations mergedAnnotations =
MergedAnnotations.from(method, MergedAnnotations.SearchStrategy.EXHAUSTIVE,
RepeatableContainers.none(), AnnotationFilter.PLAIN);

return mergedAnnotations.get(ManagedAttribute.class).isPresent()
|| mergedAnnotations.get(ManagedOperation.class).isPresent();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.MethodInvokingMessageSource;
import org.springframework.integration.endpoint.Pausable;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.expression.SpelPropertyAccessorRegistrar;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
Expand Down Expand Up @@ -409,11 +410,16 @@ public void testAnnotatedServiceActivator() throws Exception {
assertThat(message.getHeaders().get("foo")).isEqualTo("FOO");

MessagingTemplate messagingTemplate = new MessagingTemplate(this.controlBusChannel);
assertThat(messagingTemplate.convertSendAndReceive("@lifecycle.isRunning()", Boolean.class)).isEqualTo(false);
this.controlBusChannel.send(new GenericMessage<>("@lifecycle.start()"));
assertThat(messagingTemplate.convertSendAndReceive("@lifecycle.isRunning()", Boolean.class)).isEqualTo(true);
this.controlBusChannel.send(new GenericMessage<>("@lifecycle.stop()"));
assertThat(messagingTemplate.convertSendAndReceive("@lifecycle.isRunning()", Boolean.class)).isEqualTo(false);
assertThat(messagingTemplate.convertSendAndReceive("@pausable.isRunning()", Boolean.class)).isEqualTo(false);
this.controlBusChannel.send(new GenericMessage<>("@pausable.start()"));
assertThat(messagingTemplate.convertSendAndReceive("@pausable.isRunning()", Boolean.class)).isEqualTo(true);
this.controlBusChannel.send(new GenericMessage<>("@pausable.stop()"));
assertThat(messagingTemplate.convertSendAndReceive("@pausable.isRunning()", Boolean.class)).isEqualTo(false);
this.controlBusChannel.send(new GenericMessage<>("@pausable.pause()"));
Object pausable = this.context.getBean("pausable");
assertThat(TestUtils.getPropertyValue(pausable, "paused", Boolean.class)).isTrue();
this.controlBusChannel.send(new GenericMessage<>("@pausable.resume()"));
assertThat(TestUtils.getPropertyValue(pausable, "paused", Boolean.class)).isFalse();

Map<String, ServiceActivatingHandler> beansOfType =
this.context.getBeansOfType(ServiceActivatingHandler.class);
Expand Down Expand Up @@ -638,7 +644,7 @@ public void testBridgeAnnotations() {
@Test
public void testMonoGateway() throws Exception {

final AtomicReference<List<Integer>> ref = new AtomicReference<List<Integer>>();
final AtomicReference<List<Integer>> ref = new AtomicReference<>();
final CountDownLatch consumeLatch = new CountDownLatch(1);

Flux.just("1", "2", "3", "4", "5")
Expand Down Expand Up @@ -1035,11 +1041,13 @@ public ExpressionControlBusFactoryBean controlBus() {
}

@Bean
public Lifecycle lifecycle() {
return new Lifecycle() {
public Pausable pausable() {
return new Pausable() {

private volatile boolean running;

private volatile boolean paused;

@Override
public void start() {
this.running = true;
Expand All @@ -1055,6 +1063,16 @@ public boolean isRunning() {
return this.running;
}

@Override
public void pause() {
this.paused = true;
}

@Override
public void resume() {
this.paused = false;
}

};
}

Expand Down
2 changes: 1 addition & 1 deletion src/reference/asciidoc/control-bus.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ For example, you can specify an output channel if the result of the operation ha
The control bus runs messages on the input channel as Spring Expression Language (SpEL) expressions.
It takes a message, compiles the body to an expression, adds some context, and then runs it.
The default context supports any method that has been annotated with `@ManagedAttribute` or `@ManagedOperation`.
It also supports the methods on Spring's `Lifecycle` interface, and it supports methods that are used to configure several of Spring's `TaskExecutor` and `TaskScheduler` implementations.
It also supports the methods on Spring's `Lifecycle` interface (and its `Pausable` extension since version 5.2), and it supports methods that are used to configure several of Spring's `TaskExecutor` and `TaskScheduler` implementations.
The simplest way to ensure that your own methods are available to the control bus is to use the `@ManagedAttribute` or `@ManagedOperation` annotations.
Since those annotations are also used for exposing methods to a JMX MBean registry, they offer a convenient by-product: Often, the same types of operations you want to expose to the control bus are reasonable for exposing through JMX).
Resolution of any particular instance within the application context is achieved in the typical SpEL syntax.
Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ See <<./transformer.adoc#json-transformers,JSON Transformers>> for more informat
The `splitter` now supports a `discardChannel` configuration option.
See <<./splitter.adoc#splitter,Splitter>> for more information.

The Control Bus can now handle `Pausable` (extension of `Lifecycle`) operations.
See <<control-bus>> for more information.

[[x5.2-amqp]]
==== AMQP Changes

Expand Down

0 comments on commit 374b4b7

Please sign in to comment.