Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Partial sync of listener namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Fitzgerald committed Oct 3, 2012
1 parent c5493fa commit d78d30a
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 83 deletions.
Expand Up @@ -23,6 +23,7 @@
using Spring.Messaging.Amqp.Rabbit.Core;
using Spring.Messaging.Amqp.Rabbit.Listener.Adapter;
using Spring.Objects.Factory;
using Spring.Transaction.Support;
using Spring.Util;
#endregion

Expand Down Expand Up @@ -279,7 +280,7 @@ public void Initialize()
/// </summary>
public void Shutdown()
{
this.logger.Debug("Shutting down Rabbit listener container");
this.logger.Debug(m => m("Shutting down Rabbit listener container"));
lock (this.lifecycleMonitor)
{
this.active = false;
Expand Down Expand Up @@ -350,11 +351,8 @@ public void Start()

try
{
if (this.logger.IsDebugEnabled)
{
this.logger.Debug("Starting Rabbit listener container.");
}

this.logger.Debug(m => m("Starting Rabbit listener container."));

this.DoStart();
}
catch (Exception ex)
Expand Down Expand Up @@ -439,13 +437,9 @@ protected void InvokeErrorHandler(Exception ex)
{
this.errorHandler.HandleError(ex);
}
else if (this.logger.IsDebugEnabled)
{
this.logger.Debug("Execution of Rabbit message listener failed, and no ErrorHandler has been set.", ex);
}
else if (this.logger.IsInfoEnabled)
else
{
this.logger.Info("Execution of Rabbit message listener failed, and no ErrorHandler has been set: " + ex.Source + ": " + ex.Message);
this.logger.Warn(m => m("Execution of Rabbit message listener failed, and no ErrorHandler has been set."), ex);
}
}

Expand All @@ -462,11 +456,8 @@ protected virtual void ExecuteListener(IModel channel, Message message)
{
if (!this.IsRunning)
{
if (this.logger.IsWarnEnabled)
{
this.logger.Warn("Rejecting received message because of the listener container " + "having been stopped in the meantime: " + message);
}

this.logger.Warn(m => m("Rejecting received message because the listener container has been stopped: {0}", message));

throw new MessageRejectedWhileStoppingException();
}

Expand Down Expand Up @@ -494,7 +485,24 @@ public virtual void InvokeListener(IModel channel, Message message)
}
else if (listener is IMessageListener)
{
this.DoInvokeListener((IMessageListener)listener, message);
var bindChannel = this.ExposeListenerChannel && this.IsChannelLocallyTransacted(channel);
if (bindChannel)
{
TransactionSynchronizationManager.BindResource(this.ConnectionFactory, new RabbitResourceHolder(channel, false));
}
try
{
this.DoInvokeListener((IMessageListener)listener, message);
}
finally
{
if (bindChannel)
{
// unbind if we bound
TransactionSynchronizationManager.UnbindResource(this.ConnectionFactory);
}
}

}
else if (listener != null)
{
Expand All @@ -517,14 +525,34 @@ protected virtual void DoInvokeListener(IChannelAwareMessageListener listener, I
{
RabbitResourceHolder resourceHolder = null;

var channelToUse = channel;
var boundHere = false;

try
{
var channelToUse = channel;
if (!this.ExposeListenerChannel)
{
// We need to expose a separate Channel.
resourceHolder = this.GetTransactionalResourceHolder();
channelToUse = resourceHolder.Channel;

if (this.IsChannelLocallyTransacted(channelToUse) &&
!TransactionSynchronizationManager.ActualTransactionActive)
{
TransactionSynchronizationManager.BindResource(
this.ConnectionFactory,
resourceHolder);
boundHere = true;
}
}
else
{
// if locally transacted, bind the current channel to make it available to RabbitTemplate
if (this.IsChannelLocallyTransacted(channel))
{
TransactionSynchronizationManager.BindResource(this.ConnectionFactory, new RabbitResourceHolder(channelToUse, false));
boundHere = true;
}
}

// Actually invoke the message listener
Expand All @@ -534,38 +562,30 @@ protected virtual void DoInvokeListener(IChannelAwareMessageListener listener, I
}
catch (Exception e)
{
this.ThrowOrWarnBasedOnRootCauseOfException(e);
throw this.WrapToListenerExecutionFailedExceptionIfNeeded(e);
}
}
finally
{
ConnectionFactoryUtils.ReleaseResources(resourceHolder);
if (boundHere)
{
// unbind if we bound
TransactionSynchronizationManager.UnbindResource(this.ConnectionFactory);
if (!ExposeListenerChannel && IsChannelLocallyTransacted(channelToUse))
{
/*
* commit the temporary channel we exposed; the consumer's channel
* will be committed later. Note that when exposing a different channel
* when there's no transaction manager, the exposed channel is committed
* on each message, and not based on txSize.
*/
RabbitUtils.CommitIfNecessary(channelToUse);
}
}
}
}

private void ThrowOrWarnBasedOnRootCauseOfException(Exception exception)
{
if (!this.IsFromHandleMethodResolutionFailure(exception))
{
throw this.WrapToListenerExecutionFailedExceptionIfNeeded(exception);
}

// TODO: probably need to shunt the message off to a Dead-Letter Queue b/c at this point its 100% underliverable
this.logger.Warn(string.Format("{0} is unable to resolve proper method to handle the message!", this.GetType()), exception);
}

private bool IsFromHandleMethodResolutionFailure(Exception exception)
{
bool flag = exception is ArgumentException;

if (exception.InnerException != null)
{
flag = this.IsFromHandleMethodResolutionFailure(exception.InnerException);
}

return flag;
}

/// <summary>Invoke the specified listener a Spring Rabbit MessageListener.</summary>
/// <remarks>Default implementation performs a plain invocation of the
/// <code>OnMessage</code>
Expand All @@ -580,7 +600,7 @@ protected virtual void DoInvokeListener(IMessageListener listener, Message messa
}
catch (Exception e)
{
this.ThrowOrWarnBasedOnRootCauseOfException(e);
this.WrapToListenerExecutionFailedExceptionIfNeeded(e);
}
}

Expand Down Expand Up @@ -612,7 +632,7 @@ protected virtual void HandleListenerException(Exception ex)
{
// Rare case: listener thread failed after container shutdown.
// Log at debug level, to avoid spamming the shutdown log.
this.logger.Debug("Listener exception after container shutdown", ex);
this.logger.Debug(m => m("Listener exception after container shutdown"), ex);
}
}

Expand Down Expand Up @@ -642,7 +662,6 @@ public class SharedConnectionNotInitializedException : SystemException
{
/// <summary>Initializes a new instance of the <see cref="SharedConnectionNotInitializedException"/> class.</summary>
/// <param name="message">The message.</param>
public SharedConnectionNotInitializedException(string message)
: base(message) { }
public SharedConnectionNotInitializedException(string message) : base(message) { }
}
}
Expand Up @@ -65,6 +65,10 @@ namespace Spring.Messaging.Amqp.Rabbit.Listener.Adapter
/// </remarks>
/// <author>Mark Pollack</author>
/// <author>Mark Pollack (.NET)</author>
/// <author>Juergen Hoeller</author>
/// <author>Mark Fisher</author>
/// <author>Dave Syer</author>
/// <author>Joe Fitzgerald</author>
public class MessageListenerAdapter : IMessageListener, IChannelAwareMessageListener
{
/// <summary>
Expand All @@ -87,7 +91,7 @@ public class MessageListenerAdapter : IMessageListener, IChannelAwareMessageList
/// <summary>
/// The logger.
/// </summary>
private readonly ILog logger = LogManager.GetLogger(typeof(MessageListenerAdapter));
private readonly ILog logger = LogManager.GetCurrentClassLogger();

/// <summary>
/// The handler object.
Expand Down Expand Up @@ -178,7 +182,15 @@ public MessageListenerAdapter(object handlerObject, IMessageConverter messageCon
/// on a custom subclass of this adapter, defining listener methods.
/// </remarks>
/// <value>The handler object.</value>
public object HandlerObject { get { return this.handlerObject; } set { this.handlerObject = value; } }
public object HandlerObject
{
get { return this.handlerObject; }
set
{
AssertUtils.ArgumentNotNull(value, "HandlerObject must not be null");
this.handlerObject = value;
}
}

/// <summary>
/// Sets Encoding.
Expand Down Expand Up @@ -265,30 +277,30 @@ public void OnMessage(Message message, IModel channel)
{
if (this.handlerObject != this)
{
if (typeof(IChannelAwareMessageListener).IsInstanceOfType(this.handlerObject))
if (this.handlerObject is IChannelAwareMessageListener)
{
if (channel != null)
{
((IChannelAwareMessageListener)this.handlerObject).OnMessage(message, channel);
return;
}
else if (!typeof(IMessageListener).IsInstanceOfType(this.handlerObject))
else if (!(this.handlerObject is IMessageListener))
{
throw new InvalidOperationException(
throw new AmqpIllegalStateException(
"MessageListenerAdapter cannot handle a " +
"IChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
}
}

if (typeof(IMessageListener).IsInstanceOfType(this.handlerObject))
if (this.handlerObject is IMessageListener)
{
((IMessageListener)this.handlerObject).OnMessage(message);
return;
}
}

// Regular case: find a handler method reflectively.
object convertedMessage = this.ExtractMessage(message);
var convertedMessage = this.ExtractMessage(message);

var methodName = this.GetListenerMethodName(message, convertedMessage);
if (methodName == null)
Expand All @@ -308,7 +320,7 @@ public void OnMessage(Message message, IModel channel)
}
else
{
this.logger.Trace("No result object given - no result to handle");
this.logger.Trace(m => m("No result object given - no result to handle"));
}
}

Expand All @@ -324,7 +336,7 @@ public void OnMessage(Message message, IModel channel)
/// exceptions get handled by the caller instead.</para>
/// </summary>
/// <param name="ex">The exception to handle.</param>
protected virtual void HandleListenerException(Exception ex) { this.logger.Error("Listener execution failed", ex); }
protected virtual void HandleListenerException(Exception ex) { this.logger.Error(m => m("Listener execution failed"), ex); }

/// <summary>Extract the message body from the given message.</summary>
/// <param name="message">The message.</param>
Expand Down Expand Up @@ -393,7 +405,7 @@ protected object InvokeListenerMethod(string methodName, object[] arguments)
catch (TargetInvocationException ex)
{
var targetEx = ex.InnerException;
if (targetEx != null && targetEx is IOException)
if (targetEx is IOException)
{
throw new AmqpIOException((IOException)targetEx);
}
Expand Down Expand Up @@ -441,22 +453,16 @@ protected virtual void HandleResult(object result, Message request, IModel chann
{
if (channel != null)
{
if (this.logger.IsDebugEnabled)
{
this.logger.Debug("Listener method returned result [" + result + "] - generating response message for it");
}

this.logger.Debug(m => m("Listener method returned result [{0}] - generating response message for it", result));

var response = this.BuildMessage(channel, result);
this.PostProcessResponse(request, response);
var replyTo = this.GetReplyToAddress(request);
this.SendResponse(channel, replyTo, response);
}
else
{
if (this.logger.IsDebugEnabled)
{
this.logger.Debug("Listener method returned result [" + result + "]: not generating response message for it because of no Rabbit Channel given");
}
this.logger.Warn(m => m("Listener method returned result [{0}]: not generating response message for it because of no Rabbit Channel given", result));
}
}

Expand All @@ -482,13 +488,12 @@ protected Message BuildMessage(IModel channel, object result)
return converter.ToMessage(result, new MessageProperties());
}

var msg = result as Message;
if (msg == null)
if (!(result is Message))
{
throw new MessageConversionException("No IMessageConverter specified - cannot handle message [" + result + "]");
}

return msg;
return (Message)result;
}

/// <summary>Post-process the given response message before it will be sent. The default implementation
Expand Down Expand Up @@ -532,7 +537,7 @@ protected virtual Address GetReplyToAddress(Message request)
{
if (string.IsNullOrEmpty(this.responseExchange))
{
throw new AmqpException("Cannot determine ReplyTo message property value: " + "Request message does not contain reply-to property, and no default response Exchange was set.");
throw new AmqpException("Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.");
}

replyTo = new Address(null, this.responseExchange, this.responseRoutingKey);
Expand All @@ -551,7 +556,7 @@ protected virtual void SendResponse(IModel channel, Address replyTo, Message mes

try
{
this.logger.Debug("Publishing response to exchanage = [" + replyTo.ExchangeName + "], routingKey = [" + replyTo.RoutingKey + "]");
this.logger.Debug(m => m("Publishing response to exchanage = [{0}], routingKey = [{1}]", replyTo.ExchangeName, replyTo.RoutingKey));
channel.BasicPublish(
replyTo.ExchangeName, replyTo.RoutingKey, this.mandatoryPublish, this.immediatePublish, this.messagePropertiesConverter.FromMessageProperties(channel, message.MessageProperties, this.encoding), message.Body);
}
Expand Down
16 changes: 6 additions & 10 deletions src/Spring.Messaging.Amqp.Rabbit/Listener/BlockingQueueConsumer.cs
Expand Up @@ -260,7 +260,7 @@ public void Stop()

/// <summary>The to string.</summary>
/// <returns>The System.String.</returns>
public string ToString() { return "Consumer: tag=[" + (this.consumer != null ? this.consumer.ConsumerTag : null) + "], channel=" + this.channel + ", acknowledgeMode=" + this.acknowledgeMode + " local queue size=" + this.queue.Count; }
public override string ToString() { return "Consumer: tag=[" + (this.consumer != null ? this.consumer.ConsumerTag : null) + "], channel=" + this.channel + ", acknowledgeMode=" + this.acknowledgeMode + " local queue size=" + this.queue.Count; }

/// <summary>Perform a rollback, handling rollback excepitons properly.</summary>
/// <param name="channel">The channel to rollback.</param>
Expand Down Expand Up @@ -384,16 +384,12 @@ internal class InternalConsumer : DefaultBasicConsumer
public InternalConsumer(IModel channel, BlockingQueueConsumer outer) : base(channel) { this.outer = outer; }

/// <summary>Handle model shutdown, given a consumerTag.</summary>
/// <param name="consumerTag">The consumer tag.</param>
/// <param name="sig">The sig.</param>
public void HandleModelShutdown(string consumerTag, ShutdownEventArgs sig)
public override void HandleModelShutdown(IModel channel, ShutdownEventArgs reason)
{
if (this.logger.IsDebugEnabled)
{
this.logger.Debug("Received shutdown signal for consumer tag=" + consumerTag + " , cause=" + sig.Cause);
}

this.outer.shutdown = sig;
base.HandleModelShutdown(channel, reason);
this.logger.Warn(m => m("Received shutdown signal for channel, cause: {0}", reason.Cause));

this.outer.shutdown = reason;
this.outer.deliveryTags.Clear();
}

Expand Down

0 comments on commit d78d30a

Please sign in to comment.