Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Removed support for non-transactional queues, also made internal dict…

…ionaries non-static
  • Loading branch information...
commit d74c726551b3b8640ada190458344b0a8459a08c 1 parent 996753b
Johan Danforth authored

Showing 2 changed files with 66 additions and 97 deletions. Show diff stats Hide diff stats

  1. +29 0 .gitignore
  2. +37 97 MiniBuss/ServiceBus.cs
29 .gitignore
... ... @@ -0,0 +1,29 @@
  1 +# Output directories #
  2 +######################
  3 +[bB]in
  4 +[dD]ebug
  5 +[rR]elease
  6 +[oO]bj
  7 +[bB]uild[aA]rtifacts
  8 +
  9 +# Data #
  10 +########
  11 +[aA]pp_[dD]ata
  12 +
  13 +# Resharper #
  14 +#############
  15 +*.ReSharper
  16 +_ReSharper*
  17 +
  18 +# Local info #
  19 +##############
  20 +*.vssscc
  21 +*.user
  22 +*.suo
  23 +*.vspscc
  24 +*.[pP]ublish.xml
  25 +*mm_cache.bin
  26 +*.sln.cache
  27 +
  28 +
  29 +
134 MiniBuss/ServiceBus.cs
@@ -30,11 +30,11 @@ public interface IServiceBus
30 30
31 31 public class ServiceBus : IServiceBus
32 32 {
33   - private static readonly ConcurrentDictionary<RuntimeTypeHandle, string> ReplyQueues = new ConcurrentDictionary<RuntimeTypeHandle, string>();
34   - private static readonly ConcurrentDictionary<RuntimeTypeHandle, string> TargetQueues = new ConcurrentDictionary<RuntimeTypeHandle, string>();
35   - private static readonly ConcurrentDictionary<Type, object> MessageHandlers = new ConcurrentDictionary<Type, object>();
36   - private static readonly List<Subscription> Subscriptions = new List<Subscription>(); //concurrency handled with lock()
37   - private static readonly ConcurrentDictionary<string, Type> HandledSubscriptions = new ConcurrentDictionary<string, Type>();
  33 + private readonly ConcurrentDictionary<RuntimeTypeHandle, string> _replyQueues = new ConcurrentDictionary<RuntimeTypeHandle, string>();
  34 + private readonly ConcurrentDictionary<RuntimeTypeHandle, string> _targetQueues = new ConcurrentDictionary<RuntimeTypeHandle, string>();
  35 + private readonly ConcurrentDictionary<Type, object> _messageHandlers = new ConcurrentDictionary<Type, object>();
  36 + private readonly List<Subscription> _subscriptions = new List<Subscription>(); //concurrency handled with lock()
  37 + private readonly ConcurrentDictionary<string, Type> _handledSubscriptions = new ConcurrentDictionary<string, Type>();
38 38
39 39 private MessageQueue _queue;
40 40
@@ -67,12 +67,12 @@ private static string GetEndpointName(string value)
67 67
68 68 public void RegisterMessageEndpoint<TCommand>(string targetEndpoint) where TCommand : class
69 69 {
70   - TargetQueues[typeof(TCommand).TypeHandle] = GetEndpointName(targetEndpoint);
  70 + _targetQueues[typeof(TCommand).TypeHandle] = GetEndpointName(targetEndpoint);
71 71 }
72 72
73 73 public void Reply(object message, object response)
74 74 {
75   - var rq = ReplyQueues[message.GetType().TypeHandle];
  75 + var rq = _replyQueues[message.GetType().TypeHandle];
76 76 if (rq == null) throw new InvalidOperationException("Endpoint for replying not found for current message");
77 77
78 78 SendMessage(response, rq);
@@ -80,7 +80,7 @@ public void Reply(object message, object response)
80 80
81 81 public void Send(object command)
82 82 {
83   - var targetQueue = TargetQueues[command.GetType().TypeHandle];
  83 + var targetQueue = _targetQueues[command.GetType().TypeHandle];
84 84 SendMessage(command, targetQueue);
85 85 }
86 86
@@ -98,17 +98,10 @@ private void SendMessage(object msg, string targetQueue)
98 98 message.ResponseQueue = responseQ;
99 99 }
100 100
101   - if (msgQ.Transactional)
  101 + using (var tx = new TransactionScope())
102 102 {
103   - using (var tx = new TransactionScope())
104   - {
105   - msgQ.Send(message, MessageQueueTransactionType.Automatic);
106   - tx.Complete();
107   - }
108   - }
109   - else
110   - {
111   - msgQ.Send(message);
  103 + msgQ.Send(message, MessageQueueTransactionType.Automatic);
  104 + tx.Complete();
112 105 }
113 106 }
114 107
@@ -123,7 +116,7 @@ private static void CreateTransactionalQueueIfNotExists(string queueName)
123 116 var name = typeof(TCommand).Name;
124 117 if (name == null) throw new InvalidOperationException("Should not happen");
125 118
126   - MessageHandlers[typeof(TCommand)] = CastArgument<object, TCommand>(x => handler(x));
  119 + _messageHandlers[typeof(TCommand)] = CastArgument<object, TCommand>(x => handler(x));
