From 37b9612f24322a699a9250ed34fbe1ae8eac88ba Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 15 Dec 2016 18:58:19 -0500 Subject: [PATCH] INT-4181: Refactor IMBE to use IntMngmtConfigurer JIRA: https://jira.spring.io/browse/INT-4181 * To avoid duplicate code move actual `Metrics` resolution to the `IntegrationManagementConfigurer` and delegate from the `IntegrationMBeanExporter` for backward compatibility * Minor refactoring and improvements in the `IntegrationMBeanExporter` * Fix typos in the `jms/AsyncGatewayTests` **Cherry-pick to 4.3.x** --- .../IntegrationManagementConfigurer.java | 59 +++++- .../jms/request_reply/AsyncGatewayTests.java | 6 +- .../monitor/IntegrationMBeanExporter.java | 169 ++++++++---------- 3 files changed, 125 insertions(+), 109 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/IntegrationManagementConfigurer.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/IntegrationManagementConfigurer.java index c60ab6b1afb..be14166f797 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/management/IntegrationManagementConfigurer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/IntegrationManagementConfigurer.java @@ -17,9 +17,13 @@ package org.springframework.integration.support.management; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.SmartInitializingSingleton; @@ -35,14 +39,23 @@ * Configures counts, stats, logging for all (or selected) components. * * @author Gary Russell + * @author Artem Bilan * @since 4.2 * */ public class IntegrationManagementConfigurer implements SmartInitializingSingleton, ApplicationContextAware, BeanNameAware { + private static final Log logger = LogFactory.getLog(IntegrationManagementConfigurer.class); + public static final String MANAGEMENT_CONFIGURER_NAME = "integrationManagementConfigurer"; + private final Map channelsByName = new HashMap(); + + private final Map handlersByName = new HashMap(); + + private final Map sourcesByName = new HashMap(); + private ApplicationContext applicationContext; private String beanName; @@ -61,7 +74,6 @@ public class IntegrationManagementConfigurer implements SmartInitializingSinglet private String[] enabledStatsPatterns = { }; - @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; @@ -230,6 +242,7 @@ private void configureChannelMetrics(String name, MessageChannelMetrics bean) { if (bean instanceof ConfigurableMetricsAware) { ((ConfigurableMetricsAware) bean).configureMetrics(metrics); } + this.channelsByName.put(name, bean); } @SuppressWarnings("unchecked") @@ -255,6 +268,8 @@ private void configureHandlerMetrics(String name, MessageHandlerMetrics bean) { if (bean instanceof ConfigurableMetricsAware) { ((ConfigurableMetricsAware) bean).configureMetrics(metrics); } + + this.handlersByName.put(bean.getManagedName() != null ? bean.getManagedName() : name, bean); } private void configureSourceMetrics(String name, MessageSourceMetrics bean) { @@ -265,6 +280,7 @@ private void configureSourceMetrics(String name, MessageSourceMetrics bean) { else { bean.setCountsEnabled(this.defaultCountsEnabled); } + this.sourcesByName.put(bean.getManagedName() != null ? bean.getManagedName() : name, bean); } /** @@ -294,23 +310,52 @@ else if (pattern.startsWith("\\")) { return null; //NOSONAR - intentional null return } + public String[] getChannelNames() { + return this.channelsByName.keySet().toArray(new String[this.channelsByName.size()]); + } + + public String[] getHandlerNames() { + return this.handlersByName.keySet().toArray(new String[this.handlersByName.size()]); + } + + public String[] getSourceNames() { + return this.sourcesByName.keySet().toArray(new String[this.sourcesByName.size()]); + } + public MessageChannelMetrics getChannelMetrics(String name) { - if (this.applicationContext.containsBean(name)) { - return this.applicationContext.getBean(name, MessageChannelMetrics.class); + if (this.channelsByName.containsKey(name)) { + return this.channelsByName.get(name); + } + if (logger.isDebugEnabled()) { + logger.debug("No channel found for (" + name + ")"); } return null; } public MessageHandlerMetrics getHandlerMetrics(String name) { - if (this.applicationContext.containsBean(name)) { - return this.applicationContext.getBean(name, MessageHandlerMetrics.class); + if (this.handlersByName.containsKey(name)) { + return this.handlersByName.get(name); + } + if (this.handlersByName.containsKey(name + ".handler")) { + return this.handlersByName.get(name + ".handler"); + } + + if (logger.isDebugEnabled()) { + logger.debug("No handler found for (" + name + ")"); } return null; } public MessageSourceMetrics getSourceMetrics(String name) { - if (this.applicationContext.containsBean(name)) { - return this.applicationContext.getBean(name, MessageSourceMetrics.class); + if (this.sourcesByName.containsKey(name)) { + return this.sourcesByName.get(name); + } + if (this.sourcesByName.containsKey(name + ".source")) { + return this.sourcesByName.get(name + ".source"); + } + + if (logger.isDebugEnabled()) { + logger.debug("No source found for (" + name + ")"); } return null; } diff --git a/spring-integration-jms/src/test/java/org/springframework/integration/jms/request_reply/AsyncGatewayTests.java b/spring-integration-jms/src/test/java/org/springframework/integration/jms/request_reply/AsyncGatewayTests.java index 5114eb6fd05..cc0acf20418 100644 --- a/spring-integration-jms/src/test/java/org/springframework/integration/jms/request_reply/AsyncGatewayTests.java +++ b/spring-integration-jms/src/test/java/org/springframework/integration/jms/request_reply/AsyncGatewayTests.java @@ -105,7 +105,7 @@ public void testWithTimeout() throws Exception { assertThat(error.getPayload(), instanceOf(MessagingException.class)); assertThat(((MessagingException) error.getPayload()).getCause(), instanceOf(JmsTimeoutException.class)); assertEquals("foo", ((MessagingException) error.getPayload()).getFailedMessage().getPayload()); - this.gateway1.stop(); + this.gateway2.stop(); } @Test @@ -121,7 +121,7 @@ public void testWithTimeoutNoReplyRequired() throws Exception { assertNotNull(received); org.springframework.messaging.Message error = errors.receive(1000); assertNull(error); - this.gateway1.stop(); + this.gateway2.stop(); } @Configuration @@ -131,7 +131,7 @@ public static class Config { @Bean public CachingConnectionFactory ccf() { return new CachingConnectionFactory( - new ActiveMQConnectionFactory("vm://localhosti?broker.persistent=false")); + new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false")); } @Bean diff --git a/spring-integration-jmx/src/main/java/org/springframework/integration/monitor/IntegrationMBeanExporter.java b/spring-integration-jmx/src/main/java/org/springframework/integration/monitor/IntegrationMBeanExporter.java index 0b82cd2c855..6ffde3c31c0 100644 --- a/spring-integration-jmx/src/main/java/org/springframework/integration/monitor/IntegrationMBeanExporter.java +++ b/spring-integration-jmx/src/main/java/org/springframework/integration/monitor/IntegrationMBeanExporter.java @@ -74,7 +74,6 @@ import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedMetric; import org.springframework.jmx.export.annotation.ManagedOperation; -import org.springframework.jmx.export.assembler.MetadataMBeanInfoAssembler; import org.springframework.jmx.export.naming.MetadataNamingStrategy; import org.springframework.jmx.support.MetricType; import org.springframework.messaging.MessageChannel; @@ -135,12 +134,6 @@ public class IntegrationMBeanExporter extends MBeanExporter implements Applicati private final Set channels = new HashSet(); - private final Map channelsByName = new HashMap(); - - private final Map handlersByName = new HashMap(); - - private final Map sourcesByName = new HashMap(); - private final Map allChannelsByName = new HashMap(); private final Map allHandlersByName = new HashMap(); @@ -153,12 +146,12 @@ public class IntegrationMBeanExporter extends MBeanExporter implements Applicati private final Properties objectNameStaticProperties = new Properties(); - private final MetadataMBeanInfoAssembler assembler = new IntegrationMetadataMBeanInfoAssembler(this.attributeSource); - private final MetadataNamingStrategy defaultNamingStrategy = new IntegrationMetadataNamingStrategy(this.attributeSource); private String[] componentNamePatterns = { "*" }; + private IntegrationManagementConfigurer managementConfigurer; + private volatile long shutdownDeadline; private final AtomicBoolean shuttingDown = new AtomicBoolean(); @@ -169,7 +162,7 @@ public IntegrationMBeanExporter() { // Shouldn't be necessary, but to be on the safe side... setAutodetect(false); setNamingStrategy(this.defaultNamingStrategy); - setAssembler(this.assembler); + setAssembler(new IntegrationMetadataMBeanInfoAssembler(this.attributeSource)); } /** @@ -284,12 +277,17 @@ public void afterSingletonsInstantiated() { } } if (!this.applicationContext.containsBean(IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME)) { - IntegrationManagementConfigurer config = new IntegrationManagementConfigurer(); - config.setDefaultCountsEnabled(true); - config.setDefaultStatsEnabled(true); - config.setApplicationContext(this.applicationContext); - config.setBeanName(IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME); - config.afterSingletonsInstantiated(); + this.managementConfigurer = new IntegrationManagementConfigurer(); + this.managementConfigurer.setDefaultCountsEnabled(true); + this.managementConfigurer.setDefaultStatsEnabled(true); + this.managementConfigurer.setApplicationContext(this.applicationContext); + this.managementConfigurer.setBeanName(IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME); + this.managementConfigurer.afterSingletonsInstantiated(); + } + else { + this.managementConfigurer = + this.applicationContext.getBean(IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME, + IntegrationManagementConfigurer.class); } } catch (RuntimeException e) { @@ -364,15 +362,15 @@ private ObjectName registerBeanInstance(Object bean, String beanKey) { @Override public void destroy() { super.destroy(); - this.channelsByName.clear(); - this.handlersByName.clear(); - this.sourcesByName.clear(); for (MessageChannelMetrics monitor : this.channels) { logger.info("Summary on shutdown: " + monitor); } for (MessageHandlerMetrics monitor : this.handlers) { logger.info("Summary on shutdown: " + monitor); } + for (MessageSourceMetrics monitor : this.sources) { + logger.info("Summary on shutdown: " + monitor); + } } /** @@ -505,19 +503,24 @@ protected final void orderlyShutdownCapableComponentsAfter() { logger.debug("Finalized stop OrderlyShutdownCapable components"); } - @ManagedMetric(metricType = MetricType.COUNTER, displayName = "MessageChannel Channel Count") + @ManagedMetric(metricType = MetricType.COUNTER, displayName = "MessageChannel Count") public int getChannelCount() { - return this.channelsByName.size(); + return this.managementConfigurer.getChannelNames().length; } - @ManagedMetric(metricType = MetricType.COUNTER, displayName = "MessageHandler Handler Count") + @ManagedMetric(metricType = MetricType.COUNTER, displayName = "MessageHandler Count") public int getHandlerCount() { - return this.handlersByName.size(); + return this.managementConfigurer.getHandlerNames().length; + } + + @ManagedMetric(metricType = MetricType.COUNTER, displayName = "MessageSource Count") + public int getSourceCount() { + return this.managementConfigurer.getSourceNames().length; } @ManagedAttribute public String[] getHandlerNames() { - return this.handlersByName.keySet().toArray(new String[this.handlersByName.size()]); + return this.managementConfigurer.getHandlerNames(); } @ManagedMetric(metricType = MetricType.GAUGE, displayName = "Active Handler Count") @@ -547,31 +550,25 @@ public int getQueuedMessageCount() { @ManagedAttribute public String[] getChannelNames() { - return this.channelsByName.keySet().toArray(new String[this.channelsByName.size()]); + return this.managementConfigurer.getChannelNames(); } public MessageHandlerMetrics getHandlerMetrics(String name) { - if (this.handlersByName.containsKey(name)) { - return this.handlersByName.get(name); - } - logger.debug("No handler found for (" + name + ")"); - return null; + return this.managementConfigurer.getHandlerMetrics(name); } public Statistics getHandlerDuration(String name) { - if (this.handlersByName.containsKey(name)) { - return this.handlersByName.get(name).getDuration(); - } - logger.debug("No handler found for (" + name + ")"); - return null; + MessageHandlerMetrics handlerMetrics = getHandlerMetrics(name); + return handlerMetrics != null ? handlerMetrics.getDuration() : null; + } + + @ManagedAttribute + public String[] getSourceNames() { + return this.managementConfigurer.getSourceNames(); } public MessageSourceMetrics getSourceMetrics(String name) { - if (this.sourcesByName.containsKey(name)) { - return this.sourcesByName.get(name); - } - logger.debug("No source found for (" + name + ")"); - return null; + return this.managementConfigurer.getSourceMetrics(name); } public int getSourceMessageCount(String name) { @@ -579,19 +576,12 @@ public int getSourceMessageCount(String name) { } public long getSourceMessageCountLong(String name) { - if (this.sourcesByName.containsKey(name)) { - return this.sourcesByName.get(name).getMessageCountLong(); - } - logger.debug("No source found for (" + name + ")"); - return -1; + MessageSourceMetrics sourceMetrics = getSourceMetrics(name); + return sourceMetrics != null ? sourceMetrics.getMessageCountLong() : -1; } public MessageChannelMetrics getChannelMetrics(String name) { - if (this.channelsByName.containsKey(name)) { - return this.channelsByName.get(name); - } - logger.debug("No channel found for (" + name + ")"); - return null; + return this.managementConfigurer.getChannelMetrics(name); } public int getChannelSendCount(String name) { @@ -599,11 +589,8 @@ public int getChannelSendCount(String name) { } public long getChannelSendCountLong(String name) { - if (this.channelsByName.containsKey(name)) { - return this.channelsByName.get(name).getSendCountLong(); - } - logger.debug("No channel found for (" + name + ")"); - return -1; + MessageChannelMetrics channelMetrics = getChannelMetrics(name); + return channelMetrics != null ? channelMetrics.getSendCountLong() : -1; } public int getChannelSendErrorCount(String name) { @@ -611,11 +598,8 @@ public int getChannelSendErrorCount(String name) { } public long getChannelSendErrorCountLong(String name) { - if (this.channelsByName.containsKey(name)) { - return this.channelsByName.get(name).getSendErrorCountLong(); - } - logger.debug("No channel found for (" + name + ")"); - return -1; + MessageChannelMetrics channelMetrics = getChannelMetrics(name); + return channelMetrics != null ? channelMetrics.getSendErrorCountLong() : -1; } public int getChannelReceiveCount(String name) { @@ -623,30 +607,22 @@ public int getChannelReceiveCount(String name) { } public long getChannelReceiveCountLong(String name) { - if (this.channelsByName.containsKey(name)) { - if (this.channelsByName.get(name) instanceof PollableChannelManagement) { - return ((PollableChannelManagement) this.channelsByName.get(name)).getReceiveCountLong(); - } + MessageChannelMetrics channelMetrics = getChannelMetrics(name); + if (channelMetrics instanceof PollableChannelManagement) { + return ((PollableChannelManagement) channelMetrics).getReceiveCountLong(); } - logger.debug("No channel found for (" + name + ")"); return -1; } @ManagedOperation public Statistics getChannelSendRate(String name) { - if (this.channelsByName.containsKey(name)) { - return this.channelsByName.get(name).getSendRate(); - } - logger.debug("No channel found for (" + name + ")"); - return null; + MessageChannelMetrics channelMetrics = getChannelMetrics(name); + return channelMetrics != null ? channelMetrics.getSendRate() : null; } public Statistics getChannelErrorRate(String name) { - if (this.channelsByName.containsKey(name)) { - return this.channelsByName.get(name).getErrorRate(); - } - logger.debug("No channel found for (" + name + ")"); - return null; + MessageChannelMetrics channelMetrics = getChannelMetrics(name); + return channelMetrics != null ? channelMetrics.getErrorRate() : null; } private void registerChannels() { @@ -656,15 +632,12 @@ private void registerChannels() { if (!matches(this.componentNamePatterns, name)) { continue; } - // Only register once... - if (!this.channelsByName.containsKey(name)) { - String beanKey = getChannelBeanKey(name); + + String beanKey = getChannelBeanKey(name); + if (logger.isInfoEnabled()) { logger.info("Registering MessageChannel " + name); - if (name != null) { - this.channelsByName.put(name, monitor); - } - registerBeanNameOrInstance(monitor, beanKey); } + registerBeanNameOrInstance(monitor, beanKey); } } @@ -676,14 +649,12 @@ private void registerHandlers() { if (!matches(this.componentNamePatterns, name)) { continue; } - // Only register once... - if (!this.handlersByName.containsKey(name)) { - String beanKey = getHandlerBeanKey(monitor); - if (name != null) { - this.handlersByName.put(name, monitor); - } - registerBeanNameOrInstance(monitor, beanKey); + + String beanKey = getHandlerBeanKey(monitor); + if (logger.isInfoEnabled()) { + logger.info("Registering MessageHandler " + name); } + registerBeanNameOrInstance(monitor, beanKey); } } @@ -695,14 +666,12 @@ private void registerSources() { if (!matches(this.componentNamePatterns, name)) { continue; } - // Only register once... - if (!this.sourcesByName.containsKey(name)) { - String beanKey = getSourceBeanKey(monitor); - if (name != null) { - this.sourcesByName.put(name, monitor); - } - registerBeanNameOrInstance(monitor, beanKey); + + String beanKey = getSourceBeanKey(monitor); + if (logger.isInfoEnabled()) { + logger.info("Registering MessageSource " + name); } + registerBeanNameOrInstance(monitor, beanKey); } } @@ -737,7 +706,9 @@ private void registerEndpoints() { endpointNames.add(name); beanKey = getEndpointBeanKey(endpoint, name, source); ObjectName objectName = registerBeanInstance(new ManagedEndpoint(endpoint), beanKey); - logger.info("Registered endpoint without MessageSource: " + objectName); + if (logger.isInfoEnabled()) { + logger.info("Registered endpoint without MessageSource: " + objectName); + } } } } @@ -879,7 +850,7 @@ private MessageHandlerMetrics enhanceHandlerMonitor(MessageHandlerMetrics monito target = targetSource.getTarget(); } catch (Exception e) { - logger.debug("Could not get handler from bean = " + name); + logger.error("Could not get handler from bean = " + name); } } } @@ -1000,7 +971,7 @@ private MessageSourceMetrics enhanceSourceMonitor(MessageSourceMetrics monitor) target = targetSource.getTarget(); } catch (Exception e) { - logger.debug("Could not get handler from bean = " + name); + logger.error("Could not get handler from bean = " + name); } } }