Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

INT-2485 Orderly Shutdown #403

Closed
wants to merge 4 commits into from

2 participants

@garyrussell
Owner

https://jira.springsource.org/browse/INT-2485

Initial commit - @ManagedOperation on IMBE

  • can be invoked via JMX, , or getting a reference to the IMBE from the application context.

INT-2485 Updates After Review Comments (JIRA)

  • Shutdown Schedulers first, and wait for them
  • Add a force parameter, which overrides thread pool shutdown options
  • Shutdown Sources/Channels after all thread pools have stopped
  • Mark other components as OrderlyShutdownCapable (e.g. JMS/AMQP Listener containers) and shut them down first
  • Wait for remaining time to allow for quiescence

Also

  • remove TimeUnit parameter (not JMX-friendly); time limit is now always milliseconds
  • If thread pools don't stop in time limit, force them down.

INT-2485 Handle Self-Destruction

Add shutdown-executor to IMBE.

When the shutdown was called on a Spring-Managed thread, the shutdown
was not clean because we timed out waiting for the current thread to
terminate. After that, we force terminated other components.

Now, by providing a dedicated Executor for the shutdown process, it
is used for the shutdown instead of the current thread. This Executor
is not shutdown.

It is not necessary to provide an Executor if the stopActiveComponents()
method is called on some other thread that is not involved in the
shutdown.

Also adds executor name to logs, when available.

INT-2485 Polishing

Fix MBean object name collision when running all tests.

INT-2485 Enable TCP Shutdown

Make TCP connection factories 'OrderlyShutdownCapable' so
they are stopped before schedulers/executors in order for
them to release any executor threads they are holding.

...ork/integration/monitor/IntegrationMBeanExporter.java
@@ -191,6 +220,15 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = (ListableBeanFactory) beanFactory;
}
+ public void setApplicationContext(ApplicationContext applicationContext)
+ throws BeansException {
+ this.applicationContext = applicationContext;
@olegz
olegz added a note

null check

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ork/integration/monitor/IntegrationMBeanExporter.java
@@ -191,6 +220,15 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = (ListableBeanFactory) beanFactory;
}
+ public void setApplicationContext(ApplicationContext applicationContext)
+ throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ public void setShutdownExecutor(Executor shutdownExecutor) {
+ this.shutdownExecutor = shutdownExecutor;
@olegz
olegz added a note

null check

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@olegz olegz commented on the diff
...ork/integration/monitor/IntegrationMBeanExporter.java
((18 lines not shown))
+ * running tasks. Overrides any settings on schedulers/executors. When true
+ * may result in error messages being sent to error channels.
+ * @param howLong The time to wait in total for all activities to complete
+ * in milliseconds.
+ */
+ @ManagedOperation
+ public void stopActiveComponents(boolean force, long howLong) {
+ if (!this.shuttingDown.compareAndSet(false, true)) {
+ logger.error("Shutdown already in process");
+ return;
+ }
+ this.shutdownDeadline = System.currentTimeMillis() + howLong;
+ this.shutdownForced = force;
+ if (this.shutdownExecutor == null) {
+ try {
+ logger.debug("Running shutdown on current thread");
@olegz
olegz added a note

We should do if(logger.isDebugEnabled)

@garyrussell Owner

That's only needed if when concatenating strings in the logger.debug(); when it's a simple string, it's ok (the first thing logger.debug() does is isDebugEnabled().

The reason we do it when concatenating is to avoid the overhead of the concatentation if isDebugEnabled() is false. In this case, the extra call to isDebugEnabled() is overhead.

@olegz
olegz added a note
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ork/integration/monitor/IntegrationMBeanExporter.java
((148 lines not shown))
+ }
+
+ @ManagedOperation
+ public void stopExecutors() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping executors" + (this.shutdownForced ? "(force)" : ""));
+ }
+ List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
+ Map<String, ThreadPoolTaskExecutor> executors = this.applicationContext
+ .getBeansOfType(ThreadPoolTaskExecutor.class);
+ for (Entry<String, ThreadPoolTaskExecutor> entry : executors.entrySet()) {
+ ThreadPoolTaskExecutor executor = entry.getValue();
+ if (executor == this.shutdownExecutor) {
+ logger.debug("Skipping shutdown of shutdown executor");
+ continue;
+ }
@olegz
olegz added a note

I'll leave it up to you but wanted to say that I am not a big fan of 'continue'. Makes it hard to follow. May be something like this:

if (executor != this.shutdownExecutor) {
                if (logger.isInfoEnabled()) {
                    logger.info("Stopping executor " + executor.getThreadNamePrefix());
                }
                ExecutorService executorService = executor.getThreadPoolExecutor();
                executorServices.add(executorService);
                if (this.shutdownForced) {
                    executorService.shutdownNow();
                }
                else {
                    executorService.shutdown();
                }
            }
            else {
                logger.debug("Skipping shutdown of shutdown executor");
            }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ork/integration/monitor/IntegrationMBeanExporter.java
((184 lines not shown))
+ }
+ List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
+ Map<String, ExecutorService> nonSpringExecutors = this.applicationContext
+ .getBeansOfType(ExecutorService.class);
+ for (Entry<String, ExecutorService> entry : nonSpringExecutors.entrySet()) {
+ ExecutorService executorService = entry.getValue();
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping executor service " + executorService);
+ }
+ executorServices.add(executorService);
+ if (this.shutdownForced) {
+ executorService.shutdownNow();
+ }
+ else {
+ executorService.shutdown();
+ }
@olegz
olegz added a note

seems like this block of code is the same in all 3 methods. May be another private method?

executorServices.add(executorService);
            if (this.shutdownForced) {
                executorService.shutdownNow();
            }
            else {
                executorService.shutdown();
            }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ork/integration/monitor/IntegrationMBeanExporter.java
((190 lines not shown))
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping executor service " + executorService);
+ }
+ executorServices.add(executorService);
+ if (this.shutdownForced) {
+ executorService.shutdownNow();
+ }
+ else {
+ executorService.shutdown();
+ }
+ }
+ waitForExecutors(executorServices);
+ logger.debug("Stopped other executors");
+ }
+
+ private void waitForExecutors(List<ExecutorService> executorServices) {
@olegz
olegz added a note

Can you move this method down to where all privates are?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@olegz olegz commented on the diff
...ork/integration/monitor/IntegrationMBeanExporter.java
((142 lines not shown))
+ else {
+ executorService.shutdown();
+ }
+ }
+ waitForExecutors(executorServices);
+ logger.debug("Stopped schedulers");
+ }
+
+ @ManagedOperation
+ public void stopExecutors() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping executors" + (this.shutdownForced ? "(force)" : ""));
+ }
+ List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
+ Map<String, ThreadPoolTaskExecutor> executors = this.applicationContext
+ .getBeansOfType(ThreadPoolTaskExecutor.class);
@olegz
olegz added a note

