Skip to content

Commit

Permalink
Adding a SocketSender buffering layer
Browse files Browse the repository at this point in the history
Seperating the concern of buffering data and switching between immediate
and async sending modes
  • Loading branch information
loudej committed Feb 12, 2012
1 parent 75791ec commit 36ccca5
Show file tree
Hide file tree
Showing 22 changed files with 894 additions and 190 deletions.
13 changes: 13 additions & 0 deletions Firefly.sln
Expand Up @@ -33,6 +33,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{87AEA17F
packages.config = packages.config
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Profile.SystemBehaviors", "src\test\Profile.SystemBehaviors\Profile.SystemBehaviors.csproj", "{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -103,6 +105,16 @@ Global
{63258335-EC44-4497-9016-167BDB8D0A53}.Release|Mixed Platforms.Build.0 = Release|x86
{63258335-EC44-4497-9016-167BDB8D0A53}.Release|x86.ActiveCfg = Release|x86
{63258335-EC44-4497-9016-167BDB8D0A53}.Release|x86.Build.0 = Release|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Debug|Any CPU.ActiveCfg = Debug|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Debug|Mixed Platforms.ActiveCfg = Debug|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Debug|Mixed Platforms.Build.0 = Debug|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Debug|x86.ActiveCfg = Debug|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Debug|x86.Build.0 = Debug|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Release|Any CPU.ActiveCfg = Release|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Release|Mixed Platforms.ActiveCfg = Release|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Release|Mixed Platforms.Build.0 = Release|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Release|x86.ActiveCfg = Release|x86
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9}.Release|x86.Build.0 = Release|x86
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -112,6 +124,7 @@ Global
{2087B5A3-5A02-4F6C-AB54-BDB96D32C291} = {0EFCCCA4-4234-4A7A-8B6A-F4BDD12C226D}
{1332C39F-C2F1-4431-AF9E-1315C322E51A} = {FD1D2A43-12D2-42B7-8640-1CBE8B094A77}
{63258335-EC44-4497-9016-167BDB8D0A53} = {FD1D2A43-12D2-42B7-8640-1CBE8B094A77}
{D09DCA54-3534-4E35-8761-F8DA1FF7FBC9} = {FD1D2A43-12D2-42B7-8640-1CBE8B094A77}
{87AEA17F-9DF3-4702-ABFC-9FD3FE5562DD} = {AF2D9CC0-CBC1-47F3-847E-4ED35F139FAE}
EndGlobalSection
EndGlobal
1 change: 1 addition & 0 deletions src/main/Firefly/Firefly.csproj
Expand Up @@ -58,6 +58,7 @@
<Compile Include="Utils\MemoryPoolTextWriter.cs" />
<Compile Include="Utils\NullServerTrace.cs" />
<Compile Include="Utils\SocketWrapper.cs" />
<Compile Include="Utils\SocketSender.cs" />
<Compile Include="Workaround.cs" />
</ItemGroup>
<ItemGroup>
Expand Down
202 changes: 42 additions & 160 deletions src/main/Firefly/Http/Connection.cs
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading;
Expand All @@ -12,6 +13,7 @@ public class Connection : IAsyncResult
private readonly IFireflyService _services;
private readonly AppDelegate _app;
private readonly ISocket _socket;
private ISocketSender _socketSender;
private readonly Action<ISocket> _disconnected;

private Baton _baton;
Expand All @@ -26,6 +28,7 @@ public Connection(IFireflyService services, AppDelegate app, ISocket socket, Act
_services = services;
_app = app;
_socket = socket;
_socketSender = new SocketSender(_services, _socket);
_disconnected = disconnected;
}

Expand Down Expand Up @@ -70,6 +73,7 @@ public void Execute()
try
{
_socket.Blocking = false;
_socket.NoDelay = true;
Go(true);
}
catch (Exception ex)
Expand All @@ -88,8 +92,8 @@ private void Go(bool newFrame)
{
Services = _services,
App = _app,
Write = data => ProduceData(data, null),
Flush = _ => false,
Write = _socketSender.Write,
Flush = _socketSender.Flush,
End = ProduceEnd
});

