Permalink
Browse files

Topic based routing implementation

  • Loading branch information...
mikehadlow committed May 2, 2012
1 parent 3c68538 commit 91e1e7b446fb367a0899e0e6f3156f3af5fa2dbd
@@ -11,5 +11,10 @@ public void RawPublish(string exchangeName, byte[] messageBody)
if (RawPublishDelegate != null)
RawPublishDelegate(exchangeName, messageBody);
}
+
+ public void RawPublish(string exchangeName, string topic, byte[] messageBody)
+ {
+ throw new NotImplementedException();
+ }
}
}
@@ -24,10 +24,10 @@ public void The_default_exchange_naming_convention_should_use_the_TypeNameSerial
}
[Test]
- public void The_default_topic_naming_convention_should_use_the_TypeNameSerializers_Serialize_method()
+ public void The_default_topic_naming_convention_should_return_an_empty_string()
{
var result = conventions.TopicNamingConvention(typeof (TestMessage));
- result.ShouldEqual(TypeNameSerializer.Serialize(typeof (TestMessage)));
+ result.ShouldEqual("");
}
[Test]
@@ -86,6 +86,7 @@
<Compile Include="ModelCleanupTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PublishSubscribeTests.cs" />
+ <Compile Include="PublishSubscribeWithTopicsTests.cs" />
<Compile Include="RequestResponseTests.cs" />
<Compile Include="Sagas\AsyncSubscribePublishSaga.cs" />
<Compile Include="Sagas\RequestResponseSaga.cs" />
@@ -54,7 +54,7 @@ public void Should_also_send_messages_to_second_subscriber()
messageQueue2.Subscribe<MyMessage>("test2", msg => Console.WriteLine(msg.Text));
// allow time for messages to be consumed
- Thread.Sleep(100);
+ Thread.Sleep(500);
Console.WriteLine("Stopped consuming");
}
@@ -0,0 +1,60 @@
+// ReSharper disable InconsistentNaming
+
+using System;
+using System.Threading;
+using NUnit.Framework;
+
+namespace EasyNetQ.Tests
+{
+ [TestFixture]
+ public class PublishSubscribeWithTopicsTests
+ {
+ private IBus bus;
+
+ [SetUp]
+ public void SetUp()
+ {
+ bus = RabbitHutch.CreateBus("host=localhost");
+ while (!bus.IsConnected) Thread.Sleep(10);
+ }
+
+ [TearDown]
+ public void TearDown()
+ {
+ if(bus != null) bus.Dispose();
+ }
+
+ private MyMessage CreateMessage()
+ {
+ return new MyMessage { Text = "Hello! " + Guid.NewGuid().ToString().Substring(0, 5) };
+ }
+
+ [Test]
+ public void Publish_some_messages_with_topics()
+ {
+ bus.Publish("X.A", CreateMessage());
+ bus.Publish("X.B", CreateMessage());
+ bus.Publish("Y.A", CreateMessage());
+ }
+
+ [Test]
+ public void Subscribe_to_messages_with_topics()
+ {
+ bus.Subscribe<MyMessage>("id1", "X.*", msg => Console.WriteLine("I Get X: {0}", msg.Text));
+ bus.Subscribe<MyMessage>("id2", "*.A", msg => Console.WriteLine("I Get A: {0}", msg.Text));
+ bus.Subscribe<MyMessage>("id3", msg => Console.WriteLine("I Get All: {0}", msg.Text));
+
+ Thread.Sleep(500);
+ }
+
+ [Test]
+ public void Should_subscribe_to_multiple_topic_strings()
+ {
+ bus.Subscribe<MyMessage>("id4", new[]{"Y.*", "*.B"}, msg => Console.WriteLine("I Get Y or B: {0}", msg.Text));
+
+ Thread.Sleep(500);
+ }
+ }
+}
+
+// ReSharper restore InconsistentNaming
@@ -22,7 +22,7 @@ public Conventions()
{
// Establish default conventions.
ExchangeNamingConvention = TypeNameSerializer.Serialize;
- TopicNamingConvention = TypeNameSerializer.Serialize;
+ TopicNamingConvention = messageType => "";
QueueNamingConvention =
(messageType, subscriptionId) =>
{
@@ -54,6 +54,7 @@
<Compile Include="DefaultConsumerErrorStrategy.cs" />
<Compile Include="Delegates.cs" />
<Compile Include="EasyNetQException.cs" />
+ <Compile Include="Extensions.cs" />
<Compile Include="IBus.cs" />
<Compile Include="IConnectionFactory.cs" />
<Compile Include="IConsumerErrorStrategy.cs" />
@@ -0,0 +1,12 @@
+using System.Collections.Generic;
+
+namespace EasyNetQ
+{
+ public static class Extensions
+ {
+ public static IEnumerable<T> ToEnumerable<T>(this T item)
+ {
+ yield return item;
+ }
+ }
+}
@@ -3,5 +3,6 @@ namespace EasyNetQ
public interface IRawByteBus
{
void RawPublish(string exchangeName, byte[] messageBody);
+ void RawPublish(string exchangeName, string topic, byte[] messageBody);
}
}
@@ -85,32 +85,35 @@ public class RabbitBus : IBus, IRawByteBus
public void Publish<T>(T message)
{
- if(message == null)
+ Publish(GetTopic<T>(), message);
+ }
+
+ public void Publish<T>(string topic, T message)
+ {
+ if (message == null)
{
throw new ArgumentNullException("message");
}
- var typeName = serializeType(typeof (T));
- var exchangeName = GetExchangeName<T>();
- var topic = GetTopic<T>();
+ var typeName = serializeType(typeof(T));
+ var exchangeName = GetExchangeName<T>();
var messageBody = serializer.MessageToBytes(message);
RawPublish(exchangeName, topic, typeName, messageBody);
}
- public void Publish<T>(string topic, T message)
+ // channels should not be shared between threads.
+ private ThreadLocal<IModel> threadLocalPublishChannel = new ThreadLocal<IModel>();
+
+
+ public void RawPublish(string exchangeName, byte[] messageBody)
{
- throw new NotImplementedException();
+ RawPublish(exchangeName, "", messageBody);
}
- // channels should not be shared between threads.
- private ThreadLocal<IModel> threadLocalPublishChannel = new ThreadLocal<IModel>();
-
-
- public void RawPublish(string typeName, byte[] messageBody)
+ public void RawPublish(string typeName, string topic, byte[] messageBody)
{
var exchangeName = typeName;
- var topic = typeName;
RawPublish(exchangeName, topic, typeName, messageBody);
}
@@ -161,7 +164,7 @@ private void DeclarePublishExchange(IModel channel, string exchangeName)
{
channel.ExchangeDeclare(
exchangeName, // exchange
- ExchangeType.Direct, // type
+ ExchangeType.Topic, // type
true); // durable
publishExchanges.Add(exchangeName);
@@ -183,7 +186,17 @@ private void CheckMessageType<TMessage>(IBasicProperties properties)
public void Subscribe<T>(string subscriptionId, Action<T> onMessage)
{
- SubscribeAsync<T>(subscriptionId, msg =>
+ Subscribe(subscriptionId, "#", onMessage);
+ }
+
+ public void Subscribe<T>(string subscriptionId, string topic, Action<T> onMessage)
+ {
+ Subscribe(subscriptionId, topic.ToEnumerable(), onMessage);
+ }
+
+ public void Subscribe<T>(string subscriptionId, IEnumerable<string> topics, Action<T> onMessage)
+ {
+ SubscribeAsync<T>(subscriptionId, topics, msg =>
{
var tcs = new TaskCompletionSource<object>();
try
@@ -199,31 +212,30 @@ public void Subscribe<T>(string subscriptionId, Action<T> onMessage)
});
}
- public void Subscribe<T>(string subscriptionId, string topic, Action<T> onMessage)
+ public void SubscribeAsync<T>(string subscriptionId, Func<T, Task> onMessage)
{
- throw new NotImplementedException();
+ SubscribeAsync(subscriptionId, "#", onMessage);
}
- public void Subscribe<T>(string subscriptionId, IEnumerable<string> topics, Action<T> onMessage)
+ public void SubscribeAsync<T>(string subscriptionId, string topic, Func<T, Task> onMessage)
{
- throw new NotImplementedException();
+ SubscribeAsync(subscriptionId, topic.ToEnumerable(), onMessage);
}
- public void SubscribeAsync<T>(string subscriptionId, Func<T, Task> onMessage)
+ public void SubscribeAsync<T>(string subscriptionId, IEnumerable<string> topics, Func<T, Task> onMessage)
{
if (onMessage == null)
{
throw new ArgumentNullException("onMessage");
}
- string queueName = GetQueueName<T>(subscriptionId);
- string exchangeName = GetExchangeName<T>();
- string topic = GetTopic<T>();
+ var queueName = GetQueueName<T>(subscriptionId);
+ var exchangeName = GetExchangeName<T>();
Action subscribeAction = () =>
{
var channel = connection.CreateModel();
- modelList.Add( channel );
+ modelList.Add(channel);
DeclarePublishExchange(channel, exchangeName);
channel.BasicQos(0, prefetchCount, false);
@@ -235,10 +247,14 @@ public void SubscribeAsync<T>(string subscriptionId, Func<T, Task> onMessage)
false, // autoDelete
null); // arguments
- channel.QueueBind(
- queue, // queue
- exchangeName, // exchange
- topic); // routingKey
+ foreach(var topic in topics)
+ {
+ channel.QueueBind(
+ queue, // queue
+ exchangeName, // exchange
+ topic); // routingKey
+
+ }
var consumer = consumerFactory.CreateConsumer(channel,
(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body) =>
@@ -259,16 +275,6 @@ public void SubscribeAsync<T>(string subscriptionId, Func<T, Task> onMessage)
AddSubscriptionAction(subscribeAction);
}
- public void SubscribeAsync<T>(string subscriptionId, string topic, Func<T, Task> onMessage)
- {
- throw new NotImplementedException();
- }
-
- public void SubscribeAsync<T>(string subscriptionId, IEnumerable<string> topics, Func<T, Task> onMessage)
- {
- throw new NotImplementedException();
- }
-
private string GetTopic<T>()
{

0 comments on commit 91e1e7b

Please sign in to comment.