Skip to content

Commit

Permalink
Pipeline processes events in wrong order
Browse files Browse the repository at this point in the history
  • Loading branch information
jackliusr committed Jul 19, 2013
1 parent 4b6ad56 commit cf3fa1e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 18 deletions.
7 changes: 4 additions & 3 deletions Extensions/src/Ncqrs.EventBus/Demultiplexer.cs
Expand Up @@ -33,6 +33,7 @@ public void Demultiplex(IProcessingElement sequencedEvent)
{
AssociateElementAndQueue(sequencedEvent, queue);
queue.Enqueue(sequencedEvent);
EnqueueToProcessing(sequencedEvent);
}
else
{
Expand All @@ -56,10 +57,10 @@ public void MarkAsProcessed(IProcessingElement processingElement)
{
var queue = _queueMap[processingElement.UniqueId];
_queueMap.Remove(processingElement.UniqueId);
if (!queue.Unblock())
if (queue.IsEmpty())
{
_queues.Remove(queue);
}
}
}

private void EnqueueToProcessing(IProcessingElement processingElement)
Expand All @@ -69,7 +70,7 @@ private void EnqueueToProcessing(IProcessingElement processingElement)

private DemultiplexerQueue CreateAndBlockQueueFor(IProcessingElement processingElement)
{
var queue = new DemultiplexerQueue(processingElement.GroupingKey, EnqueueToProcessing);
var queue = new DemultiplexerQueue(processingElement.GroupingKey);
_queues.Add(queue);
return queue;
}
Expand Down
23 changes: 8 additions & 15 deletions Extensions/src/Ncqrs.EventBus/DemultiplexerQueue.cs
Expand Up @@ -5,24 +5,12 @@ namespace Ncqrs.EventBus
{
public class DemultiplexerQueue
{
private readonly object _groupingKey;
private readonly Action<IProcessingElement> _enqueueToProcessingCallback;
private readonly object _groupingKey;
private readonly Queue<IProcessingElement> _queue = new Queue<IProcessingElement>();

public DemultiplexerQueue(object groupingKey, Action<IProcessingElement> enqueueToProcessingCallback)
public DemultiplexerQueue(object groupingKey)
{
_groupingKey = groupingKey;
_enqueueToProcessingCallback = enqueueToProcessingCallback;
}

public bool Unblock()
{
if (_queue.Count > 0)
{
_enqueueToProcessingCallback(_queue.Dequeue());
return true;
}
return false;
_groupingKey = groupingKey;
}

public bool Accepts(IProcessingElement fetchedElement)
Expand All @@ -34,5 +22,10 @@ public void Enqueue(IProcessingElement processingElement)
{
_queue.Enqueue(processingElement);
}

public bool IsEmpty()
{
return (_queue.Count == 0);
}
}
}

0 comments on commit cf3fa1e

Please sign in to comment.