Expand Down Expand Up @@ -144,170 +148,48 @@ private void Go(bool newFrame)
}
}

struct SendInfo
{
public int BytesSent { get; set; }
public SocketError SocketError { get; set; }
}

private bool ProduceData(ArraySegment<byte> data, Action callback)
{
// Rogue value implies shutdown send (used for 1.0 clients)
if (data.Array == null)
{
_socket.Shutdown(SocketShutdown.Send);
return false;
}

if (callback == null)
{
DoProduce(data);
return false;
}

return DoProduce(data, callback);
}

private void DoProduce(ArraySegment<byte> data)
{
var remaining = data;

while (remaining.Count != 0)
{
SocketError errorCode;
var sent = _socket.Send(remaining.Array, remaining.Offset, remaining.Count, SocketFlags.None, out errorCode);
if (errorCode != SocketError.Success)
{
_services.Trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
break;
}
if (sent == remaining.Count)
{
break;
}

remaining = new ArraySegment<byte>(
remaining.Array,
remaining.Offset + sent,
remaining.Count - sent);

// BLOCK - enters a wait state for sync output waiting for kernel buffer to be writable
//Socket.Select(null, new List<Socket> { _socket }, null, -1);
_socket.WaitToSend();
}
}

// ReSharper disable AccessToModifiedClosure
private bool DoProduce(ArraySegment<byte> data, Action callback)
private void ProduceEnd(ProduceEndType endType)
{
var remaining = data;

while (remaining.Count != 0)
{
var info = DoSend(
remaining,
asyncInfo =>
Action drained =
() =>
{
if (asyncInfo.SocketError != SocketError.Success)
{
_services.Trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
callback();
return;
}
if (asyncInfo.BytesSent == remaining.Count)
switch (endType)
{
callback();
return;
case ProduceEndType.SocketShutdownSend:
_socket.Shutdown(SocketShutdown.Send);
break;
case ProduceEndType.ConnectionKeepAlive:
ThreadPool.QueueUserWorkItem(_ => Go(true));
break;
case ProduceEndType.SocketDisconnect:
_services.Trace.Event(TraceEventType.Stop, TraceMessage.Connection);
_baton.Free();
_socketReceiveAsyncEventArgs.Dispose();
_socketReceiveAsyncEventArgs = null;
_socket.Shutdown(SocketShutdown.Receive);
var e = new SocketAsyncEventArgs();
Action cleanup =
() =>
{
e.Dispose();
_disconnected(_socket);
};
e.Completed += (_, __) => cleanup();
if (!_socket.DisconnectAsync(e))
{
cleanup();
}
break;
}
};

if (!DoProduce(
new ArraySegment<byte>(
remaining.Array,
remaining.Offset + asyncInfo.BytesSent,
remaining.Count - asyncInfo.BytesSent),
callback))
{
callback();
}
});
if (info.SocketError == SocketError.IOPending)
{
return true;
}
if (info.SocketError != SocketError.Success)
{
_services.Trace.Event(TraceEventType.Warning, TraceMessage.ConnectionSendSocketError);
break;
}
if (info.BytesSent == remaining.Count)
{
break;
}

remaining = new ArraySegment<byte>(
remaining.Array,
remaining.Offset + info.BytesSent,
remaining.Count - info.BytesSent);
}
return false;
}
// ReSharper restore AccessToModifiedClosure

private SendInfo DoSend(ArraySegment<byte> data, Action<SendInfo> callback)
{
var e = new SocketAsyncEventArgs();
e.Completed +=
(_, __) =>
{
e.Dispose();
callback(new SendInfo { BytesSent = e.BytesTransferred, SocketError = e.SocketError });
};
e.SetBuffer(data.Array, data.Offset, data.Count);
var delayed = _socket.SendAsync(e);

if (delayed)
{
return new SendInfo { SocketError = SocketError.IOPending };
}

e.Dispose();
return new SendInfo { BytesSent = e.BytesTransferred, SocketError = e.SocketError };
}