127 120 }
128 121
129 122 private static Action<TBase> CastArgument<TBase, TDerived>(Expression<Action<TDerived>> source) where TDerived : TBase
@@ -146,21 +139,16 @@ public void Start()
146 139 {
147 140 MessageReadPropertyFilter = { AppSpecific = true }
148 141 };
149   - _queue.ReceiveCompleted += QueueReceiveCompleted; //NOTE: this whole bit could be removed if we decided to not use non-transactional queues
150 142 _queue.PeekCompleted += QueuePeekCompleted;
151   - if (_queue.Transactional)
152   - _queue.BeginPeek();
153   - else
154   - _queue.BeginReceive();
  143 + _queue.BeginPeek();
155 144 }
156 145
157 146 public void Stop()
158 147 {
159   - _queue.ReceiveCompleted -= QueueReceiveCompleted;
160 148 _queue.PeekCompleted -= QueuePeekCompleted;
161 149 }
162 150
163   - private static void QueuePeekCompleted(object sender, PeekCompletedEventArgs e)
  151 + private void QueuePeekCompleted(object sender, PeekCompletedEventArgs e)
164 152 {
165 153 var cmq = (MessageQueue)sender;
166 154 cmq.EndPeek(e.AsyncResult);
@@ -195,51 +183,19 @@ private static void QueuePeekCompleted(object sender, PeekCompletedEventArgs e)
195 183 cmq.BeginPeek();
196 184 }
197 185
198   - private static void QueueReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
  186 + private void HandleMessage(Message msg)
199 187 {
200   - var cmq = (MessageQueue)sender;
201   -
202   - Message msg = null; //keep outside scope to move this to the error log
203   - try
204   - {
205   - msg = cmq.EndReceive(e.AsyncResult);
206   - if (msg.AppSpecific == 0)
207   - HandleMessage(msg);
208   - else
209   - HandleSubscribeAndUnsubscribeMessage(msg);
210   -
211   - }
212   - catch (Exception ex)
213   - {
214   - ConsoleError("Exception while receiving message: " + ex.Message);
215   - if (msg != null)
216   - using (var scope = new TransactionScope())
217   - {
218   - using (var myQueue = new MessageQueue(cmq.MachineName + "\\" + cmq.QueueName + "_errors"))
219   - {
220   - myQueue.Send(msg, MessageQueueTransactionType.Automatic);
221   - }
222   - scope.Complete();
223   - }
224   -
225   - }
226   - cmq.Refresh();
227   - cmq.BeginReceive();
228   - }
229   -
230   - private static void HandleMessage(Message msg)
231   - {
232   - var types = MessageHandlers.Select(h => h.Key).ToArray();
  188 + var types = _messageHandlers.Select(h => h.Key).ToArray();
233 189
234 190 msg.Formatter = new XmlMessageFormatter(types);
235 191 var message = msg.Body;
236 192 if (message == null) throw new Exception("Could not extract message from msg body - unknown message to us?");
237 193
238 194 var messageType = message.GetType();
239   - var handler = MessageHandlers[messageType] as Action<object>; //will throw if no handler is found
  195 + var handler = _messageHandlers[messageType] as Action<object>; //will throw if no handler is found
240 196
241 197 if (msg.ResponseQueue != null)
242   - ReplyQueues.TryAdd(message.GetType().TypeHandle,
  198 + _replyQueues.TryAdd(message.GetType().TypeHandle,
243 199 msg.ResponseQueue.MachineName + "\\" + msg.ResponseQueue.QueueName);
244 200
245 201 //execute the delegate for this message
@@ -248,7 +204,7 @@ private static void HandleMessage(Message msg)
248 204 if (msg.ResponseQueue != null)
249 205 {
250 206 string rq;
251   - var res = ReplyQueues.TryRemove(message.GetType().TypeHandle, out rq);
  207 + var res = _replyQueues.TryRemove(message.GetType().TypeHandle, out rq);
252 208 if (res == false) throw new Exception("Could not remove reply-queue, should not happen");
253 209 }
254 210 }
@@ -256,22 +212,19 @@ private static void HandleMessage(Message msg)
256 212 public void Publish(object @event)
257 213 {
258 214 List<Subscription> subscriptions;
259   - lock (Subscriptions)
  215 + lock (_subscriptions)
260 216 {
261   - subscriptions = Subscriptions.Where(s => s.Type == @event.GetType()).ToList();
  217 + subscriptions = _subscriptions.Where(s => s.Type == @event.GetType()).ToList();
262 218 }
263 219
264 220 Parallel.ForEach(subscriptions, subscription =>
265 221 {
266 222 var type = subscription.Type;
267   - if (type.Name == null) throw new Exception("Should not be possible");
268   -
269 223 var message = new Message { Body = @event, Recoverable = true, Label = type.Name };
270 224
271 225 //NOTE: Should published messages be removed if not handled? Easy to do with a TimeToBeReceived setting
272 226 //message.TimeToBeReceived = new TimeSpan(0,0,0,10); //remove from queue after 10 secs
273   - var msgQ = new MessageQueue(subscription.SubscriberQueue);
274   - if (msgQ.Transactional)
  227 + using (var msgQ = new MessageQueue(subscription.SubscriberQueue))
275 228 {
276 229 using (var mqt = new MessageQueueTransaction())
277 230 {
@@ -280,10 +233,6 @@ public void Publish(object @event)
280 233 mqt.Commit();
281 234 }
282 235 }
283   - else
284   - {
285   - msgQ.Send(message);
286   - }
287 236 });
288 237 }
289 238
@@ -292,15 +241,15 @@ public void Publish(object @event)
292 241 var type = typeof(TEvent);
293 242 if (type.Name == null) throw new Exception("Should not be possible");
294 243
295   - if (!HandledSubscriptions.ContainsKey(type.Name))
296   - HandledSubscriptions[type.Name] = type;
  244 + if (!_handledSubscriptions.ContainsKey(type.Name))
  245 + _handledSubscriptions[type.Name] = type;
297 246 }
298 247
299   - private static void HandleSubscribeAndUnsubscribeMessage(Message subscriptionMsg)
  248 + private void HandleSubscribeAndUnsubscribeMessage(Message subscriptionMsg)