Do we really only care about ThreadPoolTaskExecutor here? The hierarchy of TaskExecutor is pretty extensive and I am assuming you wanted to specifically stop any Spring TaskExecutor here. Also, however unlikely if the executor is a Proxy than it won't be part of the executors map

@garyrussell Owner

I was really going after and our internal TaskScheduler. I think there are issues going after everything. I'll look again, but we might have to defer to the follow up JIRA.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@olegz olegz commented on the diff
...ork/integration/monitor/IntegrationMBeanExporter.java
((116 lines not shown))
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping channel " + channel);
+ }
+ ((Lifecycle) channel).stop();
+ }
+ }
+ }
+
+ @ManagedOperation
+ public void stopSchedulers() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping schedulers " + (this.shutdownForced ? "(force)" : ""));
+ }
+ List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
+ Map<String, ThreadPoolTaskScheduler> schedulers = this.applicationContext
+ .getBeansOfType(ThreadPoolTaskScheduler.class);
@olegz
olegz added a note

Similar comment as for TaskExecutor. You probably want all instances of TaskScheduler, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@olegz olegz commented on the diff
...ork/integration/monitor/IntegrationMBeanExporter.java
((98 lines not shown))
+ }
+ ((LifecycleMessageSourceMetrics) sourceMetrics).stop();
+ }
+ else {
+ if (logger.isInfoEnabled()) {
+ logger.info("Message source " + sourceMetrics + " cannot be stopped");
+ }
+ }
+ }
+ }
+
+ @ManagedOperation
+ public void stopActiveChannels() {
+ // Stop any "active" channels (JMS etc).
+ for (Entry<String, DirectChannelMetrics> entry : this.allChannelsByName.entrySet()) {
+ DirectChannelMetrics metrics = entry.getValue();
@olegz
olegz added a note

Same MessageChannelMetrics?

@garyrussell Owner

??

@olegz
olegz added a note

Never mind this one

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ork/integration/monitor/IntegrationMBeanExporter.java
((210 lines not shown))
+ logger.error("Executor service " + executorService + " failed to terminate");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted while shutting down executor service " + executorService);
+ throw new MessagingException("Interrupted while shutting down", e);
+ }
+ if (System.currentTimeMillis() > this.shutdownDeadline) {
+ logger.error("Timed out before waiting for all executor services");
+ }
+ }
+ }
+
+ @ManagedOperation
+ public void stopOtherActiveComponents() {
+ logger.debug("Stopping OrderlyShutdownCapable components");
@olegz
olegz added a note

Should this be called stopOrderlyShtdownCapableComponents? My concern is that in line 497 its the first method that is called and other somehow implies that when everything specific is shut down you can now shut down all others. wdyt?

@garyrussell Owner

Yeah - this was originally caled last (see the JIRA comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
garyrussell added some commits
@garyrussell garyrussell INT-2485 Orderly Shutdown
Initial commit - @ManagedOperation on IMBE

- can be invoked via JMX, <control-bus/>, or getting a reference to
the IMBE from the application context.

INT-2485 Updates After Review Comments (JIRA)

* Shutdown Schedulers first, and wait for them
* Add a force parameter, which overrides thread pool shutdown options
* Shutdown Sources/Channels after all thread pools have stopped
* Mark other components as OrderlyShutdownCapable (e.g. JMS/AMQP Listener containers) and shut them down first
* Wait for remaining time to allow for quiescence

Also
* remove TimeUnit parameter (not JMX-friendly); time limit is now always milliseconds
* If thread pools don't stop in time limit, force them down.

INT-2485 Handle Self-Destruction

Add shutdown-executor to IMBE.

When the shutdown was called on a Spring-Managed thread, the shutdown
was not clean because we timed out waiting for the current thread to
terminate. After that, we force terminated other components.

Now, by providing a dedicated Executor for the shutdown process, it
is used for the shutdown instead of the current thread. This Executor
is *not* shutdown.

It is not necessary to provide an Executor if the stopActiveComponents()
method is called on some other thread that is not involved in the
shutdown.

Also adds executor name to logs, when available.

INT-2485 Polishing

Fix MBean object name collision when running all tests.

INT-2485 Enable TCP Shutdown

Make TCP connection factories 'OrderlyShutdownCapable' so
they are stopped before schedulers/executors in order for
them to release any executor threads they are holding.
8b370d0
@garyrussell garyrussell INT-2485 Polishing
Didn't need DirectFieldAccessor - scheduler and executor have
an accessor for the native ExecutorService.

Copyrights
7f37c4f
@garyrussell garyrussell INT-2485 Polishing
schemaLocation version.
356ef0b
@garyrussell garyrussell INT-2485 PR Review Polishing cb7b183
@garyrussell
Owner

Pushed PR Review Fixes; see last commit

@olegz

Looks good, merging

@olegz olegz closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 8, 2012
  1. @garyrussell

    INT-2485 Orderly Shutdown

    garyrussell authored
    Initial commit - @ManagedOperation on IMBE
    
    - can be invoked via JMX, <control-bus/>, or getting a reference to
    the IMBE from the application context.
    
    INT-2485 Updates After Review Comments (JIRA)
    
    * Shutdown Schedulers first, and wait for them
    * Add a force parameter, which overrides thread pool shutdown options
    * Shutdown Sources/Channels after all thread pools have stopped
    * Mark other components as OrderlyShutdownCapable (e.g. JMS/AMQP Listener containers) and shut them down first
    * Wait for remaining time to allow for quiescence
    
    Also
    * remove TimeUnit parameter (not JMX-friendly); time limit is now always milliseconds
    * If thread pools don't stop in time limit, force them down.
    
    INT-2485 Handle Self-Destruction
    
    Add shutdown-executor to IMBE.
    
    When the shutdown was called on a Spring-Managed thread, the shutdown
    was not clean because we timed out waiting for the current thread to
    terminate. After that, we force terminated other components.
    
    Now, by providing a dedicated Executor for the shutdown process, it
    is used for the shutdown instead of the current thread. This Executor
    is *not* shutdown.
    
    It is not necessary to provide an Executor if the stopActiveComponents()
    method is called on some other thread that is not involved in the
    shutdown.
    
    Also adds executor name to logs, when available.
    
    INT-2485 Polishing
    
    Fix MBean object name collision when running all tests.
    
    INT-2485 Enable TCP Shutdown
    
    Make TCP connection factories 'OrderlyShutdownCapable' so
    they are stopped before schedulers/executors in order for
    them to release any executor threads they are holding.
  2. @garyrussell

    INT-2485 Polishing

    garyrussell authored
    Didn't need DirectFieldAccessor - scheduler and executor have
    an accessor for the native ExecutorService.
    
    Copyrights
  3. @garyrussell

    INT-2485 Polishing

    garyrussell authored
    schemaLocation version.
Commits on May 9, 2012
  1. @garyrussell
This page is out of date. Refresh to see the latest.
View
7 ...src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2011 the original author or authors.
+ * Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
+import org.springframework.integration.core.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
@@ -34,9 +35,11 @@
* Spring Integration Messages, and sends the results to a Message Channel.
*
* @author Mark Fisher
+ * @author Gary Russell
* @since 2.1
*/
-public class AmqpInboundChannelAdapter extends MessageProducerSupport {
+public class AmqpInboundChannelAdapter extends MessageProducerSupport implements
+ OrderlyShutdownCapable {
private final AbstractMessageListenerContainer messageListenerContainer;
View
30 ...ation-core/src/main/java/org/springframework/integration/core/OrderlyShutdownCapable.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.core;
+
+/**
+ * Marker interface for components that wish to be considered for
+ * an orderly shutdown using management interfaces. Components that
+ * implement this interface will be stopped before schedulers,
+ * executors etc, in order for them to free up any execution
+ * threads they may be holding.
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public interface OrderlyShutdownCapable {
+
+}
View
3  ...ain/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java
@@ -40,6 +40,7 @@
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.MessagingException;
import org.springframework.integration.context.IntegrationObjectSupport;
+import org.springframework.integration.core.OrderlyShutdownCapable;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
import org.springframework.util.Assert;
@@ -51,7 +52,7 @@
*
*/
public abstract class AbstractConnectionFactory extends IntegrationObjectSupport
- implements ConnectionFactory, Runnable, SmartLifecycle {
+ implements ConnectionFactory, Runnable, SmartLifecycle, OrderlyShutdownCapable {
protected static final int DEFAULT_REPLY_TIMEOUT = 10000;
View
7 ...ation-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2011 the original author or authors.
+ * Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.integration.jms;
import org.springframework.beans.factory.DisposableBean;
+import org.springframework.integration.core.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.util.Assert;
@@ -27,8 +28,10 @@
*
* @author Mark Fisher
* @author Oleg Zhurakousky
+ * @author Gary Russell
*/
-public class JmsMessageDrivenEndpoint extends AbstractEndpoint implements DisposableBean {
+public class JmsMessageDrivenEndpoint extends AbstractEndpoint implements
+ DisposableBean, OrderlyShutdownCapable {
private final AbstractMessageListenerContainer listenerContainer;
View
3  ...ion-jmx/src/main/java/org/springframework/integration/jmx/config/MBeanExporterParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2010 the original author or authors.
+ * Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -60,6 +60,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "default-domain");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "object-name-static-properties");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "managed-components", "componentNamePatterns");
+ IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "shutdown-executor");
builder.addPropertyValue("server", mbeanServer);
this.registerMBeanExporterHelper(parserContext.getRegistry());
View
287 ...n-jmx/src/main/java/org/springframework/integration/monitor/IntegrationMBeanExporter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2010 the original author or authors.
+ * Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -14,11 +14,18 @@
package org.springframework.integration.monitor;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -44,12 +51,17 @@
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
import org.springframework.context.Lifecycle;
import org.springframework.context.SmartLifecycle;
+import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.integration.MessageChannel;
+import org.springframework.integration.MessagingException;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.MessageSource;
+import org.springframework.integration.core.OrderlyShutdownCapable;
import org.springframework.integration.core.PollableChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.jmx.export.MBeanExporter;
@@ -62,6 +74,8 @@
import org.springframework.jmx.export.assembler.MetadataMBeanInfoAssembler;
import org.springframework.jmx.export.naming.MetadataNamingStrategy;
import org.springframework.jmx.support.MetricType;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.ReflectionUtils;
@@ -94,7 +108,7 @@
*/
@ManagedResource
public class IntegrationMBeanExporter extends MBeanExporter implements BeanPostProcessor, BeanFactoryAware,
- BeanClassLoaderAware, SmartLifecycle {
+ ApplicationContextAware, BeanClassLoaderAware, SmartLifecycle, Runnable {
private static final Log logger = LogFactory.getLog(IntegrationMBeanExporter.class);
@@ -104,6 +118,8 @@
private ListableBeanFactory beanFactory;
+ private ApplicationContext applicationContext;
+
private Map<Object, AtomicLong> anonymousHandlerCounters = new HashMap<Object, AtomicLong>();
private Map<Object, AtomicLong> anonymousSourceCounters = new HashMap<Object, AtomicLong>();
@@ -122,6 +138,12 @@
private Map<String, MessageSourceMetrics> sourcesByName = new HashMap<String, MessageSourceMetrics>();
+ private Map<String, DirectChannelMetrics> allChannelsByName = new HashMap<String, DirectChannelMetrics>();
+
+ private Map<String, MessageHandlerMetrics> allHandlersByName = new HashMap<String, MessageHandlerMetrics>();
+
+ private Map<String, MessageSourceMetrics> allSourcesByName = new HashMap<String, MessageSourceMetrics>();
+
private Map<String, String> beansByEndpointName = new HashMap<String, String>();
private ClassLoader beanClassLoader;
@@ -146,6 +168,14 @@
private String[] componentNamePatterns = { "*" };
+ private volatile Executor shutdownExecutor;
+
+ private volatile long shutdownDeadline;
+
+ private volatile boolean shutdownForced;
+
+ private final AtomicBoolean shuttingDown = new AtomicBoolean();
+
public IntegrationMBeanExporter() {
super();
// Shouldn't be necessary, but to be on the safe side...
@@ -191,6 +221,17 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = (ListableBeanFactory) beanFactory;
}
+ public void setApplicationContext(ApplicationContext applicationContext)
+ throws BeansException {
+ Assert.notNull(applicationContext, "ApplicationContext may not be null");
+ this.applicationContext = applicationContext;
+ }
+
+ public void setShutdownExecutor(Executor shutdownExecutor) {
+ Assert.notNull(shutdownExecutor, "Shutdown Executor may not be null");
+ this.shutdownExecutor = shutdownExecutor;
+ }
+
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof Advised) {
@@ -409,6 +450,217 @@ public void destroy() {
}
}
+ /**
+ * Shutdown active components. If the thread calling this method is
+ * managed by a Spring-managed executor, you should provide a specific
+ * dedicated executor via the {@link #setShutdownExecutor(Executor))}
+ * method. When this is provided, the shutdown will be performed on one
+ * of its threads, instead of the calling thread; thus avoiding
+ * the situation where we will wait for the current thread to terminate.
+ * <p> It is not necessary to supply this executor service if the
+ * current thread will not, itself, be shutdown as a result of
+ * calling this method.
+ * <p><b>Note:</b> The supplied executor service
+ * will <b>not</b> be shut down.
+ *
+ * @param force If true, stop the executors with shutdownNow(), canceling
+ * running tasks. Overrides any settings on schedulers/executors. When true
+ * may result in error messages being sent to error channels.
+ * @param howLong The time to wait in total for all activities to complete
+ * in milliseconds.
+ */
+ @ManagedOperation
+ public void stopActiveComponents(boolean force, long howLong) {
+ if (!this.shuttingDown.compareAndSet(false, true)) {
+ logger.error("Shutdown already in process");
+ return;
+ }
+ this.shutdownDeadline = System.currentTimeMillis() + howLong;
+ this.shutdownForced = force;
+ if (this.shutdownExecutor == null) {
+ try {
+ logger.debug("Running shutdown on current thread");
@olegz
olegz added a note

We should do if(logger.isDebugEnabled)

@garyrussell Owner

That's only needed if when concatenating strings in the logger.debug(); when it's a simple string, it's ok (the first thing logger.debug() does is isDebugEnabled().

The reason we do it when concatenating is to avoid the overhead of the concatentation if isDebugEnabled() is false. In this case, the extra call to isDebugEnabled() is overhead.

@olegz
olegz added a note
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ this.run();
+ } catch (Exception e) {
+ logger.error("Orderly shutdown failed", e);
+ }
+ }
+ else {
+ logger.debug("Launching shutdown on another thread");
+ this.shutdownExecutor.execute(this);
+ }
+ }
+
+ /**
+ * Perform orderly shutdown - called or executed from
+ * {@link #stopActiveComponents(boolean, long)}.
+ */
+ public void run() {
+ try {
+ this.stopOrderlyShutdownCapableComponents();
+ this.stopActiveChannels();
+ this.stopSchedulers();
+ if (System.currentTimeMillis() > this.shutdownDeadline) {
+ logger.error("Timed out before waiting for all schedulers to complete");
+ }
+ this.stopExecutors();
+ if (System.currentTimeMillis() > this.shutdownDeadline) {
+ logger.error("Timed out before waiting for all executors to complete");
+ }
+ this.stopNonSpringExecutors();
+ if (System.currentTimeMillis() > this.shutdownDeadline) {
+ logger.error("Timed out before waiting for all non-Spring executors to complete");
+ }
+ this.stopMessageSources();
+ // Wait any remaining time for messages to quiesce
+ long timeLeft = this.shutdownDeadline - System.currentTimeMillis();
+ if (timeLeft > 0) {
+ try {
+ Thread.sleep(timeLeft);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted while waiting for quiesce");
+ }
+ }
+ else {
+ this.shutdownForced = true;
+ this.stopSchedulers();
+ this.stopExecutors();
+ this.stopNonSpringExecutors();
+ }
+ }
+ finally {
+ this.shuttingDown.set(false);
+ }
+ }
+
+ /**
+ * Stops all message sources - may cause interrupts.
+ */
+ @ManagedOperation
+ public void stopMessageSources() {
+ for (Entry<String, MessageSourceMetrics> entry : this.allSourcesByName.entrySet()) {
+ MessageSourceMetrics sourceMetrics = entry.getValue();
+ if (sourceMetrics instanceof LifecycleMessageSourceMetrics) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping message source " + sourceMetrics);
+ }
+ ((LifecycleMessageSourceMetrics) sourceMetrics).stop();
+ }
+ else {
+ if (logger.isInfoEnabled()) {
+ logger.info("Message source " + sourceMetrics + " cannot be stopped");
+ }
+ }
+ }
+ }
+
+ @ManagedOperation
+ public void stopActiveChannels() {
+ // Stop any "active" channels (JMS etc).
+ for (Entry<String, DirectChannelMetrics> entry : this.allChannelsByName.entrySet()) {
+ DirectChannelMetrics metrics = entry.getValue();
@olegz
olegz added a note

Same MessageChannelMetrics?

@garyrussell Owner

??

@olegz
olegz added a note

Never mind this one

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ MessageChannel channel = metrics.getMessageChannel();
+ if (channel instanceof Lifecycle) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping channel " + channel);
+ }
+ ((Lifecycle) channel).stop();
+ }
+ }
+ }
+
+ @ManagedOperation
+ public void stopSchedulers() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping schedulers " + (this.shutdownForced ? "(force)" : ""));
+ }
+ List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
+ Map<String, ThreadPoolTaskScheduler> schedulers = this.applicationContext
+ .getBeansOfType(ThreadPoolTaskScheduler.class);
@olegz
olegz added a note

