Permalink
Browse files

Refactoring with some perf work

Creating a services interface for utility abstractions
Using a pool for baton memory buffers
  • Loading branch information...
1 parent 2a5a6ab commit 1f16bd5229cb9814b068d8770e63638c97439eed @loudej committed Jan 25, 2012
@@ -35,16 +35,20 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>
<ItemGroup>
+ <Compile Include="DragonflyServices.cs" />
<Compile Include="Http\Baton.cs" />
<Compile Include="Http\MessageBody.cs" />
+ <Compile Include="IDragonflyServices.cs" />
<Compile Include="Utils\Disposable.cs" />
<Compile Include="Http\Connection.cs" />
<Compile Include="Http\Frame.cs" />
<Compile Include="Http\ServerFactory.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Utils\HeaderUtils.cs" />
+ <Compile Include="Utils\IMemoryPool.cs" />
<Compile Include="Utils\IServerTrace.cs" />
<Compile Include="Utils\ISocket.cs" />
+ <Compile Include="Utils\MemoryPool.cs" />
<Compile Include="Utils\NullServerTrace.cs" />
<Compile Include="Utils\SocketWrapper.cs" />
<Compile Include="Workaround.cs" />
@@ -0,0 +1,16 @@
+using Dragonfly.Utils;
+
+namespace Dragonfly
+{
+ public class DragonflyServices : IDragonflyServices
+ {
+ public DragonflyServices()
+ {
+ Trace = NullServerTrace.Instance;
+ Memory = new MemoryPool();
+ }
+
+ public IServerTrace Trace { get; set; }
+ public IMemoryPool Memory { get; set; }
+ }
+}
@@ -1,10 +1,19 @@
using System;
using System.Diagnostics;
+using Dragonfly.Utils;
namespace Dragonfly.Http
{
public class Baton
{
+ private readonly IMemoryPool _memory;
+
+ public Baton(IMemoryPool memory)
+ {
+ _memory = memory;
+ Buffer = new ArraySegment<byte>(_memory.Empty, 0, 0);
+ }
+
public ArraySegment<byte> Buffer { get; set; }
public bool RemoteIntakeFin { get; set; }
@@ -21,6 +30,15 @@ public ArraySegment<byte> Take(int count)
return taken;
}
+ public void Free()
+ {
+ if (Buffer.Count == 0 && Buffer.Array.Length != 0)
+ {
+ _memory.Free(Buffer.Array);
+ Buffer = new ArraySegment<byte>(_memory.Empty, 0, 0);
+ }
+ }
+
public ArraySegment<byte> Available(int minimumSize)
{
if (Buffer.Count == 0 && Buffer.Offset != 0)
@@ -33,13 +51,21 @@ public ArraySegment<byte> Available(int minimumSize)
if (availableSize + Buffer.Offset >= minimumSize)
{
Array.Copy(Buffer.Array, Buffer.Offset, Buffer.Array, 0, Buffer.Count);
- Buffer = new ArraySegment<byte>(Buffer.Array, 0, Buffer.Count);
+ if (Buffer.Count != 0)
+ {
+ Buffer = new ArraySegment<byte>(Buffer.Array, 0, Buffer.Count);
+ }
availableSize = Buffer.Array.Length - Buffer.Offset - Buffer.Count;
}
else
{
- var larger = new ArraySegment<byte>(new byte[Buffer.Array.Length * 2 + minimumSize], 0, Buffer.Count);
- Array.Copy(Buffer.Array, Buffer.Offset, larger.Array, 0, Buffer.Count);
+ var largerSize = Buffer.Array.Length + Math.Max(Buffer.Array.Length, minimumSize);
+ var larger = new ArraySegment<byte>(_memory.Alloc(largerSize), 0, Buffer.Count);
+ if (Buffer.Count != 0)
+ {
+ Array.Copy(Buffer.Array, Buffer.Offset, larger.Array, 0, Buffer.Count);
+ }
+ _memory.Free(Buffer.Array);
Buffer = larger;
availableSize = Buffer.Array.Length - Buffer.Offset - Buffer.Count;
}
@@ -9,7 +9,7 @@ namespace Dragonfly.Http
{
public class Connection : IAsyncResult
{
- private readonly IServerTrace _trace;
+ private readonly IDragonflyServices _services;
private readonly AppDelegate _app;
private readonly ISocket _socket;
private readonly Action<ISocket> _disconnected;
@@ -21,30 +21,27 @@ public class Connection : IAsyncResult
private Action _frameConsumeCallback;
private SocketAsyncEventArgs _socketReceiveAsyncEventArgs;
- public Connection(IServerTrace trace, AppDelegate app, ISocket socket, Action<ISocket> disconnected)
+ public Connection(IDragonflyServices services, AppDelegate app, ISocket socket, Action<ISocket> disconnected)
{
- _trace = trace;
+ _services = services;
_app = app;
_socket = socket;
_disconnected = disconnected;
}
public void Execute()
{
- _trace.Event(TraceEventType.Start, TraceMessage.Connection);
+ _services.Trace.Event(TraceEventType.Start, TraceMessage.Connection);
- _baton = new Baton
- {
- Buffer = new ArraySegment<byte>(new byte[1024], 0, 0)
- };
+ _baton = new Baton(_services.Memory);
_fault = ex =>
{
Debug.WriteLine(ex.Message);
};
_socketReceiveAsyncEventArgs = new SocketAsyncEventArgs();
- _socketReceiveAsyncEventArgs.SetBuffer(new byte[0], 0, 0);
+ _socketReceiveAsyncEventArgs.SetBuffer(_services.Memory.Empty, 0, 0);
_socketReceiveAsyncEventArgs.Completed +=
(_, __) =>
{
@@ -112,6 +109,7 @@ private void Go(bool newFrame)
if (recvError == SocketError.WouldBlock)
{
+ _baton.Free();
if (_socket.ReceiveAsync(_socketReceiveAsyncEventArgs))
return;
@@ -145,6 +143,13 @@ struct SendInfo
private bool ProduceData(ArraySegment<byte> data, Action callback)
{
+ // Rogue value implies shutdown send (used for 1.0 clients)
+ if (data.Array == null)
+ {
+ _socket.Shutdown(SocketShutdown.Send);
+ return false;
+ }
+
if (callback == null)
{
DoProduce(data);
@@ -164,7 +169,7 @@ private void DoProduce(ArraySegment<byte> data)
var sent = _socket.Send(remaining.Array, remaining.Offset, remaining.Count, SocketFlags.None, out errorCode);
if (errorCode != SocketError.Success)
{
- _trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
+ _services.Trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
break;
}
if (sent == remaining.Count)
@@ -196,7 +201,7 @@ private bool DoProduce(ArraySegment<byte> data, Action callback)
{
if (asyncInfo.SocketError != SocketError.Success)
{
- _trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
+ _services.Trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
callback();
return;
}
@@ -222,7 +227,7 @@ private bool DoProduce(ArraySegment<byte> data, Action callback)
}
if (info.SocketError != SocketError.Success)
{
- _trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
+ _services.Trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
break;
}
if (info.BytesSent == remaining.Count)
@@ -260,32 +265,39 @@ private SendInfo DoSend(ArraySegment<byte> data, Action<SendInfo> callback)
return new SendInfo { BytesSent = e.BytesTransferred, SocketError = e.SocketError };
}
- private void ProduceEnd(bool keepAlive)
+ private void ProduceEnd(ProduceEndType endType)
{
- if (keepAlive)
+ switch (endType)
{
- ThreadPool.QueueUserWorkItem(_ => Go(true));
- return;
- }
-
- _trace.Event(TraceEventType.Stop, TraceMessage.Connection);
-
- _socketReceiveAsyncEventArgs.Dispose();
- _socketReceiveAsyncEventArgs = null;
- _socket.Shutdown(SocketShutdown.Receive);
-
- var e = new SocketAsyncEventArgs();
- Action cleanup =
- () =>
- {
- e.Dispose();
- _disconnected(_socket);
- };
+ case ProduceEndType.SocketShutdownSend:
+ _socket.Shutdown(SocketShutdown.Send);
+ break;
+ case ProduceEndType.ConnectionKeepAlive:
+ ThreadPool.QueueUserWorkItem(_ => Go(true));
+ break;
+ case ProduceEndType.SocketDisconnect:
+ _services.Trace.Event(TraceEventType.Stop, TraceMessage.Connection);
+
+ _baton.Free();
+
+ _socketReceiveAsyncEventArgs.Dispose();
+ _socketReceiveAsyncEventArgs = null;
+ _socket.Shutdown(SocketShutdown.Receive);
+
+ var e = new SocketAsyncEventArgs();
+ Action cleanup =
+ () =>
+ {
+ e.Dispose();
+ _disconnected(_socket);
+ };
- e.Completed += (_, __) => cleanup();
- if (!_socket.DisconnectAsync(e))
- {
- cleanup();
+ e.Completed += (_, __) => cleanup();
+ if (!_socket.DisconnectAsync(e))
+ {
+ cleanup();
+ }
+ break;
}
}
@@ -8,11 +8,18 @@
namespace Dragonfly.Http
{
+ public enum ProduceEndType
+ {
+ SocketShutdownSend,
+ SocketDisconnect,
+ ConnectionKeepAlive,
+ }
+
public class Frame
{
private readonly AppDelegate _app;
private readonly Func<ArraySegment<byte>, Action, bool> _produceData;
- private readonly Action _produceEnd;
+ private readonly Action<ProduceEndType> _produceEnd;
Mode _mode;
enum Mode
@@ -33,15 +40,11 @@ enum Mode
private bool _resultStarted;
private bool _keepAlive;
- public Frame(AppDelegate app, Func<ArraySegment<byte>, Action, bool> produceData, Action<bool> produceEnd)
+ public Frame(AppDelegate app, Func<ArraySegment<byte>, Action, bool> produceData, Action<ProduceEndType> produceEnd)
{
_app = app;
_produceData = produceData;
- _produceEnd = () =>
- {
- if (!_messageBody.Drain(() => produceEnd(_keepAlive)))
- produceEnd(_keepAlive);
- };
+ _produceEnd = produceEnd;
}
public bool LocalIntakeFin
@@ -100,6 +103,7 @@ public bool Consume(Baton baton, Action callback, Action<Exception> fault)
});
_keepAlive = _messageBody.RequestKeepAlive;
_mode = Mode.MessageBody;
+ baton.Free();
Execute();
return true;
@@ -146,24 +150,36 @@ private void Execute()
{
_resultStarted = true;
- Action sendResponseBody =
- () => body(
- _produceData,
- ex => _produceEnd(),
- () => _produceEnd());
-
- if (!_produceData(CreateResponseHeader(status, headers), sendResponseBody))
- sendResponseBody.Invoke();
+ if (!_produceData(
+ CreateResponseHeader(status, headers),
+ () => body(_produceData, ProduceEnd, ProduceEnd)))
+ {
+ body(_produceData, ProduceEnd, ProduceEnd);
+ }
},
- ex => _produceEnd());
+ ProduceEnd);
return;
}
+ private void ProduceEnd(Exception ex)
+ {
+ ProduceEnd();
+ }
+
+ private void ProduceEnd()
+ {
+ if (!_keepAlive)
+ _produceEnd(ProduceEndType.SocketShutdownSend);
+
+ if (!_messageBody.Drain(() => _produceEnd(_keepAlive ? ProduceEndType.ConnectionKeepAlive : ProduceEndType.SocketDisconnect)))
+ _produceEnd(_keepAlive ? ProduceEndType.ConnectionKeepAlive : ProduceEndType.SocketDisconnect);
+ }
+
private ArraySegment<byte> CreateResponseHeader(string status, IEnumerable<KeyValuePair<string, IEnumerable<string>>> headers)
{
var sb = new StringBuilder(128);
- sb.Append("HTTP/1.1 ").AppendLine(status);
+ sb.Append(_httpVersion).Append(" ").AppendLine(status);
var hasConnection = false;
var hasTransferEncoding = false;
Oops, something went wrong.

0 comments on commit 1f16bd5

Please sign in to comment.