Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 2 commits
  • 5 files changed
  • 0 commit comments
  • 1 contributor
Commits on Jul 06, 2012
Oliver, Jonathan #64 Added tests and implementation to prevent an empty transaction fr…
…om committing against the underlying channel.
b16a509
Oliver, Jonathan Removed comments for #60 and #61; re-added logging for #61 and commen…
…ted out spike implementation for #61.
d7666b0
View
89 src/proj/NanoMessageBus.RabbitChannel/RabbitChannel.cs
@@ -3,6 +3,7 @@
using System;
using System.IO;
using System.Linq;
+ using System.Threading;
using Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
@@ -24,7 +25,7 @@ public virtual void Receive(Action<IDeliveryContext> callback)
if (callback == null)
throw new ArgumentNullException("callback");
- Log.Debug("Attempting to begin receiving messages.");
+ Log.Debug("Attempting to begin receiving messages on channel {0}.", this.identifier);
this.ThrowWhenDisposed();
this.ThrowWhenDispatchOnly();
@@ -45,7 +46,7 @@ protected virtual bool Receive(BasicDeliverEventArgs message, Action<IDeliveryCo
if (this.shutdown)
{
- Log.Debug("Shutdown request has been made; finished receiving.");
+ Log.Debug("Shutdown request has been made on channel {0}; finished receiving.", this.identifier);
return FinishedReceiving;
}
@@ -66,27 +67,28 @@ protected virtual void TryReceive(BasicDeliverEventArgs message, Action<IDeliver
try
{
- Log.Verbose("Translating wire-specific message into channel message.");
+ Log.Verbose("Translating wire-specific message into channel message for channel {0}.", this.identifier);
this.CurrentMessage = this.adapter.Build(message);
- Log.Info("Routing message '{0}' received through group '{1}' to configured receiver callback.", messageId, this.configuration.GroupName);
+ Log.Info("Routing message '{0}' received through group '{1}' to configured receiver callback on channel {2}.",
+ messageId, this.configuration.GroupName, this.identifier);
callback(this);
}
catch (ChannelConnectionException)
{
- Log.Warn("The channel has become unavailable, aborting current transaction.");
+ Log.Warn("Channel {0} has become unavailable, aborting current transaction.", this.identifier);
this.CurrentTransaction.TryDispose();
throw;
}
catch (PoisonMessageException e)
{
- Log.Warn("Wire message {0} could not be deserialized; forwarding to poison message exchange.", messageId);
+ Log.Warn("Wire message {0} on channel {1} could not be deserialized; forwarding to poison message exchange.", messageId, this.identifier);
this.ForwardToPoisonMessageExchange(message, e);
}
catch (DeadLetterException e)
{
var seconds = (SystemTime.UtcNow - e.Expiration).TotalSeconds;
- Log.Info("Wire message {0} expired on the wire {1:n3} seconds ago; forwarding to dead-letter exchange.", messageId, seconds);
+ Log.Info("Wire message {0} on channel {1} expired on the wire {2:n3} seconds ago; forwarding to dead-letter exchange.", messageId, this.identifier, seconds);
this.ForwardTo(message, this.configuration.DeadLetterExchange);
}
catch (Exception e)
@@ -97,22 +99,22 @@ protected virtual void TryReceive(BasicDeliverEventArgs message, Action<IDeliver
protected virtual void RetryMessage(BasicDeliverEventArgs message, Exception exception)
{
var nextAttempt = this.AppendException(message, exception) + 1;
- Log.Debug("Message '{0}' has been attempted {1} times.", message.MessageId(), nextAttempt);
+ Log.Debug("Message '{0}' has been attempted {1} times on channel {2}.", message.MessageId(), nextAttempt, this.identifier);
if (nextAttempt > this.configuration.MaxAttempts)
{
- Log.Error("Unable to process message '{0}'".FormatWith(message.MessageId()), exception);
+ Log.Error("Unable to process message '{0}' on channel {1}".FormatWith(message.MessageId(), this.identifier), exception);
this.ForwardToPoisonMessageExchange(message, null);
}
else
{
- Log.Info("Unhandled exception for message '{0}'; retrying.".FormatWith(message.MessageId()), exception);
+ Log.Info("Unhandled exception for message '{0}' on channel {1}; retrying.".FormatWith(message.MessageId(), this.identifier), exception);
this.ForwardTo(message, this.configuration.InputQueue.ToPublicationAddress());
}
}
protected virtual void ForwardToPoisonMessageExchange(BasicDeliverEventArgs message, Exception exception)
{
- Log.Info("Message '{0}' is a poison message.", message.MessageId());
+ Log.Info("Message '{0}' on channel {1} is a poison message.", message.MessageId(), this.identifier);
this.AppendException(message, exception);
message.SetAttemptCount(0);
@@ -135,7 +137,7 @@ protected virtual int AppendException(BasicDeliverEventArgs message, Exception e
protected virtual void ForwardTo(BasicDeliverEventArgs message, PublicationAddress address)
{
- Log.Debug("Forwarding message '{0}' to recipient '{1}'.", message.MessageId(), address);
+ Log.Debug("Forwarding message '{0}' on channel {1} to recipient '{2}'.", message.MessageId(), this.identifier, address);
this.EnsureTransaction();
this.Send(message, address);
@@ -168,7 +170,8 @@ private void EnlistSend(ChannelEnvelope envelope)
? this.delivery
: this.adapter.Build(envelope.Message, this.channel.CreateBasicProperties());
- Log.Verbose("Sending wire message '{0}' to {1} recipients.", message.MessageId(), envelope.Recipients.Count);
+ Log.Verbose("Sending wire message '{0}' on channel {1} to {2} recipients.",
+ message.MessageId(), this.identifier, envelope.Recipients.Count);
foreach (var recipient in envelope.Recipients.Select(x => x.ToPublicationAddress(this.configuration)))
{
this.ThrowWhenDisposed();
@@ -185,7 +188,8 @@ protected virtual void Send(BasicDeliverEventArgs message, PublicationAddress re
this.EnsureTransaction().Register(() => this.Try(() =>
{
- Log.Info("Dispatching wire message '{0}' to messaging infrastructure for recipient '{1}'.", message.MessageId(), recipient);
+ Log.Info("Dispatching wire message '{0}' on channel {1} to messaging infrastructure for recipient '{2}'.",
+ message.MessageId(), this.identifier, recipient);
this.channel.BasicPublish(recipient, message.BasicProperties, message.Body);
}));
}
@@ -197,7 +201,7 @@ public virtual void AcknowledgeMessage()
if (this.subscription == null || this.transactionType == RabbitTransactionType.None)
return;
- Log.Verbose("Acknowledging all previous message deliveries from the messaging infrastructure.");
+ Log.Verbose("Acknowledging all previous message deliveries from the messaging infrastructure on channel {0}.", this.identifier);
this.Try(this.subscription.AcknowledgeMessages);
}
public virtual void CommitTransaction()
@@ -206,7 +210,7 @@ public virtual void CommitTransaction()
if (this.transactionType == RabbitTransactionType.Full)
{
- Log.Verbose("Committing transaction against the messaging infrastructure.");
+ Log.Verbose("Committing transaction against the messaging infrastructure on channel {0}.", this.identifier);
this.Try(this.channel.TxCommit);
}
@@ -218,7 +222,7 @@ public virtual void RollbackTransaction()
if (this.transactionType == RabbitTransactionType.Full)
{
- Log.Verbose("Rolling back transaction against the messaging infrastructure.");
+ Log.Verbose("Rolling back transaction against the messaging infrastructure on channel {0}.", this.identifier);
this.Try(this.channel.TxRollback);
}
@@ -227,7 +231,7 @@ public virtual void RollbackTransaction()
public virtual void BeginShutdown()
{
- Log.Debug("Beginning shutdown sequence.");
+ Log.Debug("Beginning shutdown sequence on channel {0}.", this.identifier);
this.shutdown = true;
}
@@ -243,8 +247,8 @@ protected virtual void ThrowWhenDispatchOnly()
{
if (!this.configuration.DispatchOnly)
return;
-
- Log.Warn("Dispatch-only channels cannot receive messages.");
+
+ Log.Warn("Channel {0} is dispatch only and cannot receive messages.", this.identifier);
throw new InvalidOperationException("Dispatch-only channels cannot receive messages.");
}
protected virtual void ThrowWhenShuttingDown()
@@ -252,7 +256,7 @@ protected virtual void ThrowWhenShuttingDown()
if (!this.shutdown)
return;
- Log.Warn("The channel is shutting down.");
+ Log.Warn("Channel {0} is shutting down.", this.identifier);
throw new ChannelShutdownException();
}
protected virtual void ThrowWhenDisposed()
@@ -260,7 +264,7 @@ protected virtual void ThrowWhenDisposed()
if (!this.disposed)
return;
- Log.Warn("The channel has been disposed.");
+ Log.Warn("Channel {0} has previously been disposed.", this.identifier);
throw new ObjectDisposedException(typeof(RabbitChannel).Name);
}
protected virtual void ThrowWhenSubscriptionExists()
@@ -268,7 +272,7 @@ protected virtual void ThrowWhenSubscriptionExists()
if (this.subscription == null)
return;
- Log.Warn("A receive callback has already been specified.");
+ Log.Warn("A receive callback has already been specified on channel {0}.", this.identifier);
throw new InvalidOperationException("The channel already has a receive callback.");
}
@@ -277,7 +281,7 @@ protected virtual IChannelTransaction EnsureTransaction()
if (!this.CurrentTransaction.Finished)
return this.CurrentTransaction;
- Log.Verbose("The current transaction has been completed, creating a new transaction.");
+ Log.Verbose("The current transaction has been completed, creating a new transaction on channel {0}.", this.identifier);
this.CurrentTransaction.TryDispose();
return this.CurrentTransaction = new RabbitTransaction(this, this.transactionType);
@@ -286,23 +290,25 @@ protected virtual void Try(Action callback)
{
try
{
+ //if (this.tryOperation)
callback();
}
catch (IOException e)
{
- Log.Info("Channel operation failed, aborting channel.");
-
- this.Dispose();
- throw new ChannelConnectionException(e.Message, e);
+ this.CatchOperation("Channel operation failed, aborting channel {0}.", e);
}
catch (OperationInterruptedException e)
{
- Log.Info("Channel operation interrupted, aborting channel.");
-
- this.Dispose();
- throw new ChannelConnectionException(e.Message, e);
+ this.CatchOperation("Channel operation interrupted, aborting channel {0}.", e);
}
}
+ private void CatchOperation(string message, Exception e)
+ {
+ Log.Info(message, this.identifier);
+ //this.tryOperation = false;
+ this.Dispose();
+ throw new ChannelConnectionException(e.Message, e);
+ }
public RabbitChannel(
IModel channel,
@@ -317,19 +323,20 @@ protected virtual void Try(Action callback)
this.transactionType = configuration.TransactionType;
this.subscriptionFactory = subscriptionFactory;
this.CurrentResolver = configuration.DependencyResolver;
+ this.identifier = Interlocked.Increment(ref counter);
this.CurrentTransaction = new RabbitTransaction(this, this.transactionType);
if (this.transactionType == RabbitTransactionType.Full)
{
- Log.Debug("Marking channel as transactional.");
+ Log.Debug("Marking channel {0} as transactional.", this.identifier);
this.channel.TxSelect();
}
if (this.configuration.ChannelBuffer <= 0 || this.configuration.DispatchOnly)
return;
- Log.Debug("Buffering up to {0} message(s) on the channel.",
- this.transactionType == RabbitTransactionType.None ? long.MaxValue : this.configuration.ChannelBuffer);
+ var buffer = this.transactionType == RabbitTransactionType.None ? long.MaxValue : this.configuration.ChannelBuffer;
+ Log.Debug("Buffering up to {0} message(s) on the channel {1}.", buffer, this.identifier);
if (this.configuration.TransactionType == RabbitTransactionType.None)
return;
@@ -354,12 +361,7 @@ protected virtual void Dispose(bool disposing)
if (this.disposed)
return;
- // TODO: disposing the current transaction will attempt a rollback against the underlying (and unavailable) channel
- // which then invokes dispose here (again)
-
- // TODO: we may also be able to reproduce issue #61 locally without too much trouble
-
- Log.Debug("Disposing channel.");
+ Log.Debug("Disposing channel {0}.", this.identifier);
this.CurrentTransaction.TryDispose(); // must happen here because it checks for dispose
this.disposed = true;
@@ -370,20 +372,23 @@ protected virtual void Dispose(bool disposing)
// dispose can throw while abort does the exact same thing without throwing
this.channel.Abort();
- Log.Debug("Channel disposed.");
+ Log.Debug("Channel {0} disposed.", this.identifier);
}
private const bool ContinueReceiving = true;
private const bool FinishedReceiving = false; // returning false means the receiving handler will exit.
private static readonly ILog Log = LogFactory.Build(typeof(RabbitChannel));
+ private static int counter;
private readonly IModel channel;
private readonly IChannelConnector connector;
private readonly RabbitMessageAdapter adapter;
private readonly RabbitChannelGroupConfiguration configuration;
private readonly RabbitTransactionType transactionType;
private readonly Func<RabbitSubscription> subscriptionFactory;
+ private readonly int identifier;
private RabbitSubscription subscription;
private BasicDeliverEventArgs delivery;
+ //private bool tryOperation = true;
private bool disposed;
private volatile bool shutdown;
}
View
8 src/proj/NanoMessageBus.RabbitChannel/RabbitTransaction.cs
@@ -13,6 +13,8 @@ public virtual bool Finished
public virtual void Register(Action callback)
{
+ this.active = true;
+
if (callback == null)
throw new ArgumentNullException("callback");
@@ -28,6 +30,9 @@ public virtual void Register(Action callback)
public virtual void Commit()
{
+ if (!this.active)
+ return;
+
this.ThrowWhenDisposed();
this.ThrowWhenRolledBack();
@@ -90,7 +95,7 @@ public virtual void Rollback()
}
protected virtual void RollbackChannel()
{
- if (this.transactionType == RabbitTransactionType.Full)
+ if (this.active && this.transactionType == RabbitTransactionType.Full)
this.channel.RollbackTransaction();
}
@@ -161,6 +166,7 @@ protected virtual void Dispose(bool disposing)
private readonly ICollection<Action> callbacks = new LinkedList<Action>();
private readonly RabbitChannel channel;
private readonly RabbitTransactionType transactionType;
+ private bool active;
private bool disposed;
private bool committing;
private bool committed;
View
2 src/proj/NanoMessageBus/TaskWorkerGroup.cs
@@ -130,7 +130,7 @@ protected virtual void WatchQueue(IWorkItem<T> worker, CancellationToken token)
try
{
foreach (var item in this.workItems.GetConsumingEnumerable(token))
- item(worker); // TODO: #60 if this operation isn't completely successful, the item should be re-enqueued
+ item(worker);
}
catch (OperationCanceledException)
{
View
6 src/tests/NanoMessageBus.RabbitChannel.UnitTests/RabbitChannelTests.cs
@@ -219,6 +219,7 @@ public class when_receiving_a_message_after_the_previous_delivery_was_committed
mockAdapter.Setup(x => x.Build(message)).Returns(new Mock<ChannelMessage>().Object);
channel.Receive(deliveryContext => delivery = deliveryContext);
Receive(message);
+ channel.CurrentTransaction.Register(EmptyCallback);
(committed = delivery.CurrentTransaction).Commit();
};
@@ -1047,6 +1048,8 @@ public class when_disposing_a_transactional_channel : using_a_channel
RequireTransaction(RabbitTransactionType.Full);
Initialize();
+
+ channel.CurrentTransaction.Register(EmptyCallback);
};
Because of = () =>
@@ -1303,6 +1306,9 @@ protected static void Try(Action callback)
{
thrown = Catch.Exception(callback);
}
+ protected static void EmptyCallback()
+ {
+ }
protected const string DefaultChannelGroup = "some group name";
protected const string InputQueueName = "input-queue";
View
101 src/tests/NanoMessageBus.RabbitChannel.UnitTests/RabbitTransactionTests.cs
@@ -48,6 +48,25 @@ public class when_an_action_is_registered_with_a_full_transaction : using_a_tran
It should_not_invoke_the_callback = () =>
invocations.ShouldEqual(0);
+ }
+
+ [Subject(typeof(RabbitTransaction))]
+ public class when_committing_a_transaction_where_no_actions_have_been_registered : using_a_transaction
+ {
+ Establish context = () =>
+ {
+ transactionType = RabbitTransactionType.Full;
+ Initialize();
+ };
+
+ Because of = () =>
+ transaction.Commit();
+
+ It should_NOT_acknowledge_message_delivery_to_the_underlying_channel = () =>
+ mockChannel.Verify(x => x.AcknowledgeMessage(), Times.Never());
+
+ It should_NOT_commit_against_the_underlying_channel = () =>
+ mockChannel.Verify(x => x.CommitTransaction(), Times.Never());
}
[Subject(typeof(RabbitTransaction))]
@@ -111,6 +130,7 @@ public class when_all_callbacks_have_completed_and_the_channel_is_notified_to_co
transactionType = RabbitTransactionType.Full;
Initialize();
+ transaction.Register(callback);
mockChannel.Setup(x => x.CommitTransaction()).Callback(() => finished = transaction.Finished);
};
@@ -125,9 +145,12 @@ public class when_all_callbacks_have_completed_and_the_channel_is_notified_to_co
[Subject(typeof(RabbitTransaction))]
public class when_an_action_is_registered_with_the_committed_transaction : using_a_transaction
- {
- Establish context = () =>
- transaction.Commit();
+ {
+ Establish context = () =>
+ {
+ transaction.Register(callback);
+ transaction.Commit();
+ };
Because of = () =>
thrown = Catch.Exception(() => transaction.Register(callback));
@@ -186,7 +209,9 @@ public class when_committing_an_acknowledge_only_transaction : using_a_transacti
transactionType = RabbitTransactionType.Acknowledge;
mockChannel.Setup(x => x.CommitTransaction());
mockChannel.Setup(x => x.AcknowledgeMessage());
- Initialize();
+ Initialize();
+
+ transaction.Register(callback);
};
Because of = () =>
@@ -206,8 +231,11 @@ public class when_committing_full_transaction : using_a_transaction
{
transactionType = RabbitTransactionType.Full;
mockChannel.Setup(x => x.CommitTransaction());
- mockChannel.Setup(x => x.AcknowledgeMessage());
+ mockChannel.Setup(x => x.AcknowledgeMessage());
+
Initialize();
+
+ transaction.Register(callback);
};
Because of = () =>
@@ -222,9 +250,12 @@ public class when_committing_full_transaction : using_a_transaction
[Subject(typeof(RabbitTransaction))]
public class when_commiting_a_disposed_transaction : using_a_transaction
- {
- Establish context = () =>
- transaction.Dispose();
+ {
+ Establish context = () =>
+ {
+ transaction.Register(callback);
+ transaction.Dispose();
+ };
Because of = () =>
thrown = Catch.Exception(() => transaction.Commit());
@@ -235,9 +266,12 @@ public class when_commiting_a_disposed_transaction : using_a_transaction
[Subject(typeof(RabbitTransaction))]
public class when_attempting_to_commit_a_rolled_back_transaction : using_a_transaction
- {
- Establish context = () =>
- transaction.Rollback();
+ {
+ Establish context = () =>
+ {
+ transaction.Register(callback);
+ transaction.Rollback();
+ };
Because of = () =>
thrown = Catch.Exception(() => transaction.Commit());
@@ -248,9 +282,15 @@ public class when_attempting_to_commit_a_rolled_back_transaction : using_a_trans
[Subject(typeof(RabbitTransaction))]
public class when_rolling_back_a_transaction : using_a_transaction
- {
- Because of = () =>
- transaction.Rollback();
+ {
+ Because of = () =>
+ {
+ transactionType = RabbitTransactionType.Acknowledge;
+
+ Initialize();
+ transaction.Register(callback);
+ transaction.Rollback();
+ };
It should_mark_the_transaction_as_finished = () =>
transaction.Finished.ShouldBeTrue();
@@ -266,7 +306,9 @@ public class when_rolling_back_a_full_transaction : using_a_transaction
{
transactionType = RabbitTransactionType.Full;
mockChannel.Setup(x => x.RollbackTransaction());
- Initialize();
+ Initialize();
+
+ transaction.Register(callback);
};
Because of = () =>
@@ -283,8 +325,9 @@ public class when_rolling_back_a_transaction_more_than_once : using_a_transactio
{
transactionType = RabbitTransactionType.Full;
mockChannel.Setup(x => x.RollbackTransaction());
- Initialize();
-
+ Initialize();
+
+ transaction.Register(callback);
transaction.Rollback();
};
@@ -297,9 +340,12 @@ public class when_rolling_back_a_transaction_more_than_once : using_a_transactio
[Subject(typeof(RabbitTransaction))]
public class when_rolling_back_a_committed_transaction : using_a_transaction
- {
- Establish context = () =>
- transaction.Commit();
+ {
+ Establish context = () =>
+ {
+ transaction.Register(callback);
+ transaction.Commit();
+ };
Because of = () =>
thrown = Catch.Exception(() => transaction.Rollback());
@@ -341,7 +387,9 @@ public class when_disposing_a_full_transaction : using_a_transaction
{
transactionType = RabbitTransactionType.Full;
mockChannel.Setup(x => x.RollbackTransaction());
- Initialize();
+ Initialize();
+
+ transaction.Register(callback);
};
Because of = () =>
@@ -358,8 +406,9 @@ public class when_disposing_a_committed_transaction : using_a_transaction
{
transactionType = RabbitTransactionType.Full;
mockChannel.Setup(x => x.RollbackTransaction());
- Initialize();
-
+ Initialize();
+
+ transaction.Register(callback);
transaction.Commit();
};
@@ -377,8 +426,9 @@ public class when_disposing_a_rolled_back_transaction : using_a_transaction
{
transactionType = RabbitTransactionType.Full;
mockChannel.Setup(x => x.RollbackTransaction());
- Initialize();
-
+ Initialize();
+
+ transaction.Register(callback);
transaction.Rollback();
};
@@ -398,6 +448,7 @@ public class when_disposing_a_transaction_more_than_once : using_a_transaction
mockChannel.Setup(x => x.RollbackTransaction());
Initialize();
+ transaction.Register(callback);
transaction.Dispose();
};

No commit comments for this range

Something went wrong with that request. Please try again.