Permalink
Browse files

More hacking. Msmq Transport.

  • Loading branch information...
1 parent ccf9863 commit b46650f9cc073ad0b6110b865e4887fd64f038ad @larsw committed Nov 24, 2011
@@ -25,11 +25,11 @@ public void CanReadConfigSection()
[Test]
public void CanAddAnActor()
{
- var _actorA = new ActorConfigurationElement("actorA");
+ var actor = new ActorConfigurationElement("actorA");
var section = new LoomEsbConfigurationSection();
- section.Actors.Add(_actorA);
+ section.Actors.Add(actor);
Assert.AreEqual(1, section.Actors.Count);
- Assert.AreEqual(_actorA, section.Actors.Cast<ActorConfigurationElement>().First());
+ Assert.AreEqual(actor, section.Actors.Cast<ActorConfigurationElement>().First());
}
[Test]
@@ -59,6 +59,7 @@
<DesignTime>True</DesignTime>
<DependentUpon>DistributeMessageToSubscribers.feature</DependentUpon>
</Compile>
+ <Compile Include="MsmqTransportTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Steps.cs" />
</ItemGroup>
@@ -0,0 +1,23 @@
+namespace Loom.Esb.Specs
+{
+ using System.Threading;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class MsmqTransportTests
+ {
+ [Test]
+ public void Foo()
+ {
+ using (var transport = new MsmqTransport())
+ {
+ ManualResetEvent mre = new ManualResetEvent(false);
+ transport.MessageReceived += (sender, e) => { mre.Set(); };
+
+ transport.Send("hello");
+
+ mre.WaitOne();
+ }
+ }
+ }
+}
View
@@ -1,16 +1,27 @@
namespace Loom.Esb
{
+ using System.Collections.ObjectModel;
+
public class Actor
{
private readonly string _name;
+ private readonly ITransport _transport;
+ private readonly MessageHandlerCollection _messageHandlers = new MessageHandlerCollection();
+
+ public MessageHandlerCollection MessageHandlers
+ {
+ get { return _messageHandlers; }
+ }
- public Actor()
+ public Actor(ITransport transport)
{
+ _transport = transport;
Publications = new PublicationCollection();
Subscriptions = new SubscriptionCollection();
}
- public Actor(string name)
+ public Actor(string name, ITransport transport)
+ :this(transport)
{
_name = name;
}
@@ -26,5 +37,23 @@ public PublicationCollection Publications
public void Connect()
{
}
+
+ public void Publish(object message)
+ {
+ _transport.Send(message);
+ }
+ }
+
+ public interface IMessageHandler
+ {
+ }
+
+ public interface IMessageHandler<T> : IMessageHandler
+ {
+ void Handle(T message);
+ }
+
+ public class MessageHandlerCollection : Collection<IMessageHandler>
+ {
}
}
@@ -1,15 +1,16 @@
namespace Loom.Esb
{
using System.Linq;
- using Configuration;
public class ActorFactory
{
- private readonly LoomEsbConfigurationSection _configurationSection;
+ private readonly Configuration.LoomEsbConfigurationSection _configurationSection;
+ private readonly ITransport _transport;
public ActorFactory(Configuration.LoomEsbConfigurationSection configurationSection)
{
_configurationSection = configurationSection;
+ _transport = new MsmqTransport();
}
public Actor CreateActor(string actorName)
@@ -18,7 +19,7 @@ public Actor CreateActor(string actorName)
if (configuration == null)
throw new NoActorConfigurationException();
- var actor = new Actor(actorName);
+ var actor = new Actor(actorName, _transport);
actor.Publications.AddRange(
configuration.Publications.Select(conf => new Publication(conf.Topic)));
actor.Subscriptions.AddRange(
@@ -0,0 +1,10 @@
+namespace Loom.Esb
+{
+ using System;
+
+ public interface ITransport
+ {
+ void Send(object message);
+ event EventHandler<MessageReceivedEventArgs> MessageReceived;
+ }
+}
@@ -61,6 +61,9 @@
<Compile Include="Configuration\TopicConfigurationElement.cs" />
<Compile Include="Configuration\TopicsConfigurationElementCollection.cs" />
<Compile Include="Configuration\TransportsConfigurationElement.cs" />
+ <Compile Include="ITransport.cs" />
+ <Compile Include="MessageReceivedEventArgs.cs" />
+ <Compile Include="MsmqTransport.cs" />
<Compile Include="NoActorConfigurationException.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Publication.cs" />
@@ -0,0 +1,15 @@
+namespace Loom.Esb
+{
+ using System;
+ using System.Messaging;
+
+ public class MessageReceivedEventArgs : EventArgs
+ {
+ public Message Message { get; private set; }
+
+ public MessageReceivedEventArgs(Message message)
+ {
+ Message = message;
+ }
+ }
+}
@@ -0,0 +1,39 @@
+namespace Loom.Esb
+{
+ using System;
+ using System.Messaging;
+ using System.Transactions;
+
+ public class MsmqTransport : ITransport, IDisposable
+ {
+ private readonly MessageQueue _messageQueue;
+
+ public event EventHandler<MessageReceivedEventArgs> MessageReceived = delegate { };
+
+ public MsmqTransport()
+ {
+ _messageQueue = new MessageQueue(".\\private$\\TestQueue");
+ _messageQueue.ReceiveCompleted += OnMessageReceived;
+ _messageQueue.BeginReceive();
+ }
+
+ private void OnMessageReceived(object sender, ReceiveCompletedEventArgs e)
+ {
+ MessageReceived(this, new MessageReceivedEventArgs(e.Message));
+ }
+
+ public void Send(object message)
+ {
+ using (var tx = new TransactionScope(TransactionScopeOption.Required))
+ {
+ _messageQueue.Send(message);
+ tx.Complete();
+ }
+ }
+
+ public void Dispose()
+ {
+ _messageQueue.Dispose();
+ }
+ }
+}

0 comments on commit b46650f

Please sign in to comment.