From cf3fa1e6e1a99e157eb93e6a5561bb1f90f55521 Mon Sep 17 00:00:00 2001 From: ShuRui Liu Date: Fri, 19 Jul 2013 10:01:01 +0800 Subject: [PATCH] Pipeline processes events in wrong order theBoringCoder report this bug (https://groups.google.com/forum/?fromgroups#!topic/ncqrs-dev/atPQGjHTATc) --- .../src/Ncqrs.EventBus/Demultiplexer.cs | 7 +++--- .../src/Ncqrs.EventBus/DemultiplexerQueue.cs | 23 +++++++------------ 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/Extensions/src/Ncqrs.EventBus/Demultiplexer.cs b/Extensions/src/Ncqrs.EventBus/Demultiplexer.cs index db0e21f1..683bda9f 100644 --- a/Extensions/src/Ncqrs.EventBus/Demultiplexer.cs +++ b/Extensions/src/Ncqrs.EventBus/Demultiplexer.cs @@ -33,6 +33,7 @@ public void Demultiplex(IProcessingElement sequencedEvent) { AssociateElementAndQueue(sequencedEvent, queue); queue.Enqueue(sequencedEvent); + EnqueueToProcessing(sequencedEvent); } else { @@ -56,10 +57,10 @@ public void MarkAsProcessed(IProcessingElement processingElement) { var queue = _queueMap[processingElement.UniqueId]; _queueMap.Remove(processingElement.UniqueId); - if (!queue.Unblock()) + if (queue.IsEmpty()) { _queues.Remove(queue); - } + } } private void EnqueueToProcessing(IProcessingElement processingElement) @@ -69,7 +70,7 @@ private void EnqueueToProcessing(IProcessingElement processingElement) private DemultiplexerQueue CreateAndBlockQueueFor(IProcessingElement processingElement) { - var queue = new DemultiplexerQueue(processingElement.GroupingKey, EnqueueToProcessing); + var queue = new DemultiplexerQueue(processingElement.GroupingKey); _queues.Add(queue); return queue; } diff --git a/Extensions/src/Ncqrs.EventBus/DemultiplexerQueue.cs b/Extensions/src/Ncqrs.EventBus/DemultiplexerQueue.cs index b81ac898..bcb1e2c9 100644 --- a/Extensions/src/Ncqrs.EventBus/DemultiplexerQueue.cs +++ b/Extensions/src/Ncqrs.EventBus/DemultiplexerQueue.cs @@ -5,24 +5,12 @@ namespace Ncqrs.EventBus { public class DemultiplexerQueue { - private readonly object _groupingKey; - private readonly Action _enqueueToProcessingCallback; + private readonly object _groupingKey; private readonly Queue _queue = new Queue(); - public DemultiplexerQueue(object groupingKey, Action enqueueToProcessingCallback) + public DemultiplexerQueue(object groupingKey) { - _groupingKey = groupingKey; - _enqueueToProcessingCallback = enqueueToProcessingCallback; - } - - public bool Unblock() - { - if (_queue.Count > 0) - { - _enqueueToProcessingCallback(_queue.Dequeue()); - return true; - } - return false; + _groupingKey = groupingKey; } public bool Accepts(IProcessingElement fetchedElement) @@ -34,5 +22,10 @@ public void Enqueue(IProcessingElement processingElement) { _queue.Enqueue(processingElement); } + + public bool IsEmpty() + { + return (_queue.Count == 0); + } } } \ No newline at end of file