Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@
* Base class for events.
*
* @author Gary Russell
* @since 1,5
* @since 1.5
*
*/
@SuppressWarnings("serial")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -140,6 +140,7 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
if (this.phase != null) {
instance.setPhase(this.phase);
}
instance.setListenerId(endpoint.getId());

endpoint.setupListenerContainer(instance);
initializeContainer(instance);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,6 +95,8 @@ public class RabbitNamespaceUtils {

private static final String CONSUMER_TAG_STRATEGY = "consumer-tag-strategy";

private static final String IDLE_EVENT_INTERVAL = "idle-event-interval";

public static BeanDefinition parseContainer(Element containerEle, ParserContext parserContext) {
RootBeanDefinition containerDef = new RootBeanDefinition(SimpleMessageListenerContainer.class);
containerDef.setSource(parserContext.extractSource(containerEle));
Expand Down Expand Up @@ -253,6 +255,11 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
new RuntimeBeanReference(consumerTagStrategy));
}

String idleEventInterval = containerEle.getAttribute(IDLE_EVENT_INTERVAL);
if (StringUtils.hasText(idleEventInterval)) {
containerDef.getPropertyValues().add("idleEventInterval", new TypedStringValue(idleEventInterval));
}

return containerDef;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2015 the original author or authors.
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
Expand All @@ -40,7 +42,8 @@
* @since 1.4
*/
public class SimpleRabbitListenerContainerFactory
extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {
extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
implements ApplicationEventPublisherAware {

private Executor taskExecutor;

Expand Down Expand Up @@ -74,6 +77,10 @@ public class SimpleRabbitListenerContainerFactory

private ConsumerTagStrategy consumerTagStrategy;

private Long idleEventInterval;

private ApplicationEventPublisher applicationEventPublisher;

/**
* @param taskExecutor the {@link Executor} to use.
* @see SimpleMessageListenerContainer#setTaskExecutor
Expand Down Expand Up @@ -211,6 +218,18 @@ public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy) {
this.consumerTagStrategy = consumerTagStrategy;
}

/**
* How often to publish idle container events.
* @param idleEventInterval the interval.
*/
public void setIdleEventInterval(Long idleEventInterval) {
this.idleEventInterval = idleEventInterval;
}

public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
protected SimpleMessageListenerContainer createContainerInstance() {
return new SimpleMessageListenerContainer();
Expand Down Expand Up @@ -268,6 +287,12 @@ protected void initializeContainer(SimpleMessageListenerContainer instance) {
if (this.consumerTagStrategy != null) {
instance.setConsumerTagStrategy(this.consumerTagStrategy);
}
if (this.idleEventInterval != null) {
instance.setIdleEventInterval(this.idleEventInterval);
}
if (this.applicationEventPublisher != null) {
instance.setApplicationEventPublisher(this.applicationEventPublisher);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -99,6 +99,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private volatile ApplicationContext applicationContext;

private String listenerId;

/**
* <p>
* Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is
Expand Down Expand Up @@ -401,6 +403,18 @@ public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}

/**
* The 'id' attribute of the listener.
* @return the id (or the container bean name if no id set).
*/
public String getListenerId() {
return this.listenerId != null ? this.listenerId : this.beanName;
}

public void setListenerId(String listenerId) {
this.listenerId = listenerId;
}

/**
* Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.listener;

import org.springframework.amqp.event.AmqpEvent;

/**
* An event that is emitted when a container is idle if the container
* is configured to do so.
*
* @author Gary Russell
* @since 1.6
*
*/
@SuppressWarnings("serial")
public class ListenerContainerIdleEvent extends AmqpEvent {

private final long idleTime;

public ListenerContainerIdleEvent(SimpleMessageListenerContainer source, long idleTime) {
super(source);
this.idleTime = idleTime;
}

/**
* How long the container has been idle.
* @return the time in milliseconds.
*/
public long getIdleTime() {
return idleTime;
}

/**
* The queues the container is listening to.
* @return the queue names.
*/
public String[] getQueues() {
return ((SimpleMessageListenerContainer) getSource()).getQueueNames();
}

/**
* The id of the listener (if {@code @RabbitListener}) or the container bean name.
* @return the id.
*/
public String getListenerId() {
return ((SimpleMessageListenerContainer) getSource()).getListenerId();
}

@Override
public String toString() {
return "ListenerContainerIdleEvent [idleTime="
+ ((float) this.idleTime / 1000) + "s, listenerId=" + getListenerId()
+ ", container=" + getSource() + "]";
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -112,6 +113,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
*/
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;

private final AtomicLong lastNoMessageAlert = new AtomicLong();

private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;

private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
Expand Down Expand Up @@ -194,6 +197,10 @@ public void invokeListener(Channel channel, Message message) throws Exception {

private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger();

private Long idleEventInterval;

private volatile long lastReceive = System.currentTimeMillis();

/**
* Default constructor for convenient dependency injection via setters.
*/
Expand Down Expand Up @@ -643,6 +650,14 @@ public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclu
this.exclusiveConsumerExceptionLogger = exclusiveConsumerExceptionLogger;
}

/**
* How often to emit {@link ListenerContainerIdleEvent}s in milliseconds.
* @param idleEventInterval the interval.
*/
public void setIdleEventInterval(long idleEventInterval) {
this.idleEventInterval = idleEventInterval;
}

/**
* Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent
* consumers.
Expand Down Expand Up @@ -1219,6 +1234,21 @@ public void run() {
}
}
}
if (idleEventInterval != null) {
if (receivedOk) {
lastReceive = System.currentTimeMillis();
}
else {
long now = System.currentTimeMillis();
long lastAlertAt = lastNoMessageAlert.get();
long lastReceive = SimpleMessageListenerContainer.this.lastReceive;
if (now > lastReceive + idleEventInterval
&& now > lastAlertAt + idleEventInterval
&& lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
Expand All @@ -1240,7 +1270,7 @@ public void run() {
logger.debug("Consumer thread interrupted, processing stopped.");
Thread.currentThread().interrupt();
aborted = true;
publishEvent("Consumer thread interrupted, processing stopped", true, e);
publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
}
catch (QueuesNotAvailableException ex) {
if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
Expand All @@ -1249,20 +1279,20 @@ public void run() {
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishEvent("Consumer queue(s) not available", aborted, ex);
publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}
catch (FatalListenerStartupException ex) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishEvent("Consumer received fatal exception on startup", true, ex);
publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex);
}
catch (FatalListenerExecutionException ex) {
logger.error("Consumer received fatal exception during processing", ex);
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishEvent("Consumer received fatal exception during processing", true, ex);
publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
}
catch (ShutdownSignalException e) {
if (RabbitUtils.isNormalShutdown(e)) {
Expand All @@ -1278,7 +1308,7 @@ public void run() {
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
exclusiveConsumerExceptionLogger.log(logger, "Exclusive consumer failure", e.getCause().getCause());
publishEvent("Consumer raised exception, attempting restart", false, e);
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
}
else {
this.logConsumerException(e);
Expand Down Expand Up @@ -1341,16 +1371,23 @@ private void logConsumerException(Throwable t) {
logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. "
+ "Exception summary: " + t);
}
publishEvent("Consumer raised exception, attempting restart", false, t);
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t);
}

private void publishEvent(String reason, boolean fatal, Throwable t) {
private void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) {
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new ListenerContainerConsumerFailedEvent(
SimpleMessageListenerContainer.this, reason, t, fatal));
}
}

private void publishIdleContainerEvent(long idleTime) {
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new ListenerContainerIdleEvent(SimpleMessageListenerContainer.this, idleTime));
}
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,16 @@
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="idle-event-interval" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The time in milliseconds between application events generated when the container is idle (no
message received). A 'ListenerContainerIdleEvent' is published each time this interval elapses
until a message is again received. These events can be received by an 'ApplicationListener'.
Events are not published unless this is set.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>

<xsd:complexType name="listenerType">
Expand Down
Loading