private void ProduceEnd(ProduceEndType endType)
{
switch (endType)
{
case ProduceEndType.SocketShutdownSend:
_socket.Shutdown(SocketShutdown.Send);
break;
case ProduceEndType.ConnectionKeepAlive:
ThreadPool.QueueUserWorkItem(_ => Go(true));
break;
case ProduceEndType.SocketDisconnect:
_services.Trace.Event(TraceEventType.Stop, TraceMessage.Connection);

_baton.Free();

_socketReceiveAsyncEventArgs.Dispose();
_socketReceiveAsyncEventArgs = null;
_socket.Shutdown(SocketShutdown.Receive);

var e = new SocketAsyncEventArgs();
Action cleanup =
() =>
{
e.Dispose();
_disconnected(_socket);
};

e.Completed += (_, __) => cleanup();
if (!_socket.DisconnectAsync(e))
{
cleanup();
}
break;
}
if (!_socketSender.Flush(drained))
drained.Invoke();
}


Expand Down
21 changes: 20 additions & 1 deletion src/main/Firefly/Utils/IMemoryPool.cs
@@ -1,4 +1,6 @@
namespace Firefly.Utils
using System;

namespace Firefly.Utils
{
public interface IMemoryPool
{
Expand All @@ -9,5 +11,22 @@ public interface IMemoryPool

char[] AllocChar(int minimumSize);
void FreeChar(char[] memory);

/// <summary>
/// Acquires a sub-segment of a larger memory allocation. Used for async sends of write-behind
/// buffers to reduce number of array segments pinned
/// </summary>
/// <param name="minimumSize">The smallest length of the ArraySegment.Count that may be returned</param>
/// <returns>An array segment which is a sub-block of a larger allocation</returns>
ArraySegment<byte> AllocSegment(int minimumSize);

/// <summary>
/// Frees a sub-segment of a larger memory allocation produced by AllocSegment. The original ArraySegment
/// must be frees exactly once and must have the same offset and count that was returned by the Alloc.
/// If a segment is not freed it won't be re-used and has the same effect as a memory leak, so callers must be
/// implemented exactly correctly.
/// </summary>
/// <param name="segment">The sub-block that was originally returned by a call to AllocSegment.</param>
void FreeSegment(ArraySegment<byte> segment);
}
}
7 changes: 4 additions & 3 deletions src/main/Firefly/Utils/ISocket.cs
@@ -1,23 +1,24 @@
using System;
using System.Collections.Generic;
using System.Net.Sockets;

namespace Firefly.Utils
{
public interface ISocket
{
bool Blocking { get; set; }
bool NoDelay { get; set; }
bool Connected { get; }

int Receive(byte[] buffer, int offset, int size, SocketFlags socketFlags, out SocketError errorCode);
bool ReceiveAsync(SocketAsyncEventArgs e);

int Send(byte[] buffer, int offset, int size, SocketFlags socketFlags, out SocketError errorCode);
int Send(IList<ArraySegment<byte>> buffers, SocketFlags socketFlags, out SocketError errorCode);
bool SendAsync(SocketAsyncEventArgs e);

void WaitToSend();

void Shutdown(SocketShutdown how);
bool DisconnectAsync(SocketAsyncEventArgs e);
void Close();
}

}
11 changes: 11 additions & 0 deletions src/main/Firefly/Utils/MemoryPool.cs
Expand Up @@ -97,5 +97,16 @@ public void FreeChar(char[] memory)
break;
}
}

public ArraySegment<byte> AllocSegment(int minimumSize)
{
return new ArraySegment<byte>(AllocByte(minimumSize));
}

public void FreeSegment(ArraySegment<byte> segment)
{
FreeByte(segment.Array);
}

}
}

0 comments on commit 36ccca5

Please sign in to comment.