Similar comment as for TaskExecutor. You probably want all instances of TaskScheduler, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ for (Entry<String, ThreadPoolTaskScheduler> entry : schedulers.entrySet()) {
+ ThreadPoolTaskScheduler scheduler = entry.getValue();
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping scheduler " + scheduler.getThreadNamePrefix());
+ }
+ ExecutorService executorService = scheduler.getScheduledExecutor();
+ executorServices.add(executorService);
+ doShutdownExecutorService(executorService);
+ }
+ waitForExecutors(executorServices);
+ logger.debug("Stopped schedulers");
+ }
+
+ @ManagedOperation
+ public void stopExecutors() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping executors" + (this.shutdownForced ? "(force)" : ""));
+ }
+ List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
+ Map<String, ThreadPoolTaskExecutor> executors = this.applicationContext
+ .getBeansOfType(ThreadPoolTaskExecutor.class);
@olegz
olegz added a note

Do we really only care about ThreadPoolTaskExecutor here? The hierarchy of TaskExecutor is pretty extensive and I am assuming you wanted to specifically stop any Spring TaskExecutor here. Also, however unlikely if the executor is a Proxy than it won't be part of the executors map

@garyrussell Owner

I was really going after and our internal TaskScheduler. I think there are issues going after everything. I'll look again, but we might have to defer to the follow up JIRA.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ for (Entry<String, ThreadPoolTaskExecutor> entry : executors.entrySet()) {
+ ThreadPoolTaskExecutor executor = entry.getValue();
+ if (executor == this.shutdownExecutor) {
+ logger.debug("Skipping shutdown of shutdown executor");
+ }
+ else {
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping executor " + executor.getThreadNamePrefix());
+ }
+ ExecutorService executorService = executor.getThreadPoolExecutor();
+ executorServices.add(executorService);
+ doShutdownExecutorService(executorService);
+ }
+ }
+ waitForExecutors(executorServices);
+ logger.debug("Stopped executors");
+ }
+
+ @ManagedOperation
+ public void stopNonSpringExecutors() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping other executors" + (this.shutdownForced ? "(force)" : ""));
+ }
+ List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
+ Map<String, ExecutorService> nonSpringExecutors = this.applicationContext
+ .getBeansOfType(ExecutorService.class);
+ for (Entry<String, ExecutorService> entry : nonSpringExecutors.entrySet()) {
+ ExecutorService executorService = entry.getValue();
+ if (!(executorService instanceof ExecutorServiceAdapter)) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping executor service " + executorService);
+ }
+ executorServices.add(executorService);
+ doShutdownExecutorService(executorService);
+ }
+ else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ignoring ExecutorServiceAdapter");
+ }
+ }
+ }
+ waitForExecutors(executorServices);
+ logger.debug("Stopped other executors");
+ }
+
+ @ManagedOperation
+ public void stopOrderlyShutdownCapableComponents() {
+ logger.debug("Stopping OrderlyShutdownCapable components");
+ Map<String, OrderlyShutdownCapable> candidates = this.applicationContext
+ .getBeansOfType(OrderlyShutdownCapable.class);
+ for (Entry<String, OrderlyShutdownCapable> candidateEntry : candidates.entrySet()) {
+ OrderlyShutdownCapable candidate = candidateEntry.getValue();
+ if (candidate instanceof Lifecycle) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Stopping component " + candidate);
+ }
+ ((Lifecycle) candidate).stop();
+ }
+ }
+ logger.debug("Stopped OrderlyShutdownCapable components");
+ }
+
@ManagedMetric(metricType = MetricType.COUNTER, displayName = "MessageChannel Channel Count")
public int getChannelCount() {
return channelsByName.size();
@@ -500,9 +752,37 @@ protected void registerBeans() {
}
}
+ private void doShutdownExecutorService(ExecutorService executorService) {
+ if (this.shutdownForced) {
+ executorService.shutdownNow();
+ }
+ else {
+ executorService.shutdown();
+ }
+ }
+
+ private void waitForExecutors(List<ExecutorService> executorServices) {
+ for (ExecutorService executorService : executorServices) {
+ try {
+ if (!executorService.awaitTermination(this.shutdownDeadline
+ - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
+ logger.error("Executor service " + executorService + " failed to terminate");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted while shutting down executor service " + executorService);
+ throw new MessagingException("Interrupted while shutting down", e);
+ }
+ if (System.currentTimeMillis() > this.shutdownDeadline) {
+ logger.error("Timed out before waiting for all executor services");
+ }
+ }
+ }
+
private void registerChannels() {
for (DirectChannelMetrics monitor : channels) {
String name = monitor.getName();
+ this.allChannelsByName.put(name, monitor);
if (!PatternMatchUtils.simpleMatch(this.componentNamePatterns, name)) {
continue;
}
@@ -528,6 +808,7 @@ private void registerHandlers() {
for (SimpleMessageHandlerMetrics source : handlers) {
MessageHandlerMetrics monitor = enhanceHandlerMonitor(source);
String name = monitor.getName();
+ this.allHandlersByName.put(name, monitor);
if (!PatternMatchUtils.simpleMatch(this.componentNamePatterns, name)) {
continue;
}
@@ -552,6 +833,7 @@ private void registerSources() {
for (SimpleMessageSourceMetrics source : sources) {
MessageSourceMetrics monitor = enhanceSourceMonitor(source);
String name = monitor.getName();
+ this.allSourcesByName.put(name, monitor);
if (!PatternMatchUtils.simpleMatch(this.componentNamePatterns, name)) {
continue;
}
@@ -901,5 +1183,4 @@ private static Object getField(Object target, String name) {
ReflectionUtils.makeAccessible(field);
return ReflectionUtils.getField(field, target);
}
-
}
View
3  spring-integration-jmx/src/main/resources/META-INF/spring.schemas
@@ -1,3 +1,4 @@
http\://www.springframework.org/schema/integration/jmx/spring-integration-jmx-2.0.xsd=org/springframework/integration/jmx/config/spring-integration-jmx-2.0.xsd
http\://www.springframework.org/schema/integration/jmx/spring-integration-jmx-2.1.xsd=org/springframework/integration/jmx/config/spring-integration-jmx-2.1.xsd
-http\://www.springframework.org/schema/integration/jmx/spring-integration-jmx.xsd=org/springframework/integration/jmx/config/spring-integration-jmx-2.1.xsd
+http\://www.springframework.org/schema/integration/jmx/spring-integration-jmx-2.2.xsd=org/springframework/integration/jmx/config/spring-integration-jmx-2.2.xsd
+http\://www.springframework.org/schema/integration/jmx/spring-integration-jmx.xsd=org/springframework/integration/jmx/config/spring-integration-jmx-2.2.xsd
View
202 .../main/resources/org/springframework/integration/jmx/config/spring-integration-jmx-2.2.xsd
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<xsd:schema xmlns="http://www.springframework.org/schema/integration/jmx" xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:beans="http://www.springframework.org/schema/beans" xmlns:tool="http://www.springframework.org/schema/tool"
+ xmlns:integration="http://www.springframework.org/schema/integration" targetNamespace="http://www.springframework.org/schema/integration/jmx"
+ elementFormDefault="qualified" attributeFormDefault="unqualified">
+
+ <xsd:import namespace="http://www.springframework.org/schema/beans" />
+ <xsd:import namespace="http://www.springframework.org/schema/tool" />
+ <xsd:import namespace="http://www.springframework.org/schema/integration" schemaLocation="http://www.springframework.org/schema/integration/spring-integration-2.2.xsd" />
+
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Defines the configuration elements for Spring Integration's JMX adapters.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+
+ <xsd:element name="attribute-polling-channel-adapter">
+ <xsd:annotation>
+ <xsd:documentation>
+ Defines an inbound Channel Adapter that polls for JMX attribute values.
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:complexContent>
+ <xsd:extension base="adapterType">
+ <xsd:sequence minOccurs="0" maxOccurs="1">
+ <xsd:element ref="integration:poller" />
+ </xsd:sequence>
+ <xsd:attribute name="attribute-name" type="xsd:string" use="required" />
+ <xsd:attribute name="auto-startup" type="xsd:string" default="true" />
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="operation-invoking-outbound-gateway">
+ <xsd:annotation>
+ <xsd:documentation>
+ Defines an outbound Gateway which allows for Message-driven invocation of managed operations that
+ return values
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:complexContent>
+ <xsd:extension base="operationInvokingType">
+ <xsd:attribute name="request-channel" type="xsd:string" use="required" />
+ <xsd:attribute name="reply-channel" type="xsd:string" use="optional" />
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+ </xsd:element>
+
+ <xsd:element name="operation-invoking-channel-adapter">
+ <xsd:annotation>
+ <xsd:documentation>
+ Defines an outbound Channel Adapter for invoking JMX operations.
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:complexContent>
+ <xsd:extension base="operationInvokingType">
+ <xsd:attribute name="channel" type="xsd:string" use="optional" />
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+ </xsd:element>
+
+ <xsd:element name="notification-listening-channel-adapter">
+ <xsd:annotation>
+ <xsd:documentation>
+ Defines an inbound Channel Adapter that listens for JMX notifications.
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:complexContent>
+ <xsd:extension base="adapterType">
+ <xsd:attribute name="notification-filter" type="xsd:string" use="optional" />
+ <xsd:attribute name="handback" type="xsd:string" use="optional" />
+ <xsd:attribute name="send-timeout" type="xsd:string" use="optional" />
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+ </xsd:element>
+
+ <xsd:element name="notification-publishing-channel-adapter">
+ <xsd:annotation>
+ <xsd:documentation>
+ Defines an outbound Channel Adapter that publishes JMX notifications.
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:complexContent>
+ <xsd:extension base="adapterType">
+ <xsd:attribute name="default-notification-type" type="xsd:string" use="optional" />
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+ </xsd:element>
+
+ <xsd:element name="mbean-export">
+ <xsd:annotation>
+ <xsd:documentation>
+ Exports Message Channels and Endpoints as MBeans.
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:complexContent>
+ <xsd:extension base="mbeanServerIdentifyerType">
+ <xsd:attribute name="default-domain" use="optional">
+ <xsd:annotation>
+ <xsd:documentation>
+ The domain name for the MBeans exported by this Exporter.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="object-name-static-properties" use="optional">
+ <xsd:annotation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="java.util.Properties" />
+ </tool:annotation>
+ </xsd:appinfo>
+ <xsd:documentation>
+ Static object properties to be used for this domain. These properties are appended to
+ the ObjectName of all MBeans registered by this component.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="managed-components" use="optional">
+ <xsd:annotation>
+ <xsd:documentation>
+ Comma separated list of simple patterns for component names to register (defaults to '*').
+ The pattern is applied to all components before they are registered, looking for a match on
+ the 'name' property of the ObjectName. A MessageChannel and a MessageHandler (for instance)
+ can share a name because they have a different type, so in that case they would either both
+ be included or both excluded.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="shutdown-executor" use="optional">
+ <xsd:annotation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="java.util.concurrent.Executor" />
+ </tool:annotation>
+ </xsd:appinfo>
+ <xsd:documentation>
+ An Executor used when shutting down the application using the 'stopActiveComponents()'
+ method. Only required when invoking the operation on a Spring-managed thread, such as
+ via a <control-bus/> from, say, an error flow. Using this executor avoids the
+ problem where the shutdown will wait for the current thread to terminate, time out,
+ and then force-close other components. When a dedicated executor is supplied,
+ the method will not wait on its threads to complete, and will terminate normally.
+ It is recommended that the executor used here is dedicated for this purpose and
+ not used elsewhere.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+ </xsd:element>
+
+ <xsd:complexType name="adapterType">
+ <xsd:annotation>
+ <xsd:documentation>
+ Defines inbound operation invoking type
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexContent>
+ <xsd:extension base="mbeanServerIdentifyerType">
+ <xsd:attribute name="channel" type="xsd:string" />
+ <xsd:attribute name="object-name" type="xsd:string" use="required" />
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+
+ <xsd:complexType name="operationInvokingType">
+ <xsd:annotation>
+ <xsd:documentation>
+ Defines outbound operation invoking type
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexContent>
+ <xsd:extension base="mbeanServerIdentifyerType">
+ <xsd:attribute name="object-name" type="xsd:string" use="required" />
+ <xsd:attribute name="operation-name" type="xsd:string" use="required" />
+ </xsd:extension>
+ </xsd:complexContent>
+ </xsd:complexType>
+
+ <xsd:complexType name="mbeanServerIdentifyerType">
+ <xsd:attribute name="id" type="xsd:string" use="optional" />
+ <xsd:attribute name="server" type="xsd:string" default="mbeanServer">
+ <xsd:annotation>
+ <xsd:documentation>
+ Defines the name of the MBeanServer bean to connect to.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:complexType>
+
+</xsd:schema>
View
81 .../src/test/java/org/springframework/integration/monitor/MBeanExporterIntegrationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 the original author or authors.
+ * Copyright 2009-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -13,8 +13,10 @@
package org.springframework.integration.monitor;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Date;
@@ -28,10 +30,14 @@
import org.junit.Test;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.Lifecycle;
import org.springframework.context.support.GenericXmlApplicationContext;
+import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
+import org.springframework.integration.core.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.AbstractEndpoint;
+import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.util.Assert;
@@ -107,6 +113,28 @@ public void testLifecycleInEndpointWithMessageSource() throws Exception {
}
// Lifecycle method name
assertEquals("start", startName);
+ assertTrue((Boolean) server.invoke(names.iterator().next(), "isRunning", null, null));
+ messageChannelsMonitor.stopActiveComponents(false, 3000);
+ assertFalse((Boolean) server.invoke(names.iterator().next(), "isRunning", null, null));
+ ActiveChannel activeChannel = context.getBean("activeChannel", ActiveChannel.class);
+ assertTrue(activeChannel.isStopCalled());
+ OtherActiveComponent otherActiveComponent = context.getBean(OtherActiveComponent.class);
+ assertTrue(otherActiveComponent.isStopCalled());
+ }
+
+ @Test
+ public void testSelfDestruction() throws Exception {
+ context = new GenericXmlApplicationContext(getClass(), "self-destruction-context.xml");
+ SourcePollingChannelAdapter adapter = context.getBean(SourcePollingChannelAdapter.class);
+ adapter.start();
+ int n = 0;
+ while (adapter.isRunning()) {
+ n += 10;
+ if (n > 10000) {
+ fail("Adapter failed to stop");
+ }
+ Thread.sleep(10);
+ }
}
@Test
@@ -276,4 +304,55 @@ public int getCounter() {
}
}
+ public static interface ActiveChannel {
+ boolean isStopCalled();
+ }
+
+ public static class ActiveChannelImpl implements MessageChannel, Lifecycle, ActiveChannel {
+
+ private boolean stopCalled;
+
+ public boolean send(Message<?> message) {
+ return false;
+ }
+
+ public boolean send(Message<?> message, long timeout) {
+ return false;
+ }
+
+ public void start() {
+ }
+
+ public void stop() {
+ this.stopCalled = true;
+ }
+
+ public boolean isRunning() {
+ return false;
+ }
+
+ public boolean isStopCalled() {
+ return this.stopCalled;
+ }
+ }
+
+ public static class OtherActiveComponent implements OrderlyShutdownCapable, Lifecycle {
+
+ private boolean stopCalled;
+
+ public void start() {
+ }
+
+ public void stop() {
+ this.stopCalled = true;
+ }
+
+ public boolean isRunning() {
+ return false;
+ }
+
+ public boolean isStopCalled() {
+ return this.stopCalled;
+ }
+ }
}
View
22 ...ntegration-jmx/src/test/java/org/springframework/integration/monitor/lifecycle-source.xml
@@ -1,7 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration"
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xmlns:int="http://www.springframework.org/schema/integration"
+ xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
+ http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
@@ -17,4 +21,18 @@
<bean id="service" class="org.springframework.integration.monitor.MBeanExporterIntegrationTests$SimpleService" />
+ <bean id="activeChannel" class="org.springframework.integration.monitor.MBeanExporterIntegrationTests$ActiveChannelImpl" />
+
+ <task:scheduler id="someScheduler" />
+
+ <task:executor id="someExecutor" />
+
+ <bean id="nonSpringExecutor" class="java.util.concurrent.Executors" factory-method="newSingleThreadExecutor" />
+
+ <bean id="otherActiveComponent" class="org.springframework.integration.monitor.MBeanExporterIntegrationTests$OtherActiveComponent" />
+
+ <bean id="ignoreWrappedExecutor" class="org.springframework.core.task.support.ExecutorServiceAdapter">
+ <constructor-arg ref="someExecutor" />
+ </bean>
+
</beans>
View
40 ...on-jmx/src/test/java/org/springframework/integration/monitor/self-destruction-context.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:int-jmx="http://www.springframework.org/schema/integration/jmx"
+ xmlns:int="http://www.springframework.org/schema/integration"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xmlns:task="http://www.springframework.org/schema/task"
+ xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
+ http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
+ http://www.springframework.org/schema/integration/jmx http://www.springframework.org/schema/integration/jmx/spring-integration-jmx.xsd
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+
+ <int:inbound-channel-adapter channel="toControlBus" auto-startup="false"
+ expression="@integrationMbeanExporter.stopActiveComponents(false, 1000)" />
+
+ <int:channel id="toControlBus" />
+
+ <int:control-bus input-channel="toControlBus" />
+
+ <int-jmx:mbean-export id="integrationMbeanExporter" shutdown-executor="shutdownExec"
+ default-domain="self-destruct" />
+
+ <context:mbean-server />
+
+ <int:poller default="true" fixed-delay="500" />
+
+ <task:executor id="shutdownExec" />
+
+ <bean id="activeChannel" class="org.springframework.integration.monitor.MBeanExporterIntegrationTests$ActiveChannelImpl" />
+
+ <task:scheduler id="someScheduler" />
+
+ <task:executor id="someExecutor" />
+
+ <bean id="nonSpringExecutor" class="java.util.concurrent.Executors" factory-method="newSingleThreadExecutor" />
+
+ <bean id="otherActiveComponent" class="org.springframework.integration.monitor.MBeanExporterIntegrationTests$OtherActiveComponent" />
+
+</beans>
Something went wrong with that request. Please try again.