300 249 {
301 250 var typestring = subscriptionMsg.Label; //label contains the message type name (no namespace, just class name)
302 251
303   - var type = HandledSubscriptions[typestring];
  252 + var type = _handledSubscriptions[typestring];
304 253
305 254 var subscriptionCommand = (SubscriptionCommand)subscriptionMsg.AppSpecific;
306 255
@@ -310,18 +259,18 @@ private static void HandleSubscribeAndUnsubscribeMessage(Message subscriptionMsg
310 259 {
311 260 case SubscriptionCommand.Start:
312 261 ConsoleInfo("Start sending events of type " + typestring + " to " + queue);
313   - lock (Subscriptions)
  262 + lock (_subscriptions)
314 263 {
315   - if (!Subscriptions.Any(s => s.Type == type && s.SubscriberQueue == queue))
316   - Subscriptions.Add(new Subscription { Type = type, SubscriberQueue = queue });
  264 + if (!_subscriptions.Any(s => s.Type == type && s.SubscriberQueue == queue))
  265 + _subscriptions.Add(new Subscription { Type = type, SubscriberQueue = queue });
317 266 }
318 267 break;
319 268 case SubscriptionCommand.Stop:
320 269 ConsoleInfo("Stop sending events of type " + typestring + " to " + queue);
321   - lock (Subscriptions)
  270 + lock (_subscriptions)
322 271 {
323   - if (Subscriptions.Any(s => s.Type == type && s.SubscriberQueue == queue))
324   - Subscriptions.Remove(Subscriptions.Where(s => s.Type == type && s.SubscriberQueue == (queue)).FirstOrDefault());
  272 + if (_subscriptions.Any(s => s.Type == type && s.SubscriberQueue == queue))
  273 + _subscriptions.Remove(_subscriptions.Where(s => s.Type == type && s.SubscriberQueue == (queue)).FirstOrDefault());
325 274 }
326 275 break;
327 276 }
@@ -332,7 +281,7 @@ private static void HandleSubscribeAndUnsubscribeMessage(Message subscriptionMsg
332 281 var type = typeof(TEvent);
333 282 if (type.Name == null) throw new Exception("Should not be possible");
334 283
335   - MessageHandlers[typeof(TEvent)] = CastArgument<object, TEvent>(x => handler(x));
  284 + _messageHandlers[typeof(TEvent)] = CastArgument<object, TEvent>(x => handler(x));
336 285
337 286 var message = new Message { AppSpecific = (int)SubscriptionCommand.Start, Recoverable = true, Label = type.Name };
338 287 SendSubscribeMessage(GetEndpointName(publisherEndpoint), message);
@@ -354,18 +303,10 @@ private void SendSubscribeMessage(string publisherQueue, Message message)
354 303
355 304 var responseQ = new MessageQueue(LocalEndpoint);
356 305 message.ResponseQueue = responseQ;
357   -
358   - if (msgQ.Transactional)
359   - {
360   - using (var tx = new TransactionScope())
361   - {
362   - msgQ.Send(message, MessageQueueTransactionType.Automatic);
363   - tx.Complete();
364   - }
365   - }
366   - else
  306 + using (var tx = new TransactionScope())
367 307 {
368   - msgQ.Send(message);
  308 + msgQ.Send(message, MessageQueueTransactionType.Automatic);
  309 + tx.Complete();
369 310 }
370 311 }
371 312
@@ -380,7 +321,7 @@ private static void ConsoleError(string text)
380 321 {
381 322 Console.ForegroundColor = ConsoleColor.Red;
382 323 Console.WriteLine(text);
383   - Console.ResetColor();
  324 + Console.ResetColor();
384 325 }
385 326
386 327 private enum SubscriptionCommand
@@ -395,5 +336,4 @@ private class Subscription
395 336 public string SubscriberQueue { get; set; }
396 337 }
397 338 }
398   -
399 339 }

0 comments on commit d74c726

Please sign in to comment.
Something went wrong with that request. Please try again.