diff --git a/src/Spring.Messaging.Amqp.Rabbit/Config/BindingFactoryObject.cs b/src/Spring.Messaging.Amqp.Rabbit/Config/BindingFactoryObject.cs index 4ee68d6..c91f237 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Config/BindingFactoryObject.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Config/BindingFactoryObject.cs @@ -15,9 +15,11 @@ #region Using Directives using System; +using System.Collections; using System.Collections.Generic; using Spring.Messaging.Amqp.Core; using Spring.Objects.Factory; +using Queue = Spring.Messaging.Amqp.Core.Queue; #endregion namespace Spring.Messaging.Amqp.Rabbit.Config @@ -27,7 +29,7 @@ namespace Spring.Messaging.Amqp.Rabbit.Config /// public class BindingFactoryObject : IFactoryObject { - private IDictionary arguments; + private IDictionary arguments; private string routingKey = string.Empty; private string exchange; private Queue destinationQueue; @@ -39,7 +41,7 @@ public class BindingFactoryObject : IFactoryObject /// The arguments. /// Dave Syer /// Joe Fitzgerald - public IDictionary Arguments { set { this.arguments = value; } } + public IDictionary Arguments { set { this.arguments = value; } } /// /// Sets the routing key. diff --git a/src/Spring.Messaging.Amqp.Rabbit/Config/ListenerContainerParser.cs b/src/Spring.Messaging.Amqp.Rabbit/Config/ListenerContainerParser.cs index 01e8a68..cba6c95 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Config/ListenerContainerParser.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Config/ListenerContainerParser.cs @@ -114,7 +114,7 @@ private void ParseListener(XmlElement listenerEle, XmlElement containerEle, Pars } } - var containerDef = RabbitNamespaceUtils.ParseContainer(listenerEle, containerEle, parserContext); + var containerDef = RabbitNamespaceUtils.ParseContainer(containerEle, parserContext); if (listenerEle.HasAttribute(RESPONSE_EXCHANGE_ATTRIBUTE)) { diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/AbstractConnectionFactory.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/AbstractConnectionFactory.cs index e98bdd7..d107ee2 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/AbstractConnectionFactory.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/AbstractConnectionFactory.cs @@ -55,6 +55,10 @@ public abstract class AbstractConnectionFactory : IConnectionFactory, IDisposabl /// private readonly CompositeChannelListener channelListener = new CompositeChannelListener(); + // private volatile IExecutorService executorService; + + private volatile AmqpTcpEndpoint[] addresses; + /// Initializes a new instance of the class. /// The rabbit connection factory. public AbstractConnectionFactory(ConnectionFactory rabbitConnectionFactory) @@ -90,6 +94,18 @@ public AbstractConnectionFactory(ConnectionFactory rabbitConnectionFactory) /// public int Port { get { return this.rabbitConnectionFactory.Port; } set { this.rabbitConnectionFactory.Port = value; } } + public string Addresses + { + set + { + var addressArray = AmqpTcpEndpoint.ParseMultiple(RabbitMQ.Client.Protocols.DefaultProtocol, value); + if (addressArray != null && addressArray.Length > 0) + { + this.addresses = addressArray; + } + } + } + /// /// Gets the channel listener. /// @@ -138,7 +154,16 @@ public virtual IConnection CreateBareConnection() { try { - return new SimpleConnection(this.rabbitConnectionFactory.CreateConnection()); + if (this.addresses != null) + { + // TODO: Waiting on RabbitMQ.Client to catch up to the Java equivalent here. + // return new SimpleConnection(this.rabbitConnectionFactory.CreateConnection(this.addresses)); + return new SimpleConnection(this.rabbitConnectionFactory.CreateConnection()); + } + else + { + return new SimpleConnection(this.rabbitConnectionFactory.CreateConnection()); + } } catch (Exception ex) { diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/CachingConnectionFactory.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/CachingConnectionFactory.cs index 5bfbc8c..9fae150 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/CachingConnectionFactory.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/CachingConnectionFactory.cs @@ -16,37 +16,19 @@ #region Using Directives using System; using System.Collections.Generic; +using System.Linq; using System.Reflection; using AopAlliance.Intercept; using Common.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Impl; using Spring.Aop.Framework; +using Spring.Messaging.Amqp.Rabbit.Support; using Spring.Util; #endregion namespace Spring.Messaging.Amqp.Rabbit.Connection { - /** - * A {@link IConnectionFactory} implementation that returns the same Connections from all {@link #createConnection()} - * calls, and ignores calls to {@link com.rabbitmq.client.Connection#close()} and caches - * {@link com.rabbitmq.client.Channel}. - * - *

