Skip to content

Commit

Permalink
Even more functionality generalized to SlimIOCP.Shared
Browse files Browse the repository at this point in the history
  • Loading branch information
fholm committed Nov 26, 2011
1 parent b1fd9ff commit c41aaf9
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 58 deletions.
10 changes: 1 addition & 9 deletions SlimIOCP/SlimIOCP.Mono/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,8 @@

namespace SlimIOCP.Mono
{
public class Peer : BasePeer
public class Peer : BasePeer<IncomingMessage, IncomingMessage, IncomingMessage>
{
public override bool TryGetMessage(out IncomingMessage message)
{
throw new NotImplementedException();
}

public override bool TryRecycleMessage(IncomingMessage message)
{
throw new NotImplementedException();
}
}
}
61 changes: 58 additions & 3 deletions SlimIOCP/SlimIOCP.Shared/BasePeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,39 @@

namespace SlimIOCP
{
public abstract class BasePeer
public abstract class BasePeer<
TIncomingBuffer,
TIncomingMessage,
TOutgoingMessage

/*,
TIncomingBufferProducer,
TIncomingMessageProducer,
TOutgoingMessageProducer
*/
>

where TIncomingBuffer : MessageBuffer
where TIncomingMessage : MessageBuffer
where TOutgoingMessage : MessageBuffer

/*
where TIncomingBufferProducer : MessageBufferProducer<TIncomingBuffer>, new()
where TIncomingMessageProducer : MessageBufferProducer<TIncomingMessage>, new()
where TOutgoingMessageProducer : MessageBufferProducer<TOutgoingMessage>, new()
*/
{
static internal int ReceiverThreadIdCounter = -1;

internal MessageBufferPool<TIncomingBuffer> IncomingBufferPool;
internal MessageBufferPool<TIncomingMessage> IncomingMessagePool;
internal MessageBufferPool<TOutgoingMessage> OutgoingMessagePool;

internal Queue<TIncomingBuffer> IncomingBufferQueue;
internal readonly object IncomingBufferQueueSync = new object();
internal readonly QueuePool<TIncomingBuffer> IncomingBufferQueuePool;
internal readonly Queue<TIncomingMessage> ReceivedMessages;

internal Socket Socket;
internal Thread ReceiverThread;
internal BaseReceiver Receiver;
Expand All @@ -25,10 +54,36 @@ public BasePeer()
{
ReceiverEvent = new ManualResetEvent(true);
ReceivedMessageEvent = new ManualResetEvent(false);
ReceivedMessages = new Queue<TIncomingMessage>();

IncomingBufferQueue = new Queue<TIncomingBuffer>();
IncomingBufferQueuePool = new QueuePool<TIncomingBuffer>(32);
}

public bool TryRecycleMessage(TIncomingMessage message)
{
lock (IncomingMessagePool)
{
return IncomingMessagePool.TryPush(message);
}
}

public abstract bool TryGetMessage(out IncomingMessage message);
public abstract bool TryRecycleMessage(IncomingMessage message);
public bool TryGetMessage(out TIncomingMessage message)
{
lock (ReceivedMessages)
{
if (ReceivedMessages.Count > 0)
{
message = ReceivedMessages.Dequeue();
return true;
}
}

ReceivedMessageEvent.Reset();

message = null;
return false;
}

internal void StartReceiver()
{
Expand Down
95 changes: 95 additions & 0 deletions SlimIOCP/SlimIOCP.Shared/BaseReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,101 @@

namespace SlimIOCP
{
/*
internal class Receiver : BaseReceiver
{
readonly Peer peer;
internal Receiver(Peer peer)
{
this.peer = peer;
}
internal override void ReceiveLoop()
{
Queue<IncomingBuffer> queue = null;
while (true)
{
lock (peer.IncomingBufferQueueSync)
{
if (peer.IncomingBufferQueue.Count > 0)
{
queue = peer.IncomingBufferQueue;
if (!peer.IncomingBufferQueuePool.TryPop(out peer.IncomingBufferQueue))
{
//TODO: Error
}
}
}
if (queue != null)
{
while (queue.Count > 0)
{
var buffer = queue.Dequeue();
var bufferHandle = buffer.BufferHandle;
var bufferOffset = buffer.BufferOffset;
var bufferLength = buffer.AsyncArgs.BytesTransferred;
var connection = buffer.Connection;
while (bufferLength > 0)
{
if (connection.Message == null)
{
if (!peer.IncomingMessagePool.TryPop(out connection.Message))
{
//TODO: Error
}
}
connection.Message = Receive(connection.Message, bufferHandle, ref bufferOffset, ref bufferLength);
if (connection.Message.IsDone)
{
//lock (connection.ReceiveQueue)
//{
//connection.ReceiveQueue.Enqueue(connection.Message);
//}
// Queue into received messages
lock (peer.ReceivedMessages)
{
peer.ReceivedMessages.Enqueue(connection.Message);
}
// Signal wait event
peer.ReceivedMessageEvent.Set();
// Clear message on connection
connection.Message = null;
}
}
if (!peer.IncomingBufferPool.TryPush(buffer))
{
//TODO: Error
}
}
if (!peer.IncomingBufferQueuePool.TryPush(queue))
{
//TODO: Error
}
}
peer.ReceiverEvent.Reset();
if (peer.IncomingBufferQueue.Count == 0)
{
peer.ReceiverEvent.WaitOne();
}
}
}
}*/

internal abstract class BaseReceiver
{
#if DEBUG
Expand Down
5 changes: 3 additions & 2 deletions SlimIOCP/SlimIOCP.Shared/MessageBufferProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public MessageBufferProducer()
}

protected abstract T Create();
//protected abstract void SetPeer(object peer);

public T Get()
{
Expand All @@ -31,11 +32,11 @@ public T Get()

public void Return(T message)
{
if (pool.Count < 32)
if (pool.Count < 64)
{
lock (pool)
{
if (pool.Count < 32)
if (pool.Count < 64)
{
pool.Enqueue(message);
}
Expand Down
2 changes: 1 addition & 1 deletion SlimIOCP/SlimIOCP/IncomingBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace SlimIOCP
{
class IncomingBuffer : MessageBuffer
public class IncomingBuffer : MessageBuffer
{
internal Connection Connection;
internal readonly SocketAsyncEventArgs AsyncArgs;
Expand Down
46 changes: 3 additions & 43 deletions SlimIOCP/SlimIOCP/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,15 @@

namespace SlimIOCP
{
public class Peer : BasePeer
public class Peer : BasePeer<IncomingBuffer, IncomingMessage, OutgoingMessage>
{
internal Queue<IncomingBuffer> IncomingBufferQueue;
internal readonly QueuePool<IncomingBuffer> IncomingBufferQueuePool;

internal readonly object IncomingBufferQueueSync = new object();

internal readonly Queue<IncomingMessage> ReceivedMessages;

internal readonly MessageBufferPool<IncomingBuffer> IncomingBufferPool;
internal readonly MessageBufferPool<IncomingMessage> IncomingMessagePool;
internal readonly MessageBufferPool<OutgoingMessage> OutgoingMessagePool;

internal Peer() : base()
internal Peer()
: base()
{
Receiver = new Receiver(this);
ReceivedMessages = new Queue<IncomingMessage>();

IncomingBufferPool = new MessageBufferPool<IncomingBuffer>(new IncomingBufferProducer(this));
IncomingMessagePool = new MessageBufferPool<IncomingMessage>(new IncomingMessageProducer());
OutgoingMessagePool = new MessageBufferPool<OutgoingMessage>(new OutgoingMessageProducer(this));

IncomingBufferQueue = new Queue<IncomingBuffer>();
IncomingBufferQueuePool = new QueuePool<IncomingBuffer>(32);
}

public override bool TryRecycleMessage(IncomingMessage message)
{
lock (IncomingMessagePool)
{
return IncomingMessagePool.TryPush(message);
}
}

public override bool TryGetMessage(out IncomingMessage message)
{
lock (ReceivedMessages)
{
if (ReceivedMessages.Count > 0)
{
message = ReceivedMessages.Dequeue();
return true;
}
}

ReceivedMessageEvent.Reset();

message = null;
return false;
}

internal void OnComplete(object sender, SocketAsyncEventArgs asyncArgs)
Expand Down

0 comments on commit c41aaf9

Please sign in to comment.