Browse files

Partial sync of core namespace

  • Loading branch information...
1 parent 9e232dd commit f8a77cf5e46b12a6d058a84c817e56486b087f5a @joefitzgerald joefitzgerald committed Oct 8, 2012
View
6 Spring.Messaging.Amqp.2010-40.sln
@@ -13,8 +13,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Spring.Messaging.Amqp.Rabbi
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Spring.Messaging.Amqp.Rabbit.Admin.Tests.2010", "test\Spring.Messaging.Amqp.Rabbit.Admin.Tests\Spring.Messaging.Amqp.Rabbit.Admin.Tests.2010.csproj", "{CC100EAE-6668-4E39-977E-CB63D025BCCE}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Spring.Erlang.Tests.2010", "test\Spring.Erlang.Tests\Spring.Erlang.Tests.2010.csproj", "{6BF21894-DDF3-496A-969F-6E3963A12E2A}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Spring.Messaging.Amqp.Tests.2010", "test\Spring.Messaging.Amqp.Tests\Spring.Messaging.Amqp.Tests.2010.csproj", "{C5A985A0-45B6-499B-B8B9-A29A2BBE524A}"
EndProject
Global
@@ -47,10 +45,6 @@ Global
{CC100EAE-6668-4E39-977E-CB63D025BCCE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CC100EAE-6668-4E39-977E-CB63D025BCCE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CC100EAE-6668-4E39-977E-CB63D025BCCE}.Release|Any CPU.Build.0 = Release|Any CPU
- {6BF21894-DDF3-496A-969F-6E3963A12E2A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {6BF21894-DDF3-496A-969F-6E3963A12E2A}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {6BF21894-DDF3-496A-969F-6E3963A12E2A}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {6BF21894-DDF3-496A-969F-6E3963A12E2A}.Release|Any CPU.Build.0 = Release|Any CPU
{C5A985A0-45B6-499B-B8B9-A29A2BBE524A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C5A985A0-45B6-499B-B8B9-A29A2BBE524A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C5A985A0-45B6-499B-B8B9-A29A2BBE524A}.Release|Any CPU.ActiveCfg = Release|Any CPU
View
3 src/Spring.Messaging.Amqp.Rabbit/Core/IChannelCallback.cs
@@ -20,14 +20,13 @@
namespace Spring.Messaging.Amqp.Rabbit.Core
{
/// <summary>A Channel Callback interface.</summary>
- /// <typeparam name="T"></typeparam>
+ /// <typeparam name="T">Type t.</typeparam>
/// <author>Mark Pollack</author>
public interface IChannelCallback<T>
{
/// <summary>Does the in rabbit.</summary>
/// <param name="model">The model.</param>
/// <returns>An object of type T.</returns>
- /// <remarks></remarks>
T DoInRabbit(IModel model);
}
}
View
32 src/Spring.Messaging.Amqp.Rabbit/Core/IConfirmCallback.cs
@@ -0,0 +1,32 @@
+// --------------------------------------------------------------------------------------------------------------------
+// <copyright file="IConfirmCallback.cs" company="The original author or authors.">
+// Copyright 2002-2012 the original author or authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations under the License.
+// </copyright>
+// --------------------------------------------------------------------------------------------------------------------
+
+#region Using Directives
+using Spring.Messaging.Amqp.Rabbit.Support;
+#endregion
+
+namespace Spring.Messaging.Amqp.Rabbit.Core
+{
+ /// <summary>
+ /// Confirm Callback interface
+ /// </summary>
+ public interface IConfirmCallback
+ {
+ /// <summary>The confirm.</summary>
+ /// <param name="correlationData">The correlation data.</param>
+ /// <param name="ack">The ack.</param>
+ void Confirm(CorrelationData correlationData, bool ack);
+ }
+}
View
35 src/Spring.Messaging.Amqp.Rabbit/Core/IReturnCallback.cs
@@ -0,0 +1,35 @@
+// --------------------------------------------------------------------------------------------------------------------
+// <copyright file="IReturnCallback.cs" company="The original author or authors.">
+// Copyright 2002-2012 the original author or authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations under the License.
+// </copyright>
+// --------------------------------------------------------------------------------------------------------------------
+
+#region Using Directives
+using Spring.Messaging.Amqp.Core;
+#endregion
+
+namespace Spring.Messaging.Amqp.Rabbit.Core
+{
+ /// <summary>
+ /// Return callback interface.
+ /// </summary>
+ public interface IReturnCallback
+ {
+ /// <summary>The returned message.</summary>
+ /// <param name="message">The message.</param>
+ /// <param name="replyCode">The reply code.</param>
+ /// <param name="replyText">The reply text.</param>
+ /// <param name="exchange">The exchange.</param>
+ /// <param name="routingKey">The routing key.</param>
+ void ReturnedMessage(Message message, int replyCode, string replyText, string exchange, string routingKey);
+ }
+}
View
198 src/Spring.Messaging.Amqp.Rabbit/Core/RabbitTemplate.cs
@@ -16,6 +16,7 @@
#region Using Directives
using System;
using System.Collections.Concurrent;
+using System.Collections.Generic;
using System.Threading;
using Common.Logging;
using RabbitMQ.Client;
@@ -58,12 +59,12 @@ namespace Spring.Messaging.Amqp.Rabbit.Core
/// <author>Mark Fisher</author>
/// <author>Dave Syer</author>
/// <author>Joe Fitzgerald (.NET)</author>
- public class RabbitTemplate : RabbitAccessor, IRabbitOperations
+ public class RabbitTemplate : RabbitAccessor, IRabbitOperations, IPublisherCallbackChannelListener
{
/// <summary>
/// The Logger.
/// </summary>
- protected static readonly ILog Logger = LogManager.GetLogger(typeof(RabbitTemplate));
+ protected new static readonly ILog Logger = LogManager.GetLogger(typeof(RabbitTemplate));
/// <summary>
/// The default exchange.
@@ -90,17 +91,17 @@ public class RabbitTemplate : RabbitAccessor, IRabbitOperations
/// <summary>
/// The exchange
/// </summary>
- private string exchange = DEFAULT_EXCHANGE;
+ private volatile string exchange = DEFAULT_EXCHANGE;
/// <summary>
/// The routing key.
/// </summary>
- private string routingKey = DEFAULT_ROUTING_KEY;
+ private volatile string routingKey = DEFAULT_ROUTING_KEY;
/// <summary>
/// The default queue name that will be used for synchronous receives.
/// </summary>
- private string queue;
+ private volatile string queue;
/// <summary>
/// The reply timeout.
@@ -110,7 +111,7 @@ public class RabbitTemplate : RabbitAccessor, IRabbitOperations
/// <summary>
/// The message converter.
/// </summary>
- private IMessageConverter messageConverter = new SimpleMessageConverter();
+ private volatile IMessageConverter messageConverter = new SimpleMessageConverter();
/// <summary>
/// The message properties converter.
@@ -120,7 +121,27 @@ public class RabbitTemplate : RabbitAccessor, IRabbitOperations
/// <summary>
/// The encoding.
/// </summary>
- private string encoding = DEFAULT_ENCODING;
+ private volatile string encoding = DEFAULT_ENCODING;
+
+ private volatile Queue replyQueue;
+
+ private readonly ConcurrentDictionary<string, ConcurrentQueue<Message>> replyHolder = new ConcurrentDictionary<string, ConcurrentQueue<Message>>();
+
+ private volatile IConfirmCallback confirmCallback;
+
+ private volatile IReturnCallback returnCallback;
+
+ private readonly ConcurrentDictionary<object, SortedDictionary<long, PendingConfirm>> pendingConfirms = new ConcurrentDictionary<object, SortedDictionary<long, PendingConfirm>>();
+
+ private volatile bool mandatory;
+
+ private volatile bool immediate;
+
+ private readonly string uuid = Guid.NewGuid().ToString();
+
+ public static readonly string STACKED_CORRELATION_HEADER = "spring_reply_correlation";
+
+ public static readonly string STACKED_REPLY_TO_HEADER = "spring_reply_to";
#endregion
#region Constructors
@@ -177,6 +198,11 @@ public RabbitTemplate(IConnectionFactory connectionFactory) : this()
public string Encoding { set { this.encoding = value; } }
/// <summary>
+ /// A queue for replies; if not provided, a temporary exclusive, auto-delete queue will be used for each reply.
+ /// </summary>
+ public Queue ReplyQueue { set { this.replyQueue = value; } }
+
+ /// <summary>
/// Sets the reply timeout. Specify the timeout in milliseconds to be used when waiting for a reply Message when using one of the
/// sendAndReceive methods. The default value is defined as {@link #DEFAULT_REPLY_TIMEOUT}. A negative value
/// indicates an indefinite timeout. Not used in the plain receive methods because there is no blocking receive
@@ -200,6 +226,55 @@ public IMessagePropertiesConverter MessagePropertiesConverter
this.messagePropertiesConverter = value;
}
}
+
+ public IConfirmCallback ConfirmCallback { set { this.confirmCallback = value; } }
+
+ public IReturnCallback ReturnCallback { set { this.returnCallback = value; } }
+
+ public bool Mandatory { set { this.mandatory = value; } }
+
+ public bool Immediate { set { this.immediate = value; } }
+
+ /// <summary>
+ /// Gets unconfirmed correlation data older than age and removes them.
+ /// </summary>
+ /// <param name="age">Age in millseconds</param>
+ /// <returns>The collection of correlation data for which confirms have not been received.</returns>
+ public HashSet<CorrelationData> GetUnconfirmed(long age)
+ {
+ var unconfirmed = new HashSet<CorrelationData>();
+ lock (this.pendingConfirms)
+ {
+ var threshold = DateTime.UtcNow.ToMilliseconds() - age;
+ foreach (var channelPendingConfirmEntry in this.pendingConfirms)
+ {
+ var channelPendingConfirms = channelPendingConfirmEntry.Value;
+
+ PendingConfirm pendingConfirm;
+ var itemsToRemove = new Dictionary<long, PendingConfirm>();
+ foreach (var item in channelPendingConfirms)
+ {
+ pendingConfirm = item.Value;
+ if (pendingConfirm.Timestamp < threshold)
+ {
+ unconfirmed.Add(pendingConfirm.CorrelationData);
+ itemsToRemove.Add(item.Key, item.Value);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ foreach (var item in itemsToRemove)
+ {
+ channelPendingConfirms.Remove(item.Key);
+ }
+ }
+ }
+
+ return unconfirmed.Count > 0 ? unconfirmed : null;
+ }
#endregion
#region Implementation of IAmqpTemplate
@@ -213,79 +288,102 @@ public IMessagePropertiesConverter MessagePropertiesConverter
/// <param name="message">The message.</param>
public void Send(string routingKey, Message message) { this.Send(this.exchange, routingKey, message); }
+ public void Send(string exchange, string routingKey, Message message)
+ {
+ this.Send(exchange, routingKey, message, null);
+ }
+
/// <summary>Send a message, given an exchange, a routing key, and the message.</summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message.</param>
- public void Send(string exchange, string routingKey, Message message)
+ public void Send(string exchange, string routingKey, Message message, CorrelationData correlationData)
{
this.Execute<object>(
channel =>
{
- this.DoSend(channel, exchange, routingKey, message);
+ this.DoSend(channel, exchange, routingKey, message, correlationData);
return null;
});
}
/// <summary>Convert and send a message, given the message.</summary>
/// <param name="message">The message.</param>
- public void ConvertAndSend(object message) { this.ConvertAndSend(this.exchange, this.routingKey, message); }
+ public void ConvertAndSend(object message) { this.ConvertAndSend(this.exchange, this.routingKey, message, (CorrelationData)null); }
+
+ public void ConvertAndSend(object message, CorrelationData correlationData) { this.ConvertAndSend(this.exchange, this.routingKey, message, correlationData); }
/// <summary>Convert and send a message, given a routing key and the message.</summary>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message.</param>
- public void ConvertAndSend(string routingKey, object message) { this.ConvertAndSend(this.exchange, routingKey, message); }
+ public void ConvertAndSend(string routingKey, object message) { this.ConvertAndSend(this.exchange, routingKey, message, (CorrelationData)null); }
+
+ public void ConvertAndSend(string routingKey, object message, CorrelationData correlationData) { this.ConvertAndSend(this.exchange, routingKey, message, correlationData); }
/// <summary>Convert and send a message, given an exchange, a routing key, and the message.</summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message.</param>
- public void ConvertAndSend(string exchange, string routingKey, object message) { this.Send(exchange, routingKey, this.Execute(channel => this.GetRequiredMessageConverter().ToMessage(message, new MessageProperties()))); }
+ public void ConvertAndSend(string exchange, string routingKey, object message) { this.ConvertAndSend(exchange, routingKey, message, (CorrelationData)null); }
+ public void ConvertAndSend(string exchange, string routingKey, object message, CorrelationData correlationData) { this.Send(exchange, routingKey, this.ConvertMessageIfNecessary(message), correlationData); }
+
/// <summary>Convert and send a message, given the message.</summary>
/// <param name="message">The message.</param>
/// <param name="messagePostProcessor">The message post processor.</param>
- public void ConvertAndSend(object message, Func<Message, Message> messagePostProcessor) { this.ConvertAndSend(this.exchange, this.routingKey, message, messagePostProcessor); }
+ public void ConvertAndSend(object message, Func<Message, Message> messagePostProcessor) { this.ConvertAndSend(this.exchange, this.routingKey, message, messagePostProcessor, (CorrelationData)null); }
+
+ public void ConvertAndSend(object message, Func<Message, Message> messagePostProcessor, CorrelationData correlationData) { this.ConvertAndSend(this.exchange, this.routingKey, message, messagePostProcessor, correlationData); }
/// <summary>Convert and send a message, given the message.</summary>
/// <param name="message">The message.</param>
/// <param name="messagePostProcessor">The message post processor.</param>
- public void ConvertAndSend(object message, IMessagePostProcessor messagePostProcessor) { this.ConvertAndSend(this.exchange, this.routingKey, message, messagePostProcessor); }
+ public void ConvertAndSend(object message, IMessagePostProcessor messagePostProcessor) { this.ConvertAndSend(this.exchange, this.routingKey, message, messagePostProcessor, (CorrelationData)null); }
+
+ public void ConvertAndSend(object message, IMessagePostProcessor messagePostProcessor, CorrelationData correlationData) { this.ConvertAndSend(this.exchange, this.routingKey, message, messagePostProcessor, correlationData); }
/// <summary>Convert and send a message, given a routing key and the message.</summary>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message.</param>
/// <param name="messagePostProcessor">The message post processor.</param>
- public void ConvertAndSend(string routingKey, object message, Func<Message, Message> messagePostProcessor) { this.ConvertAndSend(this.exchange, routingKey, message, messagePostProcessor); }
+ public void ConvertAndSend(string routingKey, object message, Func<Message, Message> messagePostProcessor) { this.ConvertAndSend(this.exchange, routingKey, message, messagePostProcessor, (CorrelationData)null); }
+
+ public void ConvertAndSend(string routingKey, object message, Func<Message, Message> messagePostProcessor, CorrelationData correlationData) { this.ConvertAndSend(this.exchange, routingKey, message, messagePostProcessor, correlationData); }
/// <summary>Convert and send a message, given a routing key and the message.</summary>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message.</param>
/// <param name="messagePostProcessor">The message post processor.</param>
- public void ConvertAndSend(string routingKey, object message, IMessagePostProcessor messagePostProcessor) { this.ConvertAndSend(this.exchange, routingKey, message, messagePostProcessor); }
+ public void ConvertAndSend(string routingKey, object message, IMessagePostProcessor messagePostProcessor) { this.ConvertAndSend(this.exchange, routingKey, message, messagePostProcessor, (CorrelationData)null); }
+
+ public void ConvertAndSend(string routingKey, object message, IMessagePostProcessor messagePostProcessor, CorrelationData correlationData) { this.ConvertAndSend(this.exchange, routingKey, message, messagePostProcessor, correlationData); }
+
+ public void ConvertAndSend(string exchange, string routingKey, object message, Func<Message, Message> messagePostProcessor) { this.ConvertAndSend(exchange, routingKey, message, messagePostProcessor, (CorrelationData)null);}
/// <summary>Convert and send a message, given an exchange, a routing key, and the message.</summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message.</param>
/// <param name="messagePostProcessor">The message post processor.</param>
- public void ConvertAndSend(string exchange, string routingKey, object message, Func<Message, Message> messagePostProcessor)
+ public void ConvertAndSend(string exchange, string routingKey, object message, Func<Message, Message> messagePostProcessor, CorrelationData correlationData)
{
- var messageToSend = this.GetRequiredMessageConverter().ToMessage(message, new MessageProperties());
+ var messageToSend = this.ConvertMessageIfNecessary(message);
messageToSend = messagePostProcessor.Invoke(messageToSend);
- this.Send(exchange, routingKey, this.Execute(channel => messageToSend));
+ this.Send(exchange, routingKey, this.Execute(channel => messageToSend), correlationData);
}
+ public void ConvertAndSend(string exchange, string routingKey, object message, IMessagePostProcessor messagePostProcessor) { this.ConvertAndSend(exchange, routingKey, message, messagePostProcessor, (CorrelationData)null); }
+
/// <summary>Convert and send a message, given an exchange, a routing key, and the message.</summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message.</param>
/// <param name="messagePostProcessor">The message post processor.</param>
- public void ConvertAndSend(string exchange, string routingKey, object message, IMessagePostProcessor messagePostProcessor)
+ public void ConvertAndSend(string exchange, string routingKey, object message, IMessagePostProcessor messagePostProcessor, CorrelationData correlationData)
{
- var messageToSend = this.GetRequiredMessageConverter().ToMessage(message, new MessageProperties());
+ var messageToSend = this.ConvertMessageIfNecessary(message);
messageToSend = messagePostProcessor.PostProcessMessage(messageToSend);
- this.Send(exchange, routingKey, this.Execute(channel => messageToSend));
+ this.Send(exchange, routingKey, this.Execute(channel => messageToSend), correlationData);
}
/// <summary>
@@ -422,8 +520,7 @@ public object ReceiveAndConvert(string queueName)
/// <returns>The message received.</returns>
public object ConvertSendAndReceive(string exchange, string routingKey, object message, Func<Message, Message> messagePostProcessor)
{
- var messageProperties = new MessageProperties();
- var requestMessage = this.GetRequiredMessageConverter().ToMessage(message, messageProperties);
+ var requestMessage = this.ConvertMessageIfNecessary(message);
if (messagePostProcessor != null)
{
requestMessage = messagePostProcessor.Invoke(requestMessage);
@@ -446,8 +543,7 @@ public object ConvertSendAndReceive(string exchange, string routingKey, object m
/// <returns>The message received.</returns>
public object ConvertSendAndReceive(string exchange, string routingKey, object message, IMessagePostProcessor messagePostProcessor)
{
- var messageProperties = new MessageProperties();
- var requestMessage = this.GetRequiredMessageConverter().ToMessage(message, messageProperties);
+ var requestMessage = this.ConvertMessageIfNecessary(message);
if (messagePostProcessor != null)
{
requestMessage = messagePostProcessor.PostProcessMessage(requestMessage);
@@ -462,12 +558,34 @@ public object ConvertSendAndReceive(string exchange, string routingKey, object m
return this.GetRequiredMessageConverter().FromMessage(replyMessage);
}
+ protected Message ConvertMessageIfNecessary(object message)
+ {
+ if (message is Message)
+ {
+ return (Message)message;
+ }
+
+ return this.GetRequiredMessageConverter().ToMessage(message, new MessageProperties());
+ }
+
+ protected Message DoSendAndReceive(string exchange, string routingKey, Message message)
+ {
+ if (this.replyQueue == null)
+ {
+ return this.DoSendAndReceiveWithTemporary(exchange, routingKey, message);
+ }
+ else
+ {
+ return this.DoSendAndReceiveWithFixed(exchange, routingKey, message);
+ }
+ }
+
/// <summary>Do the send and receive operation, given an exchange, a routing key and the message.</summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message to send.</param>
/// <returns>The message received.</returns>
- protected Message DoSendAndReceive(string exchange, string routingKey, Message message)
+ protected Message DoSendAndReceiveWithTemporary(string exchange, string routingKey, Message message)
{
var replyMessage = this.Execute(
delegate(IModel channel)
@@ -556,12 +674,9 @@ public T Execute<T>(ChannelCallbackDelegate<T> action)
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="message">The message.</param>
- protected void DoSend(IModel channel, string exchange, string routingKey, Message message)
+ protected void DoSend(IModel channel, string exchange, string routingKey, Message message, CorrelationData correlationData)
{
- if (Logger.IsDebugEnabled)
- {
- Logger.Debug("Publishing message on exchange [" + exchange + "], routingKey = [" + routingKey + "]");
- }
+ Logger.Debug(m => m("Publishing message on exchange [{0}], routingKey = [{1}]", exchange, routingKey));
if (exchange == null)
{
@@ -575,7 +690,24 @@ protected void DoSend(IModel channel, string exchange, string routingKey, Messag
routingKey = this.routingKey;
}
- channel.BasicPublish(exchange, routingKey, false, false, this.messagePropertiesConverter.FromMessageProperties(channel, message.MessageProperties, this.encoding), message.Body);
+ if (this.confirmCallback != null && channel is IPublisherCallbackChannel)
+ {
+ var publisherCallbackChannel = (IPublisherCallbackChannel)channel;
+ publisherCallbackChannel.AddPendingConfirm(this, (long)channel.NextPublishSeqNo, new PendingConfirm(correlationData, DateTime.UtcNow.ToMilliseconds()));
+ }
+
+ var mandatory = this.returnCallback == null ? false : this.mandatory;
+ var immediate = this.returnCallback == null ? false : this.immediate;
+
+ var messageProperties = message.MessageProperties;
+ if (mandatory || immediate)
+ {
+ messageProperties.Headers.Add("spring_return_correlation", this.uuid);
+ }
+
+ var convertedMessageProperties = this.messagePropertiesConverter.FromMessageProperties(channel, message.MessageProperties, this.encoding);
+
+ channel.BasicPublish(exchange, routingKey, mandatory, immediate, convertedMessageProperties, message.Body);
// Check commit is needed.
if (this.ChannelLocallyTransacted(channel))
View
2 src/Spring.Messaging.Amqp.Rabbit/Core/Support/RabbitGatewaySupport.cs
@@ -39,7 +39,7 @@ public class RabbitGatewaySupport : IInitializingObject
/// <summary>
/// The Logger.
/// </summary>
- private readonly ILog logger = LogManager.GetLogger(typeof(RabbitGatewaySupport));
+ private static readonly ILog Logger = LogManager.GetLogger(typeof(RabbitGatewaySupport));
#endregion
/// <summary>
View
20 src/Spring.Messaging.Amqp.Rabbit/Support/IPublisherCallbackChannel.cs
@@ -21,30 +21,30 @@
namespace Spring.Messaging.Amqp.Rabbit.Support
{
/// <summary>
- /// Instances of this interface support a single listener being registered for publisher confirms with multiple channels, by adding context to the callbacks.
+ /// Instances of this interface support a single publisherCallbackChannelListener being registered for publisher confirms with multiple channels, by adding context to the callbacks.
/// </summary>
/// <author>Gary Russell</author>
/// <author>Joe Fitzgerald (.NET)</author>
public interface IPublisherCallbackChannel : IModel
{
// static string RETURN_CORRELATION = "spring_return_correlation";
- /// <summary>Adds a {@link Listener} and returns a reference to the pending confirms map for that listener's pending
+ /// <summary>Adds a {@link Listener} and returns a reference to the pending confirms map for that publisherCallbackChannelListener's pending
/// confirms, allowing the Listener to assess unconfirmed sends at any point in time.
/// The client must <b>NOT</b> modify the contents of this array, and must synchronize on it when iterating over its collections.</summary>
- /// <param name="listener">The listener.</param>
- /// <returns>A reference to pending confirms for the listener. The System.Collections.Generic.SortedList`2[TKey -&gt; System.Int64, TValue -&gt; Spring.Messaging.Amqp.Rabbit.Support.PendingConfirm].</returns>
- SortedList<long, PendingConfirm> AddListener(IListener listener);
+ /// <param name="publisherCallbackChannelListener">The publisherCallbackChannelListener.</param>
+ /// <returns>A reference to pending confirms for the publisherCallbackChannelListener. The System.Collections.Generic.SortedList`2[TKey -&gt; System.Int64, TValue -&gt; Spring.Messaging.Amqp.Rabbit.Support.PendingConfirm].</returns>
+ SortedList<long, PendingConfirm> AddListener(IPublisherCallbackChannelListener publisherCallbackChannelListener);
- /// <summary>The remove listener.</summary>
- /// <param name="listener">Removes the listener.</param>
+ /// <summary>The remove publisherCallbackChannelListener.</summary>
+ /// <param name="publisherCallbackChannelListener">Removes the publisherCallbackChannelListener.</param>
/// <returns>The System.Boolean.</returns>
- bool RemoveListener(IListener listener);
+ bool RemoveListener(IPublisherCallbackChannelListener publisherCallbackChannelListener);
/// <summary>Adds a pending confirmation to this channel's map.</summary>
- /// <param name="listener">The listener.</param>
+ /// <param name="publisherCallbackChannelListener">The publisherCallbackChannelListener.</param>
/// <param name="seq">The key to the map.</param>
/// <param name="pendingConfirm">The PendingConfirm object.</param>
- void AddPendingConfirm(IListener listener, long seq, PendingConfirm pendingConfirm);
+ void AddPendingConfirm(IPublisherCallbackChannelListener publisherCallbackChannelListener, long seq, PendingConfirm pendingConfirm);
}
}
View
4 ...essaging.Amqp.Rabbit/Support/IListener.cs → ...port/IPublisherCallbackChannelListener.cs
@@ -1,5 +1,5 @@
// --------------------------------------------------------------------------------------------------------------------
-// <copyright file="IListener.cs" company="The original author or authors.">
+// <copyright file="IPublisherCallbackChannelListener.cs" company="The original author or authors.">
// Copyright 2002-2012 the original author or authors.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
@@ -29,7 +29,7 @@ namespace Spring.Messaging.Amqp.Rabbit.Support
/// </summary>
/// <author>Gary Russell</author>
/// <author>Joe Fitzgerald (.NET)</author>
- public interface IListener
+ public interface IPublisherCallbackChannelListener
{
/// <summary>Invoked by the channel when a confirm is received.</summary>
/// <param name="pendingConfirm">The pending confirmation, containing correlation data.</param>

0 comments on commit f8a77cf

Please sign in to comment.