Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Sync of core namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Fitzgerald committed Oct 10, 2012
1 parent f8a77cf commit fb3e9f9
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 58 deletions.
343 changes: 301 additions & 42 deletions src/Spring.Messaging.Amqp.Rabbit/Core/RabbitTemplate.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace Spring.Messaging.Amqp.Rabbit.Listener
/// Specialized consumer encapsulating knowledge of the broker connections and having its own lifecycle (start and stop).
/// </summary>
/// <author>Mark Pollack</author>
public class BlockingQueueConsumer : DefaultBasicConsumer
public class BlockingQueueConsumer
{
#region Private Fields

Expand Down Expand Up @@ -337,9 +337,10 @@ public virtual void RollbackOnExceptionIfNecessary(IModel channel, Message messa
foreach (var deliveryTag in this.deliveryTags)
{
// With newer RabbitMQ brokers could use basicNack here...
channel.BasicReject((ulong)message.MessageProperties.DeliveryTag, true);
// channel.BasicReject((ulong)message.MessageProperties.DeliveryTag, true);

// channel.BasicNack((ulong)message.MessageProperties.DeliveryTag, true, true);
// ... So we will.
channel.BasicNack((ulong)message.MessageProperties.DeliveryTag, true, true);
}

if (this.transactional)
Expand Down Expand Up @@ -422,7 +423,7 @@ internal class InternalConsumer : DefaultBasicConsumer
/// <summary>
/// The Logger.
/// </summary>
private readonly ILog logger = LogManager.GetLogger(typeof(InternalConsumer));
private static readonly ILog Logger = LogManager.GetLogger(typeof(InternalConsumer));

/// <summary>
/// The outer blocking queue consumer.
Expand All @@ -439,7 +440,7 @@ internal class InternalConsumer : DefaultBasicConsumer
/// <param name="reason">The reason.</param>
public override void HandleModelShutdown(IModel channel, ShutdownEventArgs reason)
{
this.logger.Warn(m => m("Received shutdown signal for consumer tag {0}, cause: {1}", this.ConsumerTag, reason.Cause));
Logger.Warn(m => m("Received shutdown signal for consumer tag {0}, cause: {1}", this.ConsumerTag, reason.Cause));
base.HandleModelShutdown(channel, reason);

this.outer.shutdown = reason;
Expand All @@ -450,7 +451,7 @@ public override void HandleModelShutdown(IModel channel, ShutdownEventArgs reaso
/// <param name="consumerTag">The consumer tag.</param>
public override void HandleBasicCancel(string consumerTag)
{
this.logger.Warn(m => m("Cancel received"));
Logger.Warn(m => m("Cancel received"));
base.HandleBasicCancel(consumerTag);
this.outer.cancelReceived.LazySet(true);
}
Expand All @@ -459,7 +460,7 @@ public override void HandleBasicCancel(string consumerTag)
/// <param name="consumerTag">The consumer tag.</param>
public override void HandleBasicCancelOk(string consumerTag)
{
this.logger.Debug(m => m("Received cancellation notice for {0}", this.outer.ToString()));
Logger.Debug(m => m("Received cancellation notice for {0}", this.outer.ToString()));
base.HandleBasicCancelOk(consumerTag);

// Signal to the container that we have been cancelled
Expand All @@ -481,7 +482,7 @@ public void HandleBasicDeliver(string consumerTag, BasicGetResult envelope, IBas
}
}

this.logger.Debug(m => m("Storing delivery for {0}", this.outer.ToString()));
Logger.Debug(m => m("Storing delivery for {0}", this.outer.ToString()));

try
{
Expand All @@ -505,7 +506,6 @@ public void HandleBasicDeliver(string consumerTag, BasicGetResult envelope, IBas
/// <param name="body">The body.</param>
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
{
// TODO: Validate that 1 is the right message count.
var envelope = new BasicGetResult(deliveryTag, redelivered, exchange, routingKey, 1, properties, body);
this.HandleBasicDeliver(consumerTag, envelope, properties, body);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,6 @@ public AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer, SimpleMess
/// <returns>
/// A startup exception if there was one.
/// </returns>
/// <exception cref="System.TimeoutException">
/// </exception>
public FatalListenerStartupException GetStartupException()
{
if (!this.start.Wait(new TimeSpan(0, 0, 0, 0, 60000)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@
<Compile Include="Core\ChannelCallbackDelegate.cs" />
<Compile Include="Core\IChannelAwareMessageListener.cs" />
<Compile Include="Core\IChannelCallback.cs" />
<Compile Include="Core\IConfirmCallback.cs" />
<Compile Include="Core\IRabbitOperations.cs" />
<Compile Include="Connection\RabbitAccessor.cs" />
<Compile Include="Core\IReturnCallback.cs" />
<Compile Include="Core\RabbitAdmin.cs" />
<Compile Include="Core\RabbitTemplate.cs" />
<Compile Include="Core\Support\RabbitGatewaySupport.cs" />
Expand All @@ -151,7 +153,7 @@
<Compile Include="Support\CorrelationData.cs" />
<Compile Include="Support\DateExtensions.cs" />
<Compile Include="Support\DefaultMessagePropertiesConverter.cs" />
<Compile Include="Support\IListener.cs" />
<Compile Include="Support\IPublisherCallbackChannelListener.cs" />
<Compile Include="Support\IMessagePropertiesConverter.cs" />
<Compile Include="Support\IPublisherCallbackChannel.cs" />
<Compile Include="Support\PendingConfirm.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
#region Using Directives
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
#endregion

namespace Spring.Messaging.Amqp.Rabbit.Support
Expand Down Expand Up @@ -91,6 +93,43 @@ public static bool Poll<T>(this ConcurrentQueue<T> queue, TimeSpan duration, out
return true;
}

public static T Poll<T>(this BlockingCollection<T> collection, TimeSpan duration)
{
T result = default(T);
var tokenSource = new CancellationTokenSource();
var task = new Task(
() =>
{
try
{
result = collection.Take(tokenSource.Token);
}
catch (OperationCanceledException)
{
result = default(T);
}
}, tokenSource.Token);
task.Start();
task.Wait(duration);
return result;
}

public static T Poll<T>(this BlockingCollection<T> collection, int timeoutMilliseconds) { return collection.Poll(new TimeSpan(0, 0, 0, 0, timeoutMilliseconds)); }

public static TValue Get<TKey, TValue>(this IDictionary<TKey, TValue> collection, TKey key)
{
TValue result = default(TValue);
try
{
var success = collection.TryGetValue(key, out result);
return result;
}
catch (Exception ex)
{
return default(TValue);
}
}

internal static TimeSpan Cap(TimeSpan waitTime) { return waitTime > MaxValue ? MaxValue : waitTime; }

/// <summary>Lock the stack trace information of the given <paramref name="exception"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface IPublisherCallbackChannel : IModel
/// The client must <b>NOT</b> modify the contents of this array, and must synchronize on it when iterating over its collections.</summary>
/// <param name="publisherCallbackChannelListener">The publisherCallbackChannelListener.</param>
/// <returns>A reference to pending confirms for the publisherCallbackChannelListener. The System.Collections.Generic.SortedList`2[TKey -&gt; System.Int64, TValue -&gt; Spring.Messaging.Amqp.Rabbit.Support.PendingConfirm].</returns>
SortedList<long, PendingConfirm> AddListener(IPublisherCallbackChannelListener publisherCallbackChannelListener);
SortedDictionary<long, PendingConfirm> AddListener(IPublisherCallbackChannelListener publisherCallbackChannelListener);

/// <summary>The remove publisherCallbackChannelListener.</summary>
/// <param name="publisherCallbackChannelListener">Removes the publisherCallbackChannelListener.</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ void HandleReturn(
/// <summary>When called, this listener must remove all references to the pending confirm map.</summary>
/// <param name="channel">The channel.</param>
/// <param name="unconfirmed">The pending confirm map.</param>
void RemovePendingConfirmsReference(IModel channel, SortedList<long, PendingConfirm> unconfirmed);
void RemovePendingConfirmsReference(IModel channel, SortedDictionary<long, PendingConfirm> unconfirmed);

/// <summary>Returns the UUID used to identify this Listener for returns.</summary>
string GetUuid { get; }
string Uuid { get; }

/// <summary>Gets a value indicating whether is confirm listener.</summary>
bool IsConfirmListener { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void TestSendAndReceiveWithTopicConsumeInBackground()

try
{
consumer.Model.BasicCancel(consumer.ConsumerTag);
consumer.Channel.BasicCancel(consumer.ConsumerTag);
}
catch (Exception e)
{
Expand Down

0 comments on commit fb3e9f9

Please sign in to comment.