Skip to content

Commit

Permalink
Added IQueue<T>.Requeue, improvement to InMemQueue (renamed to InMemo…
Browse files Browse the repository at this point in the history
…ryQueue), bus and supporting classes, and ThreadPoolPipe
  • Loading branch information
danielcrenna committed Aug 30, 2011
1 parent b4c67c8 commit 5d362fb
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 44 deletions.
20 changes: 10 additions & 10 deletions src/pvc.Core/ActionWrappingConsumer.cs
@@ -1,16 +1,16 @@
using System;
namespace pvc.Core
{
public class ActionWrappingConsumer<T> : Consumes<T> where T:Message
{
private readonly Action<T> _action;
public void Handle(T message) {
_action(message);
}
using System;
namespace pvc.Core
{
public class ActionWrappingConsumer<T> : Consumes<T> where T:Message
{
private readonly Action<T> _action;
public void Handle(T message) {
_action(message);
}
public ActionWrappingConsumer (Action<T> action)
{
if (action == null) throw new ArgumentNullException("action");
_action = action;
}
}
}
}
40 changes: 40 additions & 0 deletions src/pvc.Core/Bus/ActionWrappingBusPublisher.cs
@@ -0,0 +1,40 @@
using System;

namespace pvc.Core.Bus
{
public class ActionWrappingBusPublisher : IBus, Produces<Message>
{
private readonly Action<Event> _eventAction;
private readonly Action<Command> _commandAction;
private Consumes<Message> _consumer;

public ActionWrappingBusPublisher(Action<Event> eventAction, Action<Command> commandAction)
{
_eventAction = eventAction;
_commandAction = commandAction;
}

public void Publish(Event @event)
{
_eventAction(@event);
if (_consumer != null)
{
_consumer.Handle(@event);
}
}

public void Send(Command command)
{
_commandAction(command);
if (_consumer != null)
{
_consumer.Handle(command);
}
}

public void AttachConsumer(Consumes<Message> consumer)
{
_consumer = consumer;
}
}
}
7 changes: 7 additions & 0 deletions src/pvc.Core/Bus/Command.cs
@@ -0,0 +1,7 @@
namespace pvc.Core.Bus
{
public interface Command : Message
{

}
}
7 changes: 7 additions & 0 deletions src/pvc.Core/Bus/Event.cs
@@ -0,0 +1,7 @@
namespace pvc.Core.Bus
{
public interface Event : Message
{

}
}
8 changes: 8 additions & 0 deletions src/pvc.Core/Bus/IBus.cs
@@ -0,0 +1,8 @@
namespace pvc.Core.Bus
{
public interface IBus
{
void Publish(Event @event);
void Send(Command command);
}
}
1 change: 1 addition & 0 deletions src/pvc.Core/IQueue.cs
Expand Up @@ -5,5 +5,6 @@ public interface IQueue<T>
bool TryDequeue(out T item);
void Enqueue(T item);
void MarkComplete(T item);
void Requeue(T item);
}
}
32 changes: 0 additions & 32 deletions src/pvc.Core/InMemQueue.cs

This file was deleted.

44 changes: 44 additions & 0 deletions src/pvc.Core/InMemoryQueue.cs
@@ -0,0 +1,44 @@
//using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace pvc.Core
{
public class InMemoryQueue<T> : IQueue<T>
{
private readonly Queue<T> _queue = new Queue<T>(); //make this a concurrent queue, need to build on 3.5 for mono now

public bool TryDequeue(out T item)
{
lock (_queue)
{
if (_queue.Count > 0)
{
item = _queue.Dequeue();
return true;
}
Thread.Sleep(1);
item = default(T);
return false;
}
}

public void Enqueue(T item)
{
lock (_queue)
{
_queue.Enqueue(item);
}
}

public void MarkComplete(T item)
{

}

public void Requeue(T item)
{
Enqueue(item);
}
}
}
1 change: 1 addition & 0 deletions src/pvc.Core/Pipe.cs
@@ -1,4 +1,5 @@
using System;

namespace pvc.Core
{
public interface Pipe<TConsumes, TProduces> : Consumes<TConsumes>, Produces<TProduces> where TConsumes:Message where TProduces:Message
Expand Down
19 changes: 19 additions & 0 deletions src/pvc.Core/ThreadPoolPipe.cs
@@ -0,0 +1,19 @@
using System.Threading;

namespace pvc.Core
{
public class ThreadPoolPipe<T> : Pipe<T, T> where T : Message
{
private Consumes<T> _consumer;

public void Handle(T message)
{
ThreadPool.QueueUserWorkItem(x => _consumer.Handle(message));
}

public void AttachConsumer(Consumes<T> consumer)
{
_consumer = consumer;
}
}
}
10 changes: 8 additions & 2 deletions src/pvc.Core/pvc.Core.csproj
Expand Up @@ -12,7 +12,7 @@
<AssemblyName>pvc.Core</AssemblyName>
<TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
Expand Down Expand Up @@ -41,18 +41,23 @@
<Reference Include="System.Core" />
</ItemGroup>
<ItemGroup>
<Compile Include="Bus\ActionWrappingBusPublisher.cs" />
<Compile Include="BufferMessages.cs" />
<Compile Include="Combiner.cs" />
<Compile Include="Bus\Command.cs" />
<Compile Include="CriticalSection.cs" />
<Compile Include="Bus\Event.cs" />
<Compile Include="EventAggregator.cs" />
<Compile Include="Bus\IBus.cs" />
<Compile Include="IMessagePublisher.cs" />
<Compile Include="InMemQueue.cs" />
<Compile Include="InMemoryQueue.cs" />
<Compile Include="IQueue.cs" />
<Compile Include="MessagePublisher.cs" />
<Compile Include="NullConsumer.cs" />
<Compile Include="Produces.cs" />
<Compile Include="QueueReader.cs" />
<Compile Include="QueueWriter.cs" />
<Compile Include="ThreadPoolPipe.cs" />
<Compile Include="UnableToAcquireLockException.cs" />
<Compile Include="Multiplexor.cs" />
<Compile Include="ThreadBoundary.cs" />
Expand All @@ -70,6 +75,7 @@
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
Expand Down

0 comments on commit 5d362fb

Please sign in to comment.