- * By default, only one Channel will be cached, with further requested Channels being created and disposed on demand. - * Consider raising the {@link #setChannelCacheSize(int) "channelCacheSize" value} in case of a high-concurrency - * environment. - * - *

- * NOTE: This ConnectionFactory requires explicit closing of all Channels obtained form its shared Connection. - * This is the usual recommendation for native Rabbit access code anyway. However, with this ConnectionFactory, its use - * is mandatory in order to actually allow for Channel reuse. - * - * @author Mark Pollack - * @author Mark Fisher - * @author Dave Syer - */ - ///

/// A caching connection factory implementation. The default channel cache size is 1, please modify to /// meet your scaling needs. @@ -97,21 +79,23 @@ public class CachingConnectionFactory : AbstractConnectionFactory /// private ChannelCachingConnectionProxy connection; + private volatile bool publisherConfirms; + + private volatile bool publisherReturns; + /// /// Synchronization monitor for the shared Connection. /// private readonly object connectionMonitor = new object(); - /// - /// Initializes a new instance of the class - /// initializing the hostname to be the value returned from Dns.GetHostName() or "localhost" - /// if Dns.GetHostName() throws an exception. - /// + /// Initializes a new instance of the class. + /// Create a new initializing the hostname to be the value returned from Dns.GetHostName() or "localhost" + /// if Dns.GetHostName() throws an exception. public CachingConnectionFactory() : this(string.Empty) { } - /// Initializes a new instance of the class given a host name and port - /// The hostname. - /// The port. + /// Initializes a new instance of the class. Create a new given a host name and port + /// The hostname to connect to. + /// The port number. public CachingConnectionFactory(string hostname, int port) : base(new ConnectionFactory()) { if (string.IsNullOrWhiteSpace(hostname)) @@ -123,15 +107,15 @@ public CachingConnectionFactory(string hostname, int port) : base(new Connection this.Port = port; } - /// Initializes a new instance of the class given a port - /// The port. + /// Initializes a new instance of the class. Create a new given a port + /// The port number. public CachingConnectionFactory(int port) : this(string.Empty, port) { } - /// Initializes a new instance of the class given a host name. - /// The hostname. + /// Initializes a new instance of the class. Create a new given a host name. + /// The hostname to connect to. public CachingConnectionFactory(string hostname) : this(hostname, Protocols.DefaultProtocol.DefaultPort) { } - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. Create a new for the given ConnectionFactory. /// The rabbit connection factory. public CachingConnectionFactory(ConnectionFactory rabbitConnectionFactory) : base(rabbitConnectionFactory) { } @@ -149,6 +133,12 @@ public int ChannelCacheSize } } + /// Gets or sets a value indicating whether is publisher confirms. + public bool IsPublisherConfirms { get { return this.publisherConfirms; } set { this.publisherConfirms = value; } } + + /// Gets or sets a value indicating whether is publisher returns. + public bool IsPublisherReturns { get { return this.publisherReturns; } set { this.publisherReturns = value; } } + /// /// Gets or sets a value indicating whether this is active. /// @@ -159,7 +149,7 @@ public int ChannelCacheSize /// Sets the connection listeners. /// /// The connection listeners. - public new IList ConnectionListeners + public override IList ConnectionListeners { set { @@ -174,7 +164,7 @@ public new IList ConnectionListeners /// Add a connection listener. /// The listener. - public new void AddConnectionListener(IConnectionListener listener) + public override void AddConnectionListener(IConnectionListener listener) { base.AddConnectionListener(listener); @@ -202,18 +192,12 @@ internal IModel GetChannel(bool transactional) if (channel != null) { - if (this.Logger.IsTraceEnabled) - { - this.Logger.Trace("Found cached Rabbit Channel"); - } + this.Logger.Trace(m => m("Found cached Rabbit Channel")); } else { - if (this.Logger.IsDebugEnabled) - { - this.Logger.Debug("Creating cached Rabbit Channel"); - } - + this.Logger.Debug(m => m("Creating cached Rabbit Channel")); + channel = this.GetCachedChannelProxy(channelList, transactional); } @@ -229,8 +213,24 @@ internal IModel GetChannel(bool transactional) protected virtual IChannelProxy GetCachedChannelProxy(LinkedList channelList, bool transactional) { var targetChannel = this.CreateBareChannel(transactional); + this.Logger.Debug(m => m("Creating cached Rabbit Channel from {0}", targetChannel)); + this.ChannelListener.OnCreate(targetChannel, transactional); + /* + * TODO: Pending Completion of PublisherCallbackChannelImpl + * + IList interfaces; + if(this.publisherConfirms || this.publisherReturns) + { + interfaces = new List() { typeof(IChannelProxy), typeof(IPublisherCallbackChannel) }; + } + else + { + interfaces = new List() { typeof(IChannelProxy) }; + } + */ var factory = new ProxyFactory(typeof(IChannelProxy), new CachedChannelInvocationHandler(targetChannel, channelList, transactional, this)); + // factory.Interfaces = interfaces.ToArray(); var channelProxy = (IChannelProxy)factory.GetProxy(); return channelProxy; } @@ -248,7 +248,29 @@ internal IModel CreateBareChannel(bool transactional) this.CreateConnection(); } - return this.connection.CreateBareChannel(transactional); + var channel = this.connection.CreateBareChannel(transactional); + /* + * TODO: Pending Completion of PublisherCallbackChannelImpl + if (this.publisherConfirms) + { + try + { + channel.ConfirmSelect(); + } + catch (Exception ex) + { + this.Logger.Error(m => m("Could not configure the channel to receive publisher confirms"), ex); + } + } + if (this.publisherConfirms || this.publisherReturns) + { + if (!(channel is PublisherCallbackChannelImpl)) + { + channel = new PublisherCallbackChannelImpl(channel); + } + } + */ + return channel; } /// @@ -345,7 +367,7 @@ public override string ToString() } } - #region CachedChannelInvocationHandler - For a Rainy Day + #region CachedChannelInvocationHandler /// /// A cached channel invocation handler. @@ -468,7 +490,7 @@ public object Invoke(IMethodInvocation invocation) { // Basic re-connection logic... this.target = null; - this.Logger.Debug("Detected closed channel on exception. Re-initializing: " + this.target); + this.Logger.Debug(m => m("Detected closed channel on exception. Re-initializing: {0}", this.target)); lock (this.targetMonitor) { if (this.target == null) @@ -499,17 +521,11 @@ private void LogicalClose(IChannelProxy proxy) } // Allow for multiple close calls... - // TODO: Figure out why the proxied channels don't work with this.channelList.Contains() - // if (!this.channelList.Contains(proxy)) - // { - if (this.Logger.IsTraceEnabled) + if (!this.channelList.Contains(proxy)) { - this.Logger.Trace("Returning cached Channel: " + this.target); + this.Logger.Trace(m => m("Returning cached Channel: {0}", this.target)); + this.channelList.AddLast(proxy); } - - this.channelList.AddLast(proxy); - - // } } /// @@ -519,7 +535,7 @@ private void PhysicalClose() { if (this.Logger.IsDebugEnabled) { - this.Logger.Debug("Closing cached Channel: " + this.target); + this.Logger.Debug(m => m("Closing cached Channel: " + this.target)); } if (this.target == null) @@ -589,13 +605,13 @@ public IModel CreateChannel(bool transactional) /// The dispose. public void Dispose() { + this.outer.Reset(); if (this.target != null) { this.outer.ConnectionListener.OnClose(this.target); RabbitUtils.CloseConnection(this.target); } - this.outer.Reset(); this.target = null; } diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/CompositeChannelListener.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/CompositeChannelListener.cs index 1a80fef..d48b7ae 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/CompositeChannelListener.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/CompositeChannelListener.cs @@ -23,6 +23,8 @@ namespace Spring.Messaging.Amqp.Rabbit.Connection /// /// A composite channel listener. /// + /// Dave Syer + /// Joe Fitzgerald public class CompositeChannelListener : IChannelListener { /// diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/ConnectionFactoryUtils.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/ConnectionFactoryUtils.cs index 39273df..dc000a2 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/ConnectionFactoryUtils.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/ConnectionFactoryUtils.cs @@ -15,6 +15,7 @@ #region Using Directives using System; +using System.Threading; using Common.Logging; using RabbitMQ.Client; using Spring.Transaction.Support; @@ -27,31 +28,40 @@ namespace Spring.Messaging.Amqp.Rabbit.Connection /// Utility methods for connection factory. /// /// Mark Pollack + /// Mark Fisher + /// Dave Syer + /// Joe Fitzgerald public class ConnectionFactoryUtils { /// - /// The logger. + /// The Logger. /// - private static readonly ILog logger = LogManager.GetLogger(typeof(ConnectionFactoryUtils)); + private static readonly ILog Logger = LogManager.GetLogger(typeof(ConnectionFactoryUtils)); - /// Release a connection. - /// The connection. - public static void ReleaseConnection(IConnection connection) + private static readonly ThreadLocal consumerChannel = new ThreadLocal(); + + /// If a listener container is configured to use a RabbitTransactionManager, the + /// consumer's channel is registered here so that it is used as the bound resource + /// when the transaction actually starts. It is normally not necessary to use + /// an external transaction manager because local transactions work the same in that + /// the channel is bound to the thread. This is for the case when a user happens + /// to wire in a RabbitTransactionManager. + /// The channel. + public static void RegisterConsumerChannel(IModel channel) { - if (connection == null) - { - return; - } + Logger.Debug(m => m("Registering consumer channel {0}", channel)); - try - { - connection.Close(); - } - catch (Exception ex) - { - logger.Debug("Could not close RabbitMQ Connection", ex); - } + consumerChannel.Value = channel; } + + /// See RegisterConsumerChannel. This method is called to unregister the channel when the consumer exits. + public static void UnRegisterConsumerChannel() + { + Logger.Debug(m => m("Unregistering consumer channel {0}", consumerChannel.Value)); + + consumerChannel.Value = null; + } + /// Determine whether the given RabbitMQ Channel is transactional, that is, bound to the current thread by Spring's transaction facilities. /// The channel. @@ -67,7 +77,7 @@ public static bool IsChannelTransactional(IModel channel, IConnectionFactory con var resourceHolder = (RabbitResourceHolder)TransactionSynchronizationManager.GetResource(connectionFactory); return resourceHolder != null && resourceHolder.ContainsChannel(channel); } - + /// Obtain a RabbitMQ Channel that is synchronized with the current transaction, if any. /// The connection factory. /// The synched local transaction allowed. @@ -108,14 +118,20 @@ private static RabbitResourceHolder DoGetTransactionalResourceHolder(IConnection IModel channel = null; try { - bool isExistingCon = connection != null; + var isExistingCon = connection != null; if (!isExistingCon) { connection = resourceFactory.CreateConnection(); resourceHolderToUse.AddConnection(connection); } - channel = resourceFactory.CreateChannel(connection); + channel = consumerChannel.Value; + + if (channel == null) + { + channel = resourceFactory.CreateChannel(connection); + } + resourceHolderToUse.AddChannel(channel, connection); if (resourceHolderToUse != resourceHolder) @@ -129,7 +145,7 @@ private static RabbitResourceHolder DoGetTransactionalResourceHolder(IConnection { RabbitUtils.CloseChannel(channel); RabbitUtils.CloseConnection(connection); - throw; + throw new AmqpException(ex); } } diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/IChannelListener.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/IChannelListener.cs index 2b9a02a..0bf78a4 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/IChannelListener.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/IChannelListener.cs @@ -22,6 +22,8 @@ namespace Spring.Messaging.Amqp.Rabbit.Connection /// /// A channel listener interface. /// + /// Dave Syer + /// Joe Fitzgerald public interface IChannelListener { /// Called when [create]. diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/IChannelProxy.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/IChannelProxy.cs index 33ac3a5..0bddd3c 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/IChannelProxy.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/IChannelProxy.cs @@ -22,6 +22,8 @@ namespace Spring.Messaging.Amqp.Rabbit.Connection /// /// A channel proxy interface. /// + /// Mark Pollack + /// Joe Fitzgerald public interface IChannelProxy : IModel { /// diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitResourceHolder.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitResourceHolder.cs index 7ce5cbf..7132934 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitResourceHolder.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitResourceHolder.cs @@ -15,7 +15,6 @@ #region Using Directives using System; -using System.Collections; using System.Collections.Generic; using Common.Logging; using RabbitMQ.Client; @@ -33,9 +32,9 @@ namespace Spring.Messaging.Amqp.Rabbit.Connection public class RabbitResourceHolder : ResourceHolderSupport { /// - /// The logger. + /// The Logger. /// - private static readonly ILog logger = LogManager.GetLogger(typeof(ResourceHolderSupport)); + private static readonly ILog Logger = LogManager.GetLogger(typeof(ResourceHolderSupport)); /// /// The frozen flag. @@ -45,12 +44,12 @@ public class RabbitResourceHolder : ResourceHolderSupport /// /// The connections. /// - private readonly IList connections = new List(); + private readonly LinkedList connections = new LinkedList(); /// /// The channels. /// - private readonly IList channels = new List(); + private readonly LinkedList channels = new LinkedList(); /// /// The channels per connection. @@ -65,7 +64,12 @@ public class RabbitResourceHolder : ResourceHolderSupport /// /// The transactional flag. /// - private bool _channelTransactional; + private bool transactional; + + /// + /// Release after completion. + /// + private bool releaseAfterCompletion = true; /// /// Initializes a new instance of the class. @@ -74,17 +78,27 @@ public class RabbitResourceHolder : ResourceHolderSupport /// Initializes a new instance of the class. /// The channel. - public RabbitResourceHolder(IModel channel) : this() { this.AddChannel(channel); } + /// The release After Completion. + public RabbitResourceHolder(IModel channel, bool releaseAfterCompletion) : this() + { + this.AddChannel(channel); + this.releaseAfterCompletion = releaseAfterCompletion; + } /// /// Gets a value indicating whether Frozen. /// public bool Frozen { get { return this.frozen; } } + public bool ReleaseAfterCompletion + { + get { return this.releaseAfterCompletion; } + } + /// /// Gets a value indicating whether IsChannelTransactional. /// - public bool IsChannelTransactional { get { return this._channelTransactional; } } + public bool IsChannelTransactional { get { return this.transactional; } } /// Add a connection. /// The connection. @@ -94,7 +108,7 @@ public void AddConnection(IConnection connection) AssertUtils.ArgumentNotNull(connection, "Connection must not be null"); if (!this.connections.Contains(connection)) { - this.connections.Add(connection); + this.connections.AddLast(connection); } } @@ -111,20 +125,19 @@ public void AddChannel(IModel channel, IConnection connection) AssertUtils.ArgumentNotNull(channel, "Channel must not be null"); if (!this.channels.Contains(channel)) { - this.channels.Add(channel); + this.channels.AddLast(channel); if (connection != null) { - List channels; - this.channelsPerConnection.TryGetValue(connection, out channels); + List tempChannels; + this.channelsPerConnection.TryGetValue(connection, out tempChannels); - // TODO: double check, what about TryGet.. - if (channels == null) + if (tempChannels == null) { - channels = new List(); - this.channelsPerConnection.Add(connection, channels); + tempChannels = new List(); + this.channelsPerConnection.Add(connection, tempChannels); } - channels.Add(channel); + channels.AddLast(channel); } } } @@ -137,7 +150,7 @@ public void AddChannel(IModel channel, IConnection connection) /// /// Gets Connection. /// - public IConnection Connection { get { return this.connections.Count != 0 ? this.connections[0] : null; } } + public IConnection Connection { get { return (this.connections != null && this.connections.Count > 0) ? this.connections.First.Value : null; } } /// /// Gets a connection. @@ -151,13 +164,18 @@ public void AddChannel(IModel channel, IConnection connection) public IConnection GetConnection() where T : IConnection { Type type = typeof(T); - return (IConnection)CollectionUtils.FindValueOfType((ICollection)this.connections, type); + return (IConnection)CollectionUtils.FindValueOfType(this.connections, type); } + public IConnection GetConnection(Type connectionType) where T : IConnection + { + return (T)CollectionUtils.FindValueOfType(this.connections, connectionType); + } + /// /// Gets Channel. /// - public IModel Channel { get { return this.channels.Count != 0 ? this.channels[0] : null; } } + public IModel Channel { get { return (this.channels != null && this.channels.Count > 0) ? this.channels.First.Value : null; } } /// /// Commit all delivery tags. @@ -183,7 +201,7 @@ public void CommitAll() } catch (Exception e) { - throw new AmqpException("failed to commit RabbitMQ transaction", e); + throw new AmqpException("Failed to commit RabbitMQ transaction", e); } } @@ -200,7 +218,7 @@ public void CloseAll() } catch (Exception ex) { - logger.Debug("Could not close synchronized Rabbit Channel after transaction", ex); + Logger.Debug("Could not close synchronized Rabbit Channel after transaction", ex); } } @@ -240,11 +258,8 @@ public void RollbackAll() { foreach (var channel in this.channels) { - if (logger.IsDebugEnabled) - { - logger.Debug(string.Format("Rollingback messages to channel: {0}", channel)); - } - + Logger.Debug(m=> m("Rollingback messages to channel: {0}", channel)); + RabbitUtils.RollbackIfNecessary(channel); if (this.deliveryTags.ContainsKey(channel)) { diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitResourceSynchronization.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitResourceSynchronization.cs index d40763f..33b7ba8 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitResourceSynchronization.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitResourceSynchronization.cs @@ -19,14 +19,9 @@ namespace Spring.Messaging.Amqp.Rabbit.Connection { - /** - * Callback for resource cleanup at the end of a non-native RabbitMQ transaction (e.g. when participating in a - * JtaTransactionManager transaction). - * @see org.springframework.transaction.jta.JtaTransactionManager - */ - /// - /// Rabbit resource synchronization implementation. + /// Callback for resource cleanup at the end of a non-native RabbitMQ transaction (e.g. when participating in a + /// JtaTransactionManager transaction). /// internal class RabbitResourceSynchronization : TransactionSynchronizationAdapter { @@ -70,6 +65,11 @@ public void AfterCompletion(int status) this.resourceHolder.RollbackAll(); } + if (resourceHolder.ReleaseAfterCompletion) + { + resourceHolder.SynchronizedWithTransaction = false; + } + this.AfterCompletion((TransactionSynchronizationStatus)status); } diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitUtils.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitUtils.cs index d560911..a5a93ae 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitUtils.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/RabbitUtils.cs @@ -37,9 +37,9 @@ public class RabbitUtils public static readonly int DEFAULT_PORT = Protocols.DefaultProtocol.DefaultPort; /// - /// The logger. + /// The Logger. /// - private static readonly ILog logger = LogManager.GetLogger(typeof(RabbitUtils)); + private static readonly ILog Logger = LogManager.GetLogger(typeof(RabbitUtils)); /// Closes the given Rabbit Connection and ignore any thrown exception. /// This is useful for typical 'finally' blocks in manual Rabbit @@ -55,11 +55,11 @@ public static void CloseConnection(IConnection connection) } catch (AlreadyClosedException acex) { - logger.Debug("Connection is already closed.", acex); + Logger.Debug("Connection is already closed.", acex); } catch (Exception ex) { - logger.Debug("Ignoring Connection exception - assuming already closed: ", ex); + Logger.Debug("Ignoring Connection exception - assuming already closed: ", ex); } } } @@ -76,11 +76,11 @@ public static void CloseChannel(IModel channel) } catch (IOException ioex) { - logger.Debug("Could not close RabbitMQ Channel", ioex); + Logger.Debug("Could not close RabbitMQ Channel", ioex); } catch (Exception ex) { - logger.Debug("Unexpected exception on closing RabbitMQ Channel", ex); + Logger.Debug("Unexpected exception on closing RabbitMQ Channel", ex); } } } @@ -100,9 +100,9 @@ public static void CommitIfNecessary(IModel channel) { throw new AmqpException("An error occurred committing the transaction.", oiex); } - catch (IOException ioex) + catch (Exception ex) { - throw new AmqpIOException("An error occurred committing the transaction.", ioex); + throw new AmqpException("An error occurred committing the transaction.", ex); } } @@ -121,9 +121,9 @@ public static void RollbackIfNecessary(IModel channel) { throw new AmqpException("An error occurred rolling back the transaction.", oiex); } - catch (IOException ex) + catch (Exception ex) { - throw new AmqpIOException("An error occurred rolling back the transaction.", ex); + throw new AmqpException("An error occurred rolling back the transaction.", ex); } } @@ -140,12 +140,12 @@ public static SystemException ConvertRabbitAccessException(Exception ex) if (ex is IOException) { - return new AmqpIOException(string.Empty, (IOException)ex); + return new AmqpIOException((IOException)ex); } if (ex is OperationInterruptedException) { - return new AmqpIOException(ex.Message, new IOException(ex.Message, ex)); + return new AmqpIOException(new AmqpException(ex.Message, ex)); } /* @@ -166,7 +166,7 @@ public static SystemException ConvertRabbitAccessException(Exception ex) */ // fallback - return new UncategorizedAmqpException(string.Empty, ex); + return new UncategorizedAmqpException(ex); } /// Close the message consumer. diff --git a/src/Spring.Messaging.Amqp.Rabbit/Connection/SimpleConnection.cs b/src/Spring.Messaging.Amqp.Rabbit/Connection/SimpleConnection.cs index 3ba66ef..7e79972 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Connection/SimpleConnection.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Connection/SimpleConnection.cs @@ -23,6 +23,7 @@ namespace Spring.Messaging.Amqp.Rabbit.Connection /// /// A Simple Connection implementation. /// + /// Joe Fitzgerald public class SimpleConnection : IConnection { /// diff --git a/src/Spring.Messaging.Amqp.Rabbit/Core/RabbitTemplate.cs b/src/Spring.Messaging.Amqp.Rabbit/Core/RabbitTemplate.cs index a50dd45..490b175 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Core/RabbitTemplate.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Core/RabbitTemplate.cs @@ -247,11 +247,22 @@ public void Send(string exchange, string routingKey, Message message) /// The message post processor. public void ConvertAndSend(object message, Func messagePostProcessor) { this.ConvertAndSend(this.exchange, this.routingKey, message, messagePostProcessor); } + /// Convert and send a message, given the message. + /// The message. + /// The message post processor. + public void ConvertAndSend(object message, IMessagePostProcessor messagePostProcessor) { this.ConvertAndSend(this.exchange, this.routingKey, message, messagePostProcessor); } + /// Convert and send a message, given a routing key and the message. /// The routing key. /// The message. /// The message post processor. public void ConvertAndSend(string routingKey, object message, Func messagePostProcessor) { this.ConvertAndSend(this.exchange, routingKey, message, messagePostProcessor); } + + /// Convert and send a message, given a routing key and the message. + /// The routing key. + /// The message. + /// The message post processor. + public void ConvertAndSend(string routingKey, object message, IMessagePostProcessor messagePostProcessor) { this.ConvertAndSend(this.exchange, routingKey, message, messagePostProcessor); } /// Convert and send a message, given an exchange, a routing key, and the message. /// The exchange. @@ -265,6 +276,18 @@ public void ConvertAndSend(string exchange, string routingKey, object message, F this.Send(exchange, routingKey, this.Execute(channel => messageToSend)); } + /// Convert and send a message, given an exchange, a routing key, and the message. + /// The exchange. + /// The routing key. + /// The message. + /// The message post processor. + public void ConvertAndSend(string exchange, string routingKey, object message, IMessagePostProcessor messagePostProcessor) + { + var messageToSend = this.GetRequiredMessageConverter().ToMessage(message, new MessageProperties()); + messageToSend = messagePostProcessor.PostProcessMessage(messageToSend); + this.Send(exchange, routingKey, this.Execute(channel => messageToSend)); + } + /// /// Receive a message. /// @@ -350,20 +373,20 @@ public object ReceiveAndConvert(string queueName) /// Convert, send, and receive a message, given the message. /// The message to send. /// The message received. - public object ConvertSendAndReceive(object message) { return this.ConvertSendAndReceive(this.exchange, this.routingKey, message, null); } + public object ConvertSendAndReceive(object message) { return this.ConvertSendAndReceive(this.exchange, this.routingKey, message, default(IMessagePostProcessor)); } /// Convert, send, and receive a message, given a routing key and the message. /// The routing key. /// The message to send. /// The message received. - public object ConvertSendAndReceive(string routingKey, object message) { return this.ConvertSendAndReceive(this.exchange, routingKey, message, null); } + public object ConvertSendAndReceive(string routingKey, object message) { return this.ConvertSendAndReceive(this.exchange, routingKey, message, default(IMessagePostProcessor)); } /// Convert, send, and receive a message, given an exchange, a routing key and the message. /// The exchange. /// The routing key. /// The message to send. /// The message received. - public object ConvertSendAndReceive(string exchange, string routingKey, object message) { return this.ConvertSendAndReceive(exchange, routingKey, message, null); } + public object ConvertSendAndReceive(string exchange, string routingKey, object message) { return this.ConvertSendAndReceive(exchange, routingKey, message, default(IMessagePostProcessor)); } /// Convert, send, and receive a message, given the message. /// The message to send. @@ -371,6 +394,12 @@ public object ReceiveAndConvert(string queueName) /// The message received. public object ConvertSendAndReceive(object message, Func messagePostProcessor) { return this.ConvertSendAndReceive(this.exchange, this.routingKey, message, messagePostProcessor); } + /// Convert, send, and receive a message, given the message. + /// The message to send. + /// The message post processor. + /// The message received. + public object ConvertSendAndReceive(object message, IMessagePostProcessor messagePostProcessor) { return this.ConvertSendAndReceive(this.exchange, this.routingKey, message, messagePostProcessor); } + /// Convert, send, and receive a message, given a routing key and the message. /// The routing key. /// The message to send. @@ -378,6 +407,13 @@ public object ReceiveAndConvert(string queueName) /// The message received. public object ConvertSendAndReceive(string routingKey, object message, Func messagePostProcessor) { return this.ConvertSendAndReceive(this.exchange, routingKey, message, messagePostProcessor); } + /// Convert, send, and receive a message, given a routing key and the message. + /// The routing key. + /// The message to send. + /// The message post processor. + /// The message received. + public object ConvertSendAndReceive(string routingKey, object message, IMessagePostProcessor messagePostProcessor) { return this.ConvertSendAndReceive(this.exchange, routingKey, message, messagePostProcessor); } + /// Convert, send, and receive a message, given an exchange, a routing key and the message. /// The exchange. /// The routing key. @@ -402,6 +438,30 @@ public object ConvertSendAndReceive(string exchange, string routingKey, object m return this.GetRequiredMessageConverter().FromMessage(replyMessage); } + /// Convert, send, and receive a message, given an exchange, a routing key and the message. + /// The exchange. + /// The routing key. + /// The message to send. + /// The message post processor. + /// The message received. + public object ConvertSendAndReceive(string exchange, string routingKey, object message, IMessagePostProcessor messagePostProcessor) + { + var messageProperties = new MessageProperties(); + var requestMessage = this.GetRequiredMessageConverter().ToMessage(message, messageProperties); + if (messagePostProcessor != null) + { + requestMessage = messagePostProcessor.PostProcessMessage(requestMessage); + } + + var replyMessage = this.DoSendAndReceive(exchange, routingKey, requestMessage); + if (replyMessage == null) + { + return null; + } + + return this.GetRequiredMessageConverter().FromMessage(replyMessage); + } + /// Do the send and receive operation, given an exchange, a routing key and the message. /// The exchange. /// The routing key. diff --git a/src/Spring.Messaging.Amqp.Rabbit/Listener/Adapter/MessageListenerAdapter.cs b/src/Spring.Messaging.Amqp.Rabbit/Listener/Adapter/MessageListenerAdapter.cs index c5189b3..c94aa3b 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Listener/Adapter/MessageListenerAdapter.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Listener/Adapter/MessageListenerAdapter.cs @@ -395,7 +395,7 @@ protected object InvokeListenerMethod(string methodName, object[] arguments) var targetEx = ex.InnerException; if (targetEx != null && targetEx is IOException) { - throw new AmqpIOException("Error invoking listener method.", (IOException)targetEx); + throw new AmqpIOException((IOException)targetEx); } else { diff --git a/src/Spring.Messaging.Amqp.Rabbit/Listener/SimpleMessageListenerContainer.cs b/src/Spring.Messaging.Amqp.Rabbit/Listener/SimpleMessageListenerContainer.cs index 7354bbd..3c897e3 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Listener/SimpleMessageListenerContainer.cs +++ b/src/Spring.Messaging.Amqp.Rabbit/Listener/SimpleMessageListenerContainer.cs @@ -489,7 +489,7 @@ internal bool ReceiveAndExecute(BlockingQueueConsumer consumer) return (bool)new TransactionTemplate(this.transactionManager).Execute( delegate { - ConnectionFactoryUtils.BindResourceToTransaction(new RabbitResourceHolder(consumer.Channel), this.ConnectionFactory, true); + ConnectionFactoryUtils.BindResourceToTransaction(new RabbitResourceHolder(consumer.Channel, false), this.ConnectionFactory, true); try { return this.DoReceiveAndExecute(consumer); diff --git a/src/Spring.Messaging.Amqp.Rabbit/Spring.Messaging.Amqp.Rabbit.2010.csproj b/src/Spring.Messaging.Amqp.Rabbit/Spring.Messaging.Amqp.Rabbit.2010.csproj index bb7c271..097739c 100644 --- a/src/Spring.Messaging.Amqp.Rabbit/Spring.Messaging.Amqp.Rabbit.2010.csproj +++ b/src/Spring.Messaging.Amqp.Rabbit/Spring.Messaging.Amqp.Rabbit.2010.csproj @@ -144,9 +144,13 @@ + + + + diff --git a/src/Spring.Messaging.Amqp.Rabbit/Support/CorrelationData.cs b/src/Spring.Messaging.Amqp.Rabbit/Support/CorrelationData.cs new file mode 100644 index 0000000..06aa831 --- /dev/null +++ b/src/Spring.Messaging.Amqp.Rabbit/Support/CorrelationData.cs @@ -0,0 +1,41 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright 2002-2012 the original author or authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace Spring.Messaging.Amqp.Rabbit.Support +{ + /// + /// Base class for correlating publisher confirms to sent messages. + /// Use the {@link RabbitTemplate} methods that include one of + /// these as a parameter; when the publisher confirm is received, + /// the CorrelationData is returned with the ack/nack. + /// + /// Gary Russell + /// Joe Fitzgerald + public class CorrelationData + { + private readonly string id; + + /// Initializes a new instance of the class. + /// The id. + public CorrelationData(string id) { this.id = id; } + + /// Gets the id. + public string Id { get { return this.id; } } + + /// The to string. + /// The System.String. + public override string ToString() { return "CorrelationData [id=" + this.id + "]"; } + } +} diff --git a/src/Spring.Messaging.Amqp.Rabbit/Support/IListener.cs b/src/Spring.Messaging.Amqp.Rabbit/Support/IListener.cs new file mode 100644 index 0000000..9cb5a95 --- /dev/null +++ b/src/Spring.Messaging.Amqp.Rabbit/Support/IListener.cs @@ -0,0 +1,68 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright 2002-2012 the original author or authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +// +// -------------------------------------------------------------------------------------------------------------------- + +#region Using Directives +using System.Collections.Generic; +using RabbitMQ.Client; +#endregion + +namespace Spring.Messaging.Amqp.Rabbit.Support +{ + /// + /// Listeners implementing this interface can participate + /// in publisher confirms received from multiple channels, + /// by invoking addListener on each channel. Standard + /// AMQP channels do not support a listener being + /// registered on multiple channels. + /// + /// Gary Russell + /// Joe Fitzgerald + public interface IListener + { + /// Invoked by the channel when a confirm is received. + /// The pending confirmation, containing correlation data. + /// The ack. True when 'ack', false when 'nack'. + void HandleConfirm(PendingConfirm pendingConfirm, bool ack); + + /// The handle return. + /// The reply code. + /// The reply text. + /// The exchange. + /// The routing key. + /// The properties. + /// The body. + void HandleReturn( + int replyCode, + string replyText, + string exchange, + string routingKey, + IBasicProperties properties, + byte[] body); + + /// When called, this listener must remove all references to the pending confirm map. + /// The channel. + /// The pending confirm map. + void RemovePendingConfirmsReference(IModel channel, SortedList unconfirmed); + + /// Returns the UUID used to identify this Listener for returns. + string GetUuid { get; } + + /// Gets a value indicating whether is confirm listener. + bool IsConfirmListener { get; } + + /// Gets a value indicating whether is return listener. + bool IsReturnListener { get; } + } +} diff --git a/src/Spring.Messaging.Amqp.Rabbit/Support/IPublisherCallbackChannel.cs b/src/Spring.Messaging.Amqp.Rabbit/Support/IPublisherCallbackChannel.cs new file mode 100644 index 0000000..c9b2eef --- /dev/null +++ b/src/Spring.Messaging.Amqp.Rabbit/Support/IPublisherCallbackChannel.cs @@ -0,0 +1,51 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright 2002-2012 the original author or authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +// +// -------------------------------------------------------------------------------------------------------------------- + +#region Using Directives +using System.Collections.Generic; +using RabbitMQ.Client; +#endregion + +namespace Spring.Messaging.Amqp.Rabbit.Support +{ + /// + /// Instances of this interface support a single listener being registered for publisher confirms with multiple channels, by adding context to the callbacks. + /// + /// Gary Russell + /// Joe Fitzgerald + public interface IPublisherCallbackChannel : IModel + { + // static string RETURN_CORRELATION = "spring_return_correlation"; + + /// + /// Adds a {@link Listener} and returns a reference to the pending confirms map for that listener's pending + /// confirms, allowing the Listener to assess unconfirmed sends at any point in time. + /// The client must NOT modify the contents of this array, and must synchronize on it when iterating over its collections. + /// + /// The listener. + /// A reference to pending confirms for the listener. The System.Collections.Generic.SortedList`2[TKey -> System.Int64, TValue -> Spring.Messaging.Amqp.Rabbit.Support.PendingConfirm]. + SortedList AddListener(IListener listener); + + /// Removes the listener. + /// The System.Boolean. + bool RemoveListener(IListener listener); + + /// Adds a pending confirmation to this channel's map. + /// The listener. + /// The key to the map. + /// The PendingConfirm object. + void AddPendingConfirm(IListener listener, long seq, PendingConfirm pendingConfirm); + } +} diff --git a/src/Spring.Messaging.Amqp.Rabbit/Support/PendingConfirm.cs b/src/Spring.Messaging.Amqp.Rabbit/Support/PendingConfirm.cs new file mode 100644 index 0000000..816f3b1 --- /dev/null +++ b/src/Spring.Messaging.Amqp.Rabbit/Support/PendingConfirm.cs @@ -0,0 +1,51 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright 2002-2012 the original author or authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace Spring.Messaging.Amqp.Rabbit.Support +{ + /// + /// Instances of this object track pending publisher confirms. + /// The timestamp allows the pending confirmation to be + /// expired. It also holds for + /// the client to correlate a confirm with a sent message. + /// + /// Gary Russell + /// Joe Fitzgerald + public class PendingConfirm + { + private readonly CorrelationData correlationData; + + private readonly long timestamp; + + /// Initializes a new instance of the class. + /// The correlation data. + /// The timestamp. + public PendingConfirm(CorrelationData correlationData, long timestamp) + { + this.correlationData = correlationData; + this.timestamp = timestamp; + } + + /// Gets the correlation data. + public CorrelationData CorrelationData { get { return this.correlationData; } } + + /// Gets the timestamp. + public long Timestamp { get { return this.timestamp; } } + + /// The to string. + /// The System.String. + public override string ToString() { return "PendingConfirm [correlationData=" + this.correlationData + "]"; } + } +} diff --git a/src/Spring.Messaging.Amqp.Rabbit/Support/PublisherCallbackChannelImpl.cs b/src/Spring.Messaging.Amqp.Rabbit/Support/PublisherCallbackChannelImpl.cs new file mode 100644 index 0000000..707bbdd --- /dev/null +++ b/src/Spring.Messaging.Amqp.Rabbit/Support/PublisherCallbackChannelImpl.cs @@ -0,0 +1,482 @@ +// ----------------------------------------------------------------------- +// +// TODO: Update copyright text. +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Common.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Framing.Impl.v0_9_1; +using RabbitMQ.Client.Impl; +using Spring.Util; + +namespace Spring.Messaging.Amqp.Rabbit.Support +{ + /// + /// Channel wrapper to allow a single listener able to handle confirms from multiple channels. + /// + /// Gary Russell + /// Joe Fitzgerald + public class PublisherCallbackChannelImpl : IPublisherCallbackChannel // , BasicAckEventHandler, BasicNackEventHandler + { + private readonly ILog logger = LogManager.GetCurrentClassLogger(); + + private readonly IModel channelDelegate; + + private readonly ConcurrentDictionary listeners = new ConcurrentDictionary(); + + private readonly ConcurrentDictionary> pendingConfirms = new ConcurrentDictionary>(); + + private readonly ConcurrentDictionary listenerForSeq = new ConcurrentDictionary(); + + public PublisherCallbackChannelImpl(IModel channelDelegate) { this.channelDelegate = channelDelegate; } + + #region Delegate Methods + public void AddShutdownListener(ModelShutdownEventHandler listener) { this.channelDelegate.ModelShutdown += listener; } + + public void RemoveShutdownListener(ModelShutdownEventHandler listener) { this.channelDelegate.ModelShutdown -= listener; } + + public ShutdownEventArgs CloseReason { get { return this.channelDelegate.CloseReason; } } + + public void NotifyListeners() { ((ModelBase)this.channelDelegate).m_session.Notify(); } + + public bool IsOpen { get { return this.channelDelegate.IsOpen; } } + + public int ChannelNumber { get { return ((ModelBase)this.channelDelegate).m_session.ChannelNumber; } } + + public IConnection Connection { get { return ((ModelBase)this.channelDelegate).m_session.Connection; } } + + public void Close(ushort closeCode, string closeMessage) { this.channelDelegate.Close((ushort)closeCode, closeMessage); } + + public void ChannelFlow(bool active) { this.channelDelegate.ChannelFlow(active); } + + // public ChannelFlowOk Flow() { return this.channelDelegate.ChannelFlow(); } + + public void Abort() { this.channelDelegate.Abort(); } + + public void Abort(ushort closeCode, string closeMessage) { this.channelDelegate.Abort((ushort)closeCode, closeMessage); } + + private IList flowListeners = new List(); + + public event FlowControlEventHandler FlowControl; + + public void AddFlowListener(FlowControlEventHandler listener) + { + this.flowListeners.Add(listener); + this.channelDelegate.FlowControl += listener; + } + + public void RemoveFlowListener(FlowControlEventHandler listener) + { + if (this.flowListeners.Contains(listener)) + { + this.flowListeners.Remove(listener); + this.channelDelegate.FlowControl -= listener; + } + } + + public void ClearFlowListeners() + { + foreach (var listener in this.flowListeners) + { + this.channelDelegate.FlowControl -= listener; + } + } + + public IBasicConsumer DefaultConsumer { get { return this.channelDelegate.DefaultConsumer; } set { this.channelDelegate.DefaultConsumer = value; } } + + public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) { this.channelDelegate.BasicQos((uint)prefetchSize, (ushort)prefetchCount, global); } + + public void BasicQos(int prefetchCount) { this.channelDelegate.BasicQos(0, (ushort)prefetchCount, false); } + + public void BasicPublish(string exchange, string routingKey, IBasicProperties props, byte[] body) { this.channelDelegate.BasicPublish(exchange, routingKey, props, body); } + + public void BasicPublish(string exchange, string routingKey, bool mandatory, bool immediate, IBasicProperties props, byte[] body) { this.channelDelegate.BasicPublish(exchange, routingKey, mandatory, immediate, props, body); } + + public void ExchangeDeclare(string exchange, string type) { this.channelDelegate.ExchangeDeclare(exchange, type); } + + public void ExchangeDeclare(string exchange, string type, bool durable) { this.channelDelegate.ExchangeDeclare(exchange, type, durable); } + + public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) { this.channelDelegate.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); } + + // public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, bool isInternal, IDictionary arguments) { this.channelDelegate.ExchangeDeclare(exchange, type, durable, autoDelete, isInternal, arguments); } + + public void ExchangeDeclarePassive(string name) { this.channelDelegate.ExchangeDeclarePassive(name); } + + public void ExchangeDelete(string exchange, bool ifUnused) { this.channelDelegate.ExchangeDelete(exchange, ifUnused); } + + public void ExchangeDelete(string exchange) { this.channelDelegate.ExchangeDelete(exchange); } + + public void ExchangeBind(string destination, string source, string routingKey) { this.channelDelegate.ExchangeBind(destination, source, routingKey); } + + public void ExchangeBind(string destination, string source, string routingKey, IDictionary arguments) { this.channelDelegate.ExchangeBind(destination, source, routingKey, arguments); } + + public void ExchangeUnbind(string destination, string source, string routingKey) { this.channelDelegate.ExchangeUnbind(destination, source, routingKey); } + + public void ExchangeUnbind(string destination, string source, string routingKey, IDictionary arguments) { this.channelDelegate.ExchangeUnbind(destination, source, routingKey, arguments); } + + public RabbitMQ.Client.QueueDeclareOk QueueDeclare() { return this.channelDelegate.QueueDeclare(); } + + public RabbitMQ.Client.QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { return this.channelDelegate.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); } + + public RabbitMQ.Client.QueueDeclareOk QueueDeclarePassive(string queue) { return this.channelDelegate.QueueDeclarePassive(queue); } + + public uint QueueDelete(string queue) { return this.channelDelegate.QueueDelete(queue); } + + public uint QueueDelete( + string queue, + bool ifUnused, + bool ifEmpty) { return this.channelDelegate.QueueDelete(queue, ifUnused, ifEmpty); } + + public void QueueBind( + string queue, + string exchange, + string routingKey) { this.channelDelegate.QueueBind(queue, exchange, routingKey); } + + public void QueueBind( + string queue, + string exchange, + string routingKey, + IDictionary arguments) { this.channelDelegate.QueueBind(queue, exchange, routingKey, arguments); } + + public void QueueUnbind( + string queue, + string exchange, + string routingKey) { this.channelDelegate.QueueUnbind(queue, exchange, routingKey, new Dictionary()); } + + public void QueueUnbind( + string queue, + string exchange, + string routingKey, + IDictionary arguments) { this.channelDelegate.QueueUnbind(queue, exchange, routingKey, arguments); } + + public uint QueuePurge(string queue) { return this.channelDelegate.QueuePurge(queue); } + + public BasicGetResult BasicGet(string queue, bool autoAck) { return this.channelDelegate.BasicGet(queue, autoAck); } + + public void BasicAck(ulong deliveryTag, bool multiple) { this.channelDelegate.BasicAck(deliveryTag, multiple); } + + public void BasicNack(ulong deliveryTag, bool multiple, bool requeue) { this.channelDelegate.BasicNack(deliveryTag, multiple, requeue); } + + public void BasicReject(ulong deliveryTag, bool requeue) { this.channelDelegate.BasicReject(deliveryTag, requeue); } + + public string BasicConsume(string queue, IBasicConsumer callback) { return this.channelDelegate.BasicConsume(queue, true, callback); } + + public string BasicConsume(string queue, bool autoAck, IBasicConsumer callback) { return this.channelDelegate.BasicConsume(queue, autoAck, callback); } + + public string BasicConsume(string queue, bool autoAck, string consumerTag, IBasicConsumer callback) { return this.channelDelegate.BasicConsume(queue, autoAck, consumerTag, callback); } + + public string BasicConsume(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, IDictionary arguments, IBasicConsumer callback) { return this.channelDelegate.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback); } + + public void BasicCancel(string consumerTag) { this.channelDelegate.BasicCancel(consumerTag); } + + public void BasicRecover() { this.channelDelegate.BasicRecover(true); } + + public void BasicRecover(bool requeue) { this.channelDelegate.BasicRecover(requeue); } + + public void BasicRecoverAsync(bool requeue) { this.channelDelegate.BasicRecoverAsync(requeue); } + + public void TxSelect() { this.channelDelegate.TxSelect(); } + + public void TxCommit() { this.channelDelegate.TxCommit(); } + + public void TxRollback() { this.channelDelegate.TxRollback(); } + + public void ConfirmSelect() { this.channelDelegate.ConfirmSelect(); } + + public ulong NextPublishSeqNo { get { return this.channelDelegate.NextPublishSeqNo; } } + + public bool WaitForConfirms() { return this.channelDelegate.WaitForConfirms(); } + + public bool WaitForConfirms(TimeSpan timeout, out bool timedOut) + { + return this.channelDelegate.WaitForConfirms(timeout, out timedOut); + } + + public void WaitForConfirmsOrDie() { this.channelDelegate.WaitForConfirmsOrDie(); } + + public void WaitForConfirmsOrDie(TimeSpan timeout) { this.channelDelegate.WaitForConfirmsOrDie(timeout); } + + // public void AsyncRpc(Method method) { this.channelDelegate.AsyncRpc(method); } + + // public Command Rpc(Method method) { return this.channelDelegate.Rpc(method); } + + private IList confirmListeners = new List(); + + public event BasicAckEventHandler BasicAcks; + + public void AddConfirmListener(BasicAckEventHandler listener) + { + this.confirmListeners.Add(listener); + this.channelDelegate.BasicAcks += listener; + } + + public void RemoveConfirmListener(BasicAckEventHandler listener) + { + if (this.confirmListeners.Contains(listener)) + { + this.confirmListeners.Remove(listener); + this.channelDelegate.BasicAcks -= listener; + } + } + + public void ClearConfirmListeners() + { + foreach (var listener in this.confirmListeners) + { + this.channelDelegate.BasicAcks -= listener; + } + } + + private IList returnListeners = new List(); + + public event BasicNackEventHandler BasicNacks; + + public void AddReturnListener(BasicNackEventHandler listener) + { + this.returnListeners.Add(listener); + this.channelDelegate.BasicNacks += listener; + } + + public void RemoveReturnListener(BasicNackEventHandler listener) + { + if (this.returnListeners.Contains(listener)) + { + this.returnListeners.Remove(listener); + this.channelDelegate.BasicNacks -= listener; + } + } + + public void ClearReturnListeners() + { + foreach (var listener in this.returnListeners) + { + this.channelDelegate.BasicNacks -= listener; + } + } + #endregion + + public void Close() + { + this.channelDelegate.Close(); + foreach (var entry in this.pendingConfirms) + { + var listener = entry.Key; + listener.RemovePendingConfirmsReference(this, entry.Value); + } + this.pendingConfirms.Clear(); + this.listenerForSeq.Clear(); + } + + public SortedList AddListener(IListener listener) + { + AssertUtils.ArgumentNotNull(listener, "Listener cannot be null"); + if (this.listeners.Count == 0) + { + this.AddConfirmListener(this); + this.AddReturnListener(this); + } + if (!this.listeners.Values.Contains(listener)) + { + var listenersAdd = this.listeners.TryAdd(listener.GetUuid, listener); + var pendingConfirmsAdd = this.pendingConfirms.TryAdd(listener, new SortedList()); + logger.Debug(m => m("Added listener " + listener)); + } + return this.pendingConfirms[listener]; + } + + public bool RemoveListener(IListener listener) + { + IListener mappedListener = null; + var mappedListenerSuccess = this.listeners.TryRemove(listener.GetUuid, out mappedListener); + var result = mappedListener != null; + if (result && this.listeners.Count == 0) + { + this.RemoveConfirmListener(this); + this.RemoveReturnListener(this); + } + var iterator = this.listenerForSeq.GetEnumerator(); + while (iterator.MoveNext()) + { + var entry = iterator.Current; + if (entry.Value == listener) + { + IListener currentValue; + listenerForSeq.TryRemove(entry.Key, out currentValue); + } + } + + SortedList currentConfirm; + this.pendingConfirms.TryRemove(listener, out currentConfirm); + return result; + } + + // ConfirmListener + public void HandleAck(long seq, bool multiple) + { + logger.Debug(m => m(this.ToString() + " PC:Ack:" + seq + ":" + multiple)); + + this.ProcessAck(seq, true, multiple); + } + + public void HandleNack(long seq, bool multiple) + { + logger.Debug(m => m(this.ToString() + " PC:Nack:" + seq + ":" + multiple)); + + this.ProcessAck(seq, false, multiple); + } + + private void ProcessAck(long seq, bool ack, bool multiple) + { + if (multiple) + { + /* + * Piggy-backed ack - extract all Listeners for this and earlier + * sequences. Then, for each Listener, handle each of it's acks. + */ + lock (this.pendingConfirms) + { + var involvedListeners = this.listenerForSeq.headMap(seq + 1); + // eliminate duplicates + var listeners = new HashSet(involvedListeners.Values); + foreach (var involvedListener in listeners) + { + // find all unack'd confirms for this listener and handle them + var confirmsMap = this.pendingConfirms[involvedListener]; + if (confirmsMap != null) + { + var confirms = confirmsMap.headMap(seq + 1); + var iterator = confirms.GetEnumerator(); + while (iterator.MoveNext()) + { + var entry = iterator.Current; + confirms.Remove(entry.Key); + this.DoHandleConfirm(ack, involvedListener, entry.Value); + } + } + } + } + } + else + { + var listener = this.listenerForSeq[seq]; + if (listener != null) + { + var pendingConfirm = this.pendingConfirms[listener].TryRemove(seq); + if (pendingConfirm != null) + { + this.DoHandleConfirm(ack, listener, pendingConfirm); + } + } + else + { + logger.Error(m => m("No listener for seq:" + seq)); + } + } + } + + private void DoHandleConfirm(bool ack, IListener listener, PendingConfirm pendingConfirm) + { + try + { + if (listener.IsConfirmListener) + { + logger.Debug(m => m("Sending confirm " + pendingConfirm)); + listener.HandleConfirm(pendingConfirm, ack); + } + } + catch (Exception e) + { + logger.Error(m => m("Exception delivering confirm"), e); + } + } + + public void AddPendingConfirm(IListener listener, long seq, PendingConfirm pendingConfirm) + { + var pendingConfirmsForListener = this.pendingConfirms[listener]; + AssertUtils.ArgumentNotNull(pendingConfirmsForListener, "Listener not registered"); + lock (this.pendingConfirms) + { + pendingConfirmsForListener.Add(seq, pendingConfirm); + } + this.listenerForSeq.TryAdd(seq, listener); + } + + // ReturnListener + + public void HandleReturn( + int replyCode, + String replyText, + String exchange, + String routingKey, + IBasicProperties properties, + byte[] body) + { + var uuidObject = properties.Headers["spring_return_correlation"].ToString(); + var listener = this.listeners[uuidObject]; + if (listener == null || !listener.IsReturnListener) + { + logger.Warn(m => m("No Listener for returned message")); + + } + else + { + listener.HandleReturn(replyCode, replyText, exchange, routingKey, properties, body); + } + } + + + public string BasicConsume(string queue, bool noAck, string consumerTag, IDictionary arguments, IBasicConsumer consumer) + { + throw new NotImplementedException(); + } + + public void BasicPublish(PublicationAddress addr, IBasicProperties basicProperties, byte[] body) + { + throw new NotImplementedException(); + } + + public event BasicRecoverOkEventHandler BasicRecoverOk; + + public event BasicReturnEventHandler BasicReturn; + + public event CallbackExceptionEventHandler CallbackException; + + public IBasicProperties CreateBasicProperties() + { + throw new NotImplementedException(); + } + + public IFileProperties CreateFileProperties() + { + throw new NotImplementedException(); + } + + public IStreamProperties CreateStreamProperties() + { + throw new NotImplementedException(); + } + + public void DtxSelect() + { + throw new NotImplementedException(); + } + + public void DtxStart(string dtxIdentifier) + { + throw new NotImplementedException(); + } + + public event ModelShutdownEventHandler ModelShutdown; + + public void Dispose() + { + + } + } +} diff --git a/src/Spring.Messaging.Amqp/AmqpIOException.cs b/src/Spring.Messaging.Amqp/AmqpIOException.cs index 9a1475a..25798bb 100644 --- a/src/Spring.Messaging.Amqp/AmqpIOException.cs +++ b/src/Spring.Messaging.Amqp/AmqpIOException.cs @@ -14,6 +14,7 @@ // -------------------------------------------------------------------------------------------------------------------- #region Using Directives +using System; using System.IO; #endregion @@ -29,6 +30,6 @@ public class AmqpIOException : AmqpException /// Initializes a new instance of the class. /// The message. /// The cause. - public AmqpIOException(string message, IOException cause) : base(message, cause) { } + public AmqpIOException(Exception cause) : base(cause) { } } } diff --git a/src/Spring.Messaging.Amqp/Core/AbstractExchange.cs b/src/Spring.Messaging.Amqp/Core/AbstractExchange.cs index 99dd48d..22501ae 100644 --- a/src/Spring.Messaging.Amqp/Core/AbstractExchange.cs +++ b/src/Spring.Messaging.Amqp/Core/AbstractExchange.cs @@ -46,7 +46,7 @@ public abstract class AbstractExchange : IExchange /// /// The arguments. /// - private readonly IDictionary arguments; + private readonly IDictionary arguments; /// Initializes a new instance of the class, given a name. /// The name of the exchange. @@ -65,7 +65,7 @@ public abstract class AbstractExchange : IExchange /// if set to true /// the server should delete the exchange when it is no longer in use /// The arguments. - public AbstractExchange(string name, bool durable, bool autoDelete, IDictionary arguments) + public AbstractExchange(string name, bool durable, bool autoDelete, IDictionary arguments) { this.name = name; this.durable = durable; @@ -107,7 +107,7 @@ public AbstractExchange(string name, bool durable, bool autoDelete, IDictionary< /// Gets or sets the collection of arbitrary arguments to use when declaring an exchange. /// /// The arguments. - public IDictionary Arguments { get { return this.arguments; } } + public IDictionary Arguments { get { return this.arguments; } } public void AddArgument(string argName, object argValue) { diff --git a/src/Spring.Messaging.Amqp/Core/AnonymousQueue.cs b/src/Spring.Messaging.Amqp/Core/AnonymousQueue.cs index f825807..e8b8393 100644 --- a/src/Spring.Messaging.Amqp/Core/AnonymousQueue.cs +++ b/src/Spring.Messaging.Amqp/Core/AnonymousQueue.cs @@ -35,6 +35,6 @@ public class AnonymousQueue : Queue /// Initializes a new instance of the class. /// The arguments. - public AnonymousQueue(IDictionary arguments) : base(Guid.NewGuid().ToString(), false, true, true, arguments) { } + public AnonymousQueue(IDictionary arguments) : base(Guid.NewGuid().ToString(), false, true, true, arguments) { } } } diff --git a/src/Spring.Messaging.Amqp/Core/Binding.cs b/src/Spring.Messaging.Amqp/Core/Binding.cs index 4c13f5c..f54272a 100644 --- a/src/Spring.Messaging.Amqp/Core/Binding.cs +++ b/src/Spring.Messaging.Amqp/Core/Binding.cs @@ -48,7 +48,7 @@ public class Binding /// /// The arguments. /// - private readonly IDictionary arguments; + private readonly IDictionary arguments; /// /// The destination type. @@ -61,7 +61,7 @@ public class Binding /// The exchange. /// The routing key. /// The arguments. - public Binding(string destination, DestinationType destinationType, string exchange, string routingKey, IDictionary arguments) + public Binding(string destination, DestinationType destinationType, string exchange, string routingKey, IDictionary arguments) { this.destination = destination; this.destinationType = destinationType; @@ -104,7 +104,7 @@ public enum DestinationType /// /// Gets Arguments. /// - public IDictionary Arguments { get { return this.arguments; } } + public IDictionary Arguments { get { return this.arguments; } } /// /// Gets DestinationType. diff --git a/src/Spring.Messaging.Amqp/Core/BindingBuilder.cs b/src/Spring.Messaging.Amqp/Core/BindingBuilder.cs index 2aa94e2..939e4cc 100644 --- a/src/Spring.Messaging.Amqp/Core/BindingBuilder.cs +++ b/src/Spring.Messaging.Amqp/Core/BindingBuilder.cs @@ -44,7 +44,7 @@ public sealed class BindingBuilder /// The create map for keys. /// The keys. /// The map. - private static IDictionary CreateMapForKeys(params string[] keys) + private static IDictionary CreateMapForKeys(params string[] keys) { var map = new Dictionary(); foreach (var key in keys) @@ -207,7 +207,7 @@ public GenericArgumentsConfigurer(GenericExchangeRoutingKeyConfigurer configurer /// The and. /// The map. /// The binding. - public Binding And(IDictionary map) { return new Binding(this.configurer.destination.Name, this.configurer.destination.Type, this.configurer.exchange, this.routingKey, map); } + public Binding And(IDictionary map) { return new Binding(this.configurer.destination.Name, this.configurer.destination.Type, this.configurer.exchange, this.routingKey, map); } /// /// The noargs. @@ -282,7 +282,7 @@ internal HeadersExchangeMapConfigurer(DestinationConfigurer destination, Headers /// The where any. /// The header values. /// The headers exchange map binding creator. - public HeadersExchangeMapBindingCreator WhereAny(IDictionary headerValues) { return new HeadersExchangeMapBindingCreator(this.destination, this.exchange, headerValues, false); } + public HeadersExchangeMapBindingCreator WhereAny(IDictionary headerValues) { return new HeadersExchangeMapBindingCreator(this.destination, this.exchange, headerValues, false); } /// The where all. /// The header keys. @@ -292,7 +292,7 @@ internal HeadersExchangeMapConfigurer(DestinationConfigurer destination, Headers /// The where all. /// The header values. /// The headers exchange map binding creator. - public HeadersExchangeMapBindingCreator WhereAll(IDictionary headerValues) { return new HeadersExchangeMapBindingCreator(this.destination, this.exchange, headerValues, true); } + public HeadersExchangeMapBindingCreator WhereAll(IDictionary headerValues) { return new HeadersExchangeMapBindingCreator(this.destination, this.exchange, headerValues, true); } #region Nested type: HeadersExchangeKeysBindingCreator @@ -314,7 +314,7 @@ public class HeadersExchangeKeysBindingCreator /// /// The header map. /// - private readonly IDictionary headerMap; + private readonly IDictionary headerMap; /// Initializes a new instance of the class. /// The destination. @@ -360,14 +360,14 @@ public class HeadersExchangeMapBindingCreator /// /// The header map. /// - private readonly IDictionary headerMap; + private readonly IDictionary headerMap; /// Initializes a new instance of the class. /// The Destination. /// The Exchange. /// The header map. /// The match all. - internal HeadersExchangeMapBindingCreator(DestinationConfigurer destination, HeadersExchange exchange, IDictionary headerMap, bool matchAll) + internal HeadersExchangeMapBindingCreator(DestinationConfigurer destination, HeadersExchange exchange, IDictionary headerMap, bool matchAll) { // Assert.notEmpty(headerMap, "header map must not be empty"); this.headerMap = new Dictionary() { { "x-match", matchAll ? "all" : "any" } }; @@ -430,7 +430,7 @@ internal HeadersExchangeSingleValueBindingCreator(DestinationConfigurer destinat /// The binding. public Binding Matches(object value) { - IDictionary map = new Dictionary() { { this.key, value } }; + IDictionary map = new Dictionary() { { this.key, value } }; return new Binding(this.destination.Name, this.destination.Type, this.exchange.Name, string.Empty, map); } } diff --git a/src/Spring.Messaging.Amqp/Core/CustomExchange.cs b/src/Spring.Messaging.Amqp/Core/CustomExchange.cs index 4cde435..2df6f6a 100644 --- a/src/Spring.Messaging.Amqp/Core/CustomExchange.cs +++ b/src/Spring.Messaging.Amqp/Core/CustomExchange.cs @@ -52,7 +52,7 @@ public class CustomExchange : AbstractExchange /// The durable. /// The auto delete. /// The arguments. - public CustomExchange(string name, string type, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { this.type = type; } + public CustomExchange(string name, string type, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { this.type = type; } #region Overrides of AbstractExchange diff --git a/src/Spring.Messaging.Amqp/Core/DirectExchange.cs b/src/Spring.Messaging.Amqp/Core/DirectExchange.cs index 981bffd..fda9c26 100644 --- a/src/Spring.Messaging.Amqp/Core/DirectExchange.cs +++ b/src/Spring.Messaging.Amqp/Core/DirectExchange.cs @@ -52,7 +52,7 @@ public class DirectExchange : AbstractExchange /// The durable. /// The auto delete. /// The arguments. - public DirectExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } + public DirectExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } /// Gets the exchange type. public override string Type { get { return ExchangeTypes.Direct; } } diff --git a/src/Spring.Messaging.Amqp/Core/FanoutExchange.cs b/src/Spring.Messaging.Amqp/Core/FanoutExchange.cs index 0ac4685..380bf4f 100644 --- a/src/Spring.Messaging.Amqp/Core/FanoutExchange.cs +++ b/src/Spring.Messaging.Amqp/Core/FanoutExchange.cs @@ -47,7 +47,7 @@ public class FanoutExchange : AbstractExchange /// The durable. /// The auto delete. /// The arguments. - public FanoutExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } + public FanoutExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } /// /// Gets Type. diff --git a/src/Spring.Messaging.Amqp/Core/FederatedExchange.cs b/src/Spring.Messaging.Amqp/Core/FederatedExchange.cs index 317d6db..e5330e0 100644 --- a/src/Spring.Messaging.Amqp/Core/FederatedExchange.cs +++ b/src/Spring.Messaging.Amqp/Core/FederatedExchange.cs @@ -48,7 +48,7 @@ public class FederatedExchange : AbstractExchange /// The durable. /// The auto delete. /// The arguments. - public FederatedExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } + public FederatedExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } /// Sets the backing type. public string BackingType { set { this.AddArgument(BACKING_TYPE_ARG, value); } } diff --git a/src/Spring.Messaging.Amqp/Core/HeadersExchange.cs b/src/Spring.Messaging.Amqp/Core/HeadersExchange.cs index 7440aca..03b421d 100644 --- a/src/Spring.Messaging.Amqp/Core/HeadersExchange.cs +++ b/src/Spring.Messaging.Amqp/Core/HeadersExchange.cs @@ -43,7 +43,7 @@ public class HeadersExchange : AbstractExchange /// The durable. /// The auto delete. /// The arguments. - public HeadersExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } + public HeadersExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } /// /// Gets Type. diff --git a/src/Spring.Messaging.Amqp/Core/IExchange.cs b/src/Spring.Messaging.Amqp/Core/IExchange.cs index e8b9657..4f58571 100644 --- a/src/Spring.Messaging.Amqp/Core/IExchange.cs +++ b/src/Spring.Messaging.Amqp/Core/IExchange.cs @@ -56,6 +56,6 @@ public interface IExchange /// A dictionary of arguments used to declare the exchange. These are stored by the broker, but do not necessarily have any /// meaning to the broker (depending on the exchange type). /// - IDictionary Arguments { get; } + IDictionary Arguments { get; } } } diff --git a/src/Spring.Messaging.Amqp/Core/Queue.cs b/src/Spring.Messaging.Amqp/Core/Queue.cs index 293cc4d..5c6ca02 100644 --- a/src/Spring.Messaging.Amqp/Core/Queue.cs +++ b/src/Spring.Messaging.Amqp/Core/Queue.cs @@ -51,7 +51,7 @@ public class Queue /// /// The arguments. /// - private readonly IDictionary arguments; + private readonly IDictionary arguments; /// Initializes a new instance of the class. The queue is durable, non-exclusive and non auto-delete. /// The name. @@ -75,7 +75,7 @@ public class Queue /// The exclusive. True if we are declaring an exclusive queue (the queue will only be used by the declarer's connection). /// The auto delete. True if the server should delete the queue when it is no longer in use. /// The arguments used to declare the queue. - public Queue(string name, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + public Queue(string name, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { this.name = name; this.durable = durable; @@ -108,7 +108,7 @@ public Queue(string name, bool durable, bool exclusive, bool autoDelete, IDictio /// /// Gets Arguments. /// - public IDictionary Arguments { get { return this.arguments; } } + public IDictionary Arguments { get { return this.arguments; } } /// /// A String representation of the Queue. diff --git a/src/Spring.Messaging.Amqp/Core/TopicExchange.cs b/src/Spring.Messaging.Amqp/Core/TopicExchange.cs index c3cdfdf..78e3ad2 100644 --- a/src/Spring.Messaging.Amqp/Core/TopicExchange.cs +++ b/src/Spring.Messaging.Amqp/Core/TopicExchange.cs @@ -47,7 +47,7 @@ public class TopicExchange : AbstractExchange /// The durable. /// The auto delete. /// The arguments. - public TopicExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } + public TopicExchange(string name, bool durable, bool autoDelete, IDictionary arguments) : base(name, durable, autoDelete, arguments) { } /// /// Gets Type.