Skip to content

Commit

Permalink
Fixed issue with MSMQ transport not reloading properly when MSMQ rest…
Browse files Browse the repository at this point in the history
…arted, fixed SubscriptionClient issue on remote machines
  • Loading branch information
phatboyg committed Sep 12, 2012
1 parent f5db0da commit f69b613
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 284 deletions.
2 changes: 1 addition & 1 deletion .semver
@@ -1,5 +1,5 @@
---
:patch: 2
:patch: 3
:special: ""
:major: 2
:minor: 6
203 changes: 111 additions & 92 deletions src/MassTransit/Services/Subscriptions/Client/SubscriptionClient.cs
Expand Up @@ -12,30 +12,30 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Services.Subscriptions.Client
{
using System;
using System.Threading;
using Exceptions;
using Logging;
using MassTransit.Subscriptions.Coordinator;
using MassTransit.Subscriptions.Messages;
using Messages;
using System;
using System.Threading;
using Exceptions;
using Logging;
using MassTransit.Subscriptions.Coordinator;
using MassTransit.Subscriptions.Messages;
using Messages;

/// <summary>
/// The subscription client is responsible for exchanging subscription information between a local bus and the central subscription coordinator.
/// </summary>
public class SubscriptionClient :
SubscriptionObserver
{
static readonly ILog _log = Logger.Get(typeof (SubscriptionClient));
readonly IServiceBus _bus;
readonly SubscriptionRouter _router;
readonly string _network;
readonly SubscriptionServiceMessageProducer _producer;
readonly ManualResetEvent _ready = new ManualResetEvent(false);
readonly TimeSpan _startTimeout;
readonly Uri _subscriptionServiceUri;
UnsubscribeAction _unsubscribeAction;
readonly IEndpoint _subscriptionEndpoint;
SubscriptionObserver
{
static readonly ILog _log = Logger.Get(typeof (SubscriptionClient));
readonly IServiceBus _bus;
readonly SubscriptionRouter _router;
readonly string _network;
readonly SubscriptionServiceMessageProducer _producer;
readonly ManualResetEvent _ready = new ManualResetEvent(false);
readonly TimeSpan _startTimeout;
readonly Uri _subscriptionServiceUri;
UnsubscribeAction _unsubscribeAction;
readonly IEndpoint _subscriptionEndpoint;

/// <summary>
/// Initializes a new instance of the <see cref="SubscriptionClient"/> class.
Expand All @@ -44,98 +44,117 @@ public class SubscriptionClient :
/// <param name="router">The router.</param>
/// <param name="subscriptionServiceUri">The subscription service URI.</param>
/// <param name="startTimeout">The start timeout.</param>
public SubscriptionClient(IServiceBus bus, SubscriptionRouter router, Uri subscriptionServiceUri, TimeSpan startTimeout)
{
_bus = bus;
_router = router;
_subscriptionServiceUri = subscriptionServiceUri;
_startTimeout = startTimeout;
_network = router.Network;
public SubscriptionClient(IServiceBus bus, SubscriptionRouter router, Uri subscriptionServiceUri, TimeSpan startTimeout)
{
_bus = bus;
_router = router;
_subscriptionServiceUri = subscriptionServiceUri;
_startTimeout = startTimeout;
_network = router.Network;

if (_log.IsDebugEnabled)
_log.DebugFormat("Starting SubscriptionClient using {0}", subscriptionServiceUri);
if (_log.IsDebugEnabled)
_log.DebugFormat("Starting SubscriptionClient using {0}", subscriptionServiceUri);

VerifyClientAndServiceNotOnSameEndpoint(bus);
VerifyClientAndServiceNotOnSameEndpoint(bus);

_ready.Reset();
_ready.Reset();

var consumerInstance = new SubscriptionMessageConsumer(_router, _network);
var consumerInstance = new SubscriptionMessageConsumer(_router, _network);

_unsubscribeAction = _bus.ControlBus.SubscribeInstance(consumerInstance);
_unsubscribeAction += _bus.ControlBus.SubscribeContextHandler<SubscriptionRefresh>(Consume);
_unsubscribeAction = _bus.ControlBus.SubscribeInstance(consumerInstance);
_unsubscribeAction += _bus.ControlBus.SubscribeContextHandler<SubscriptionRefresh>(Consume);

_subscriptionEndpoint = _bus.GetEndpoint(subscriptionServiceUri);
_producer = new SubscriptionServiceMessageProducer(router, _subscriptionEndpoint);
_subscriptionEndpoint = _bus.GetEndpoint(subscriptionServiceUri);
_producer = new SubscriptionServiceMessageProducer(router, _subscriptionEndpoint);

WaitForSubscriptionServiceResponse();
}
WaitForSubscriptionServiceResponse();
}

/// <summary>
/// Called when a subscription was added.
/// </summary>
/// <param name="message"></param>
public void OnSubscriptionAdded(SubscriptionAdded message)
{
_producer.OnSubscriptionAdded(message);
}
public void OnSubscriptionAdded(SubscriptionAdded message)
{
_producer.OnSubscriptionAdded(message);
}

/// <summary>
/// Called when a subscription was removed.
/// </summary>
/// <param name="message">The message.</param>
public void OnSubscriptionRemoved(SubscriptionRemoved message)
{
_producer.OnSubscriptionRemoved(message);
}
public void OnSubscriptionRemoved(SubscriptionRemoved message)
{
_producer.OnSubscriptionRemoved(message);
}

/// <summary>
/// Called when the observation is complete and we should go away
/// </summary>
public void OnComplete()
{
if (_unsubscribeAction != null)
{
_unsubscribeAction();
_unsubscribeAction = null;
}

_producer.OnComplete();
}

void Consume(IConsumeContext<SubscriptionRefresh> context)
{
if (_subscriptionEndpoint.Address.Uri.Equals(context.SourceAddress))
{
_ready.Set();
}
}

void WaitForSubscriptionServiceResponse()
{
if (_log.IsDebugEnabled)
_log.Debug("Waiting for response from the subscription service");

using (_ready)
{
bool received = _ready.WaitOne(_startTimeout);
if (!received)
{
throw new InvalidOperationException("Timeout waiting for subscription service to respond");
}
}
}

void VerifyClientAndServiceNotOnSameEndpoint(IServiceBus bus)
{
Uri controlUri = bus.ControlBus.Endpoint.Address.Uri;

if (!controlUri.Equals(_subscriptionServiceUri))
return;

string message = "The service bus and subscription service cannot use the same endpoint: " +
controlUri;

throw new EndpointException(controlUri, message);
}
}
public void OnComplete()
{
if (_unsubscribeAction != null)
{
_unsubscribeAction();
_unsubscribeAction = null;
}

_producer.OnComplete();
}

void Consume(IConsumeContext<SubscriptionRefresh> context)
{
var subscriptionUri = _subscriptionEndpoint.Address.Uri.AbsoluteUri;
if(_subscriptionEndpoint.Address.Uri.Query.Length > 0)
subscriptionUri = subscriptionUri.Replace(_subscriptionEndpoint.Address.Uri.Query, "");

var sourceUri = context.SourceAddress.AbsoluteUri;
if (context.SourceAddress.Query.Length > 0)
sourceUri = sourceUri.Replace(context.SourceAddress.Query, "");

if (subscriptionUri.Equals(sourceUri))
{
if (_log.IsDebugEnabled)
_log.DebugFormat("SubscriptionRefresh received, setting ready event");

_ready.Set();
}
else
{
if (_log.IsErrorEnabled)
_log.ErrorFormat("SubscriptionRefresh source address invalid: {0} (expected {1})",
sourceUri,
subscriptionUri);

}
}

void WaitForSubscriptionServiceResponse()
{
if (_log.IsDebugEnabled)
_log.Debug("Waiting for response from the subscription service");

using (_ready)
{
bool received = _ready.WaitOne(_startTimeout);
if (!received)
{
throw new InvalidOperationException("Timeout waiting for subscription service to respond");
}
}
}

void VerifyClientAndServiceNotOnSameEndpoint(IServiceBus bus)
{
Uri controlUri = bus.ControlBus.Endpoint.Address.Uri;

if (!controlUri.Equals(_subscriptionServiceUri))
return;

string message = "The service bus and subscription service cannot use the same endpoint: " +
controlUri;

throw new EndpointException(controlUri, message);
}
}
}
2 changes: 1 addition & 1 deletion src/Samples/Starbucks/Starbucks.Barista/Program.cs
Expand Up @@ -48,7 +48,7 @@ static void Main(string[] args)
{
sbc.ReceiveFrom("msmq://localhost/starbucks_barista");
sbc.UseMsmq();
sbc.UseMulticastSubscriptionClient();
sbc.UseSubscriptionService("msmq://egqdspc-2008r2/mt_subscriptions");
sbc.UseControlBus();
Expand Down
2 changes: 1 addition & 1 deletion src/Samples/Starbucks/Starbucks.Cashier/CashierRegistry.cs
Expand Up @@ -34,7 +34,7 @@ public override void Load()
return ServiceBusFactory.New(sbc =>
{
sbc.UseMsmq();
sbc.UseMulticastSubscriptionClient();
sbc.UseSubscriptionService("msmq://egqdspc-2008r2/mt_subscriptions");
sbc.ReceiveFrom("msmq://localhost/starbucks_cashier");
sbc.SetConcurrentConsumerLimit(1); //a cashier cannot multi-task
Expand Down
2 changes: 1 addition & 1 deletion src/Samples/Starbucks/Starbucks.Customer/Program.cs
Expand Up @@ -46,7 +46,7 @@ static IContainer BootstrapContainer()
{
sbc.ReceiveFrom("msmq://localhost/starbucks_customer");
sbc.UseMsmq();
sbc.UseMulticastSubscriptionClient();
sbc.UseSubscriptionService("msmq://egqdspc-2008r2/mt_subscriptions");
sbc.UseControlBus();
Expand Down
8 changes: 4 additions & 4 deletions src/SolutionVersion.cs
Expand Up @@ -4,11 +4,11 @@
using System.Security;
[assembly: AssemblyDescription("MassTransit is a distributed application framework for .NET http://masstransit-project.com")]
[assembly: AssemblyProduct("MassTransit")]
[assembly: AssemblyCopyright("Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al. - All rights reserved.")]
[assembly: AssemblyVersion("2.1.1")]
[assembly: AssemblyFileVersion("2.1.1")]
[assembly: AssemblyCopyright("Copyright 2007-2012 Chris Patterson, Dru Sellers, Travis Smith, et. al. - All rights reserved.")]
[assembly: AssemblyVersion("2.6.3")]
[assembly: AssemblyFileVersion("2.6.3")]

[assembly: AssemblyInformationalVersion("2.1.1.5dd8fe")]
[assembly: AssemblyInformationalVersion("2.6.3.f5db0d")]
[assembly: ComVisibleAttribute(false)]
[assembly: CLSCompliantAttribute(false)]

0 comments on commit f69b613

Please sign in to comment.