diff --git a/spring-amqp/src/main/java/org/springframework/amqp/event/AmqpEvent.java b/spring-amqp/src/main/java/org/springframework/amqp/event/AmqpEvent.java index 17d4a3c8c3..8ab69bd5a8 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/event/AmqpEvent.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/event/AmqpEvent.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ * Base class for events. * * @author Gary Russell - * @since 1,5 + * @since 1.5 * */ @SuppressWarnings("serial") diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java index d66f1bee12..5ad6c6d4cf 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2014 the original author or authors. + * Copyright 2014-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -140,6 +140,7 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) { if (this.phase != null) { instance.setPhase(this.phase); } + instance.setListenerId(endpoint.getId()); endpoint.setupListenerContainer(instance); initializeContainer(instance); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java index 556be5770f..d548ff612d 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,6 +95,8 @@ public class RabbitNamespaceUtils { private static final String CONSUMER_TAG_STRATEGY = "consumer-tag-strategy"; + private static final String IDLE_EVENT_INTERVAL = "idle-event-interval"; + public static BeanDefinition parseContainer(Element containerEle, ParserContext parserContext) { RootBeanDefinition containerDef = new RootBeanDefinition(SimpleMessageListenerContainer.class); containerDef.setSource(parserContext.extractSource(containerEle)); @@ -253,6 +255,11 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext new RuntimeBeanReference(consumerTagStrategy)); } + String idleEventInterval = containerEle.getAttribute(IDLE_EVENT_INTERVAL); + if (StringUtils.hasText(idleEventInterval)) { + containerDef.getPropertyValues().add("idleEventInterval", new TypedStringValue(idleEventInterval)); + } + return containerDef; } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java index fc6fdc833b..bb20c63820 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2015 the original author or authors. + * Copyright 2014-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,8 @@ import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.ConsumerTagStrategy; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; @@ -40,7 +42,8 @@ * @since 1.4 */ public class SimpleRabbitListenerContainerFactory - extends AbstractRabbitListenerContainerFactory { + extends AbstractRabbitListenerContainerFactory + implements ApplicationEventPublisherAware { private Executor taskExecutor; @@ -74,6 +77,10 @@ public class SimpleRabbitListenerContainerFactory private ConsumerTagStrategy consumerTagStrategy; + private Long idleEventInterval; + + private ApplicationEventPublisher applicationEventPublisher; + /** * @param taskExecutor the {@link Executor} to use. * @see SimpleMessageListenerContainer#setTaskExecutor @@ -211,6 +218,18 @@ public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy) { this.consumerTagStrategy = consumerTagStrategy; } + /** + * How often to publish idle container events. + * @param idleEventInterval the interval. + */ + public void setIdleEventInterval(Long idleEventInterval) { + this.idleEventInterval = idleEventInterval; + } + + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + @Override protected SimpleMessageListenerContainer createContainerInstance() { return new SimpleMessageListenerContainer(); @@ -268,6 +287,12 @@ protected void initializeContainer(SimpleMessageListenerContainer instance) { if (this.consumerTagStrategy != null) { instance.setConsumerTagStrategy(this.consumerTagStrategy); } + if (this.idleEventInterval != null) { + instance.setIdleEventInterval(this.idleEventInterval); + } + if (this.applicationEventPublisher != null) { + instance.setApplicationEventPublisher(this.applicationEventPublisher); + } } } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index 06754cf590..cf0d0242dc 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -99,6 +99,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor private volatile ApplicationContext applicationContext; + private String listenerId; + /** *

* Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is @@ -401,6 +403,18 @@ public ConnectionFactory getConnectionFactory() { return connectionFactory; } + /** + * The 'id' attribute of the listener. + * @return the id (or the container bean name if no id set). + */ + public String getListenerId() { + return this.listenerId != null ? this.listenerId : this.beanName; + } + + public void setListenerId(String listenerId) { + this.listenerId = listenerId; + } + /** * Delegates to {@link #validateConfiguration()} and {@link #initialize()}. */ diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ListenerContainerIdleEvent.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ListenerContainerIdleEvent.java new file mode 100644 index 0000000000..e9d4164b76 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ListenerContainerIdleEvent.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.amqp.rabbit.listener; + +import org.springframework.amqp.event.AmqpEvent; + +/** + * An event that is emitted when a container is idle if the container + * is configured to do so. + * + * @author Gary Russell + * @since 1.6 + * + */ +@SuppressWarnings("serial") +public class ListenerContainerIdleEvent extends AmqpEvent { + + private final long idleTime; + + public ListenerContainerIdleEvent(SimpleMessageListenerContainer source, long idleTime) { + super(source); + this.idleTime = idleTime; + } + + /** + * How long the container has been idle. + * @return the time in milliseconds. + */ + public long getIdleTime() { + return idleTime; + } + + /** + * The queues the container is listening to. + * @return the queue names. + */ + public String[] getQueues() { + return ((SimpleMessageListenerContainer) getSource()).getQueueNames(); + } + + /** + * The id of the listener (if {@code @RabbitListener}) or the container bean name. + * @return the id. + */ + public String getListenerId() { + return ((SimpleMessageListenerContainer) getSource()).getListenerId(); + } + + @Override + public String toString() { + return "ListenerContainerIdleEvent [idleTime=" + + ((float) this.idleTime / 1000) + "s, listenerId=" + getListenerId() + + ", container=" + getSource() + "]"; + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index c02e85e813..c58624547f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -27,6 +27,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.aopalliance.aop.Advice; import org.apache.commons.logging.Log; @@ -112,6 +113,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta */ public static final long DEFAULT_RECOVERY_INTERVAL = 5000; + private final AtomicLong lastNoMessageAlert = new AtomicLong(); + private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT; private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL; @@ -194,6 +197,10 @@ public void invokeListener(Channel channel, Message message) throws Exception { private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger(); + private Long idleEventInterval; + + private volatile long lastReceive = System.currentTimeMillis(); + /** * Default constructor for convenient dependency injection via setters. */ @@ -643,6 +650,14 @@ public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclu this.exclusiveConsumerExceptionLogger = exclusiveConsumerExceptionLogger; } + /** + * How often to emit {@link ListenerContainerIdleEvent}s in milliseconds. + * @param idleEventInterval the interval. + */ + public void setIdleEventInterval(long idleEventInterval) { + this.idleEventInterval = idleEventInterval; + } + /** * Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent * consumers. @@ -1219,6 +1234,21 @@ public void run() { } } } + if (idleEventInterval != null) { + if (receivedOk) { + lastReceive = System.currentTimeMillis(); + } + else { + long now = System.currentTimeMillis(); + long lastAlertAt = lastNoMessageAlert.get(); + long lastReceive = SimpleMessageListenerContainer.this.lastReceive; + if (now > lastReceive + idleEventInterval + && now > lastAlertAt + idleEventInterval + && lastNoMessageAlert.compareAndSet(lastAlertAt, now)) { + publishIdleContainerEvent(now - lastReceive); + } + } + } } catch (ListenerExecutionFailedException ex) { // Continue to process, otherwise re-throw @@ -1240,7 +1270,7 @@ public void run() { logger.debug("Consumer thread interrupted, processing stopped."); Thread.currentThread().interrupt(); aborted = true; - publishEvent("Consumer thread interrupted, processing stopped", true, e); + publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e); } catch (QueuesNotAvailableException ex) { if (SimpleMessageListenerContainer.this.missingQueuesFatal) { @@ -1249,20 +1279,20 @@ public void run() { // Fatal, but no point re-throwing, so just abort. aborted = true; } - publishEvent("Consumer queue(s) not available", aborted, ex); + publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex); } catch (FatalListenerStartupException ex) { logger.error("Consumer received fatal exception on startup", ex); this.startupException = ex; // Fatal, but no point re-throwing, so just abort. aborted = true; - publishEvent("Consumer received fatal exception on startup", true, ex); + publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex); } catch (FatalListenerExecutionException ex) { logger.error("Consumer received fatal exception during processing", ex); // Fatal, but no point re-throwing, so just abort. aborted = true; - publishEvent("Consumer received fatal exception during processing", true, ex); + publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex); } catch (ShutdownSignalException e) { if (RabbitUtils.isNormalShutdown(e)) { @@ -1278,7 +1308,7 @@ public void run() { if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException && e.getCause().getCause().getMessage().contains("in exclusive use")) { exclusiveConsumerExceptionLogger.log(logger, "Exclusive consumer failure", e.getCause().getCause()); - publishEvent("Consumer raised exception, attempting restart", false, e); + publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e); } else { this.logConsumerException(e); @@ -1341,16 +1371,23 @@ private void logConsumerException(Throwable t) { logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. " + "Exception summary: " + t); } - publishEvent("Consumer raised exception, attempting restart", false, t); + publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t); } - private void publishEvent(String reason, boolean fatal, Throwable t) { + private void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) { if (applicationEventPublisher != null) { applicationEventPublisher.publishEvent(new ListenerContainerConsumerFailedEvent( SimpleMessageListenerContainer.this, reason, t, fatal)); } } + private void publishIdleContainerEvent(long idleTime) { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new ListenerContainerIdleEvent(SimpleMessageListenerContainer.this, idleTime)); + } + } + } @Override diff --git a/spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.6.xsd b/spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.6.xsd index 29ea1784d9..ea0af3a02f 100644 --- a/spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.6.xsd +++ b/spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.6.xsd @@ -797,6 +797,16 @@ + + + + + diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIdleContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIdleContainerTests.java new file mode 100644 index 0000000000..dbd9531313 --- /dev/null +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIdleContainerTests.java @@ -0,0 +1,158 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.amqp.rabbit.annotation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.amqp.core.AnonymousQueue; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.ListenerContainerIdleEvent; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * @author Gary Russell + * @since 1.6 + * + */ +@ContextConfiguration +@RunWith(SpringJUnit4ClassRunner.class) +@DirtiesContext +public class EnableRabbitIdleContainerTests { + + @Autowired + private Listener listener; + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private Queue queue; + + @Test + public void testIdle() throws Exception { + assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue.getName(), "foo")); + assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue.getName(), "foo")); + assertTrue(this.listener.latch.await(10, TimeUnit.SECONDS)); + assertEquals("foo", this.listener.event.getListenerId()); + assertEquals(this.queue.getName(), this.listener.event.getQueues()[0]); + assertEquals("BAR", this.rabbitTemplate.convertSendAndReceive(this.queue.getName(), "bar")); + assertEquals("BAR", this.rabbitTemplate.convertSendAndReceive(this.queue.getName(), "bar")); + assertFalse(this.listener.barEventReceived); + } + + @Configuration + @EnableRabbit + public static class EnableRabbitConfig { + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(rabbitConnectionFactory()); + factory.setIdleEventInterval(500L); + factory.setReceiveTimeout(100L); + return factory; + } + + @Bean + public ConnectionFactory rabbitConnectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setHost("localhost"); + return connectionFactory; + } + + @Bean + public RabbitTemplate rabbitTemplate() { + return new RabbitTemplate(rabbitConnectionFactory()); + } + + @Bean + public RabbitAdmin rabbitAdmin() { + return new RabbitAdmin(rabbitConnectionFactory()); + } + + @Bean + public Queue queue() { + return new AnonymousQueue(); + } + + @Bean + public Listener listener() { + return new Listener(); + } + + } + + public static class Listener { + + private final Log logger = LogFactory.getLog(this.getClass()); + + private final CountDownLatch latch = new CountDownLatch(2); + + private volatile ListenerContainerIdleEvent event; + + private boolean barEventReceived; + + @RabbitListener(id="foo", queues="#{queue.name}") + public String listenFoo(String foo) { + logger.info("foo: " + foo); + return foo.toUpperCase(); + } + + @EventListener(condition = "event.listenerId == 'foo'") + public void onApplicationEvent(ListenerContainerIdleEvent event) { + if (!"foo".equals(event.getListenerId())) { + this.barEventReceived = true; + } + logger.info("foo: " + event); + this.event = event; + this.latch.countDown(); + } + + @RabbitListener(id="bar", queues="#{queue.name}") + public String listenBar(String bar) { + logger.info("bar: " + bar); + return bar.toUpperCase(); + } + + @EventListener(condition = "event.listenerId == 'bar'") + public void onApplicationEventBar(ListenerContainerIdleEvent event) { + logger.info("bar: " + event); + } + + } + +} diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java index c2666457bd..c5fe46c7be 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2015 the original author or authors. + * Copyright 2010-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,6 @@ public class ListenerContainerParserTests { @Before public void setUp() throws Exception { - ListenerContainerParser parser = new ListenerContainerParser(); beanFactory = new DefaultListableBeanFactory(); XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(beanFactory); reader.loadBeanDefinitions(new ClassPathResource(getClass().getSimpleName() + "-context.xml", getClass())); @@ -103,6 +102,8 @@ public void testParseWithQueueNames() throws Exception { assertEquals(3, group.size()); assertThat(group, Matchers.contains(beanFactory.getBean("container1"), beanFactory.getBean("testListener1"), beanFactory.getBean("testListener2"))); + assertEquals(1235L, ReflectionTestUtils.getField(container, "idleEventInterval")); + assertEquals("container1", container.getListenerId()); } @Test diff --git a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/ListenerContainerParserTests-context.xml b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/ListenerContainerParserTests-context.xml index 3854b8ede0..da6b39d1de 100644 --- a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/ListenerContainerParserTests-context.xml +++ b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/ListenerContainerParserTests-context.xml @@ -12,7 +12,7 @@ >. +[[idle-containers]] +===== Detecting Idle Asynchronous Consumers + +While efficient, one problem with asynchronous consumers is detecting when they are idle - users might want to take +some action if no messages arrive for some period of time. + +Starting with _version 1.6_, it is now possible to configure the listener container to publish a +`ListenerContainerIdleEvent` when some time passes with no message delivery. +While the container is idle, an event will be published every `idleEventInterval` milliseconds. + +To configure this feature, set the `idleEventInterval` on the container: + +====== xml + +[source, xml] +---- + + + +---- + +====== Java + +[source, java] +---- +@Bean +public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + ... + container.setIdleEventInterval(60000L); + ... + return container; +} +---- + +====== @RabbitListener + +[source, java] +---- +@Bean +public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(rabbitConnectionFactory()); + factory.setIdleEventInterval(60000L); + ... + return factory; +} +---- + +In each of these cases, an event will be published once per minute while the container is idle. + +====== Event Consumption + +You can capture these events by implementing `ApplicationListener` - either a general listener, or one narrowed to only +receive this specific event. + +The following example combines the `@RabbitListener` and `ApplicationListener` into a single class. +It's important to understand that the application listener will get events for all containers so you may need to +check the listener id if you want to take specific action based on which container is idle. + +The events have 4 properties: + +- source - the listener container instance +- id - the listener id (or container bean name) +- idleTime - the time the container had been idle when the event was published +- queueNames - the names of the queue(s) that the container listens to + +[source, xml] +---- +public class Listener { + + private final CountDownLatch latch = new CountDownLatch(2); + + private volatile ListenerContainerIdleEvent event; + + @RabbitListener(id="foo", queues="#{queue.name}") + public String listen(String foo) { + return foo.toUpperCase(); + } + + @EventListener(condition = "event.listenerId == 'foo'") + public void onApplicationEvent(ListenerContainerIdleEvent event) { + ... + } + +} +---- + +IMPORTANT: Event listeners will see events for all containers; so, in the example above, we narrow the events received +based on the listener ID. + [[message-converters]] ==== Message Converters @@ -2985,6 +3080,12 @@ The namespace attribute is available in _version 1.5.x_ Previously, only broker-generated consumer tags can be used; while this is still the default, you can now provide an implementation of <>, enabling the creation of a (unique) tag for each consumer. +| idleEventInterval +(idle-event-integer) + +| Starting with _version 1.6_, `SimpleMessageListenerContainer` has this new property. +See <>. + |=== [[listener-concurrency]] diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index a19ea6fcd1..bcb78348da 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -1,6 +1,14 @@ [[whats-new]] === What's New +==== Changes in 1.6 Since 1.5 + +===== Idle Message Listener Detection + +It is now possible to configure listener containers to publish `ApplicationEvent` s when idle. + +See <> for more informatiom. + ==== Changes in 1.5 Since 1.4 ===== spring-erlang is No Longer Supported