Skip to content

Commit

Permalink
Refactor ExecuteWithTimeout to avoid closure.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgoz committed Nov 12, 2012
1 parent 5eba5ff commit 39b4a12
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 36 deletions.
96 changes: 96 additions & 0 deletions src/ZeroMQ/ExecuteExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
namespace ZeroMQ
{
using System;
using System.Diagnostics;
using System.Threading;

// TODO: Refactor and use compiled expressions?
internal static class ExecuteExtensions
{
public delegate TResult ThirdParamOut<in T1, in T2, T3, out TResult>(T1 arg1, T2 arg2, out T3 arg3);

public static TResult WithTimeout<T1, T2, TResult>(this ZmqSocket socket, Func<T1, T2, TResult> method, T1 arg1, T2 arg2, TimeSpan timeout)
{
if ((int)timeout.TotalMilliseconds < 1)
{
return method(arg1, arg2);
}

TResult receiveResult;

var timer = Stopwatch.StartNew();
var spin = new SpinWait();

do
{
receiveResult = method(arg1, arg2);

if (socket.ReceiveStatus != ReceiveStatus.TryAgain)
{
break;
}

spin.SpinOnce();
}
while (timer.Elapsed <= timeout);

return receiveResult;
}

public static TResult WithTimeout<T1, T2, T3, TResult>(this ZmqSocket socket, Func<T1, T2, T3, TResult> method, T1 arg1, T2 arg2, T3 arg3, TimeSpan timeout)
{
if ((int)timeout.TotalMilliseconds < 1)
{
return method(arg1, arg2, arg3);
}

TResult receiveResult;

var timer = Stopwatch.StartNew();
var spin = new SpinWait();

do
{
receiveResult = method(arg1, arg2, arg3);

if (socket.ReceiveStatus != ReceiveStatus.TryAgain)
{
break;
}

spin.SpinOnce();
}
while (timer.Elapsed <= timeout);

return receiveResult;
}

public static TResult WithTimeout<T1, T2, T3, TResult>(this ZmqSocket socket, ThirdParamOut<T1, T2, T3, TResult> method, T1 arg1, T2 arg2, out T3 arg3, TimeSpan timeout)
{
if ((int)timeout.TotalMilliseconds < 1)
{
return method(arg1, arg2, out arg3);
}

TResult receiveResult;

var timer = Stopwatch.StartNew();
var spin = new SpinWait();

do
{
receiveResult = method(arg1, arg2, out arg3);

if (socket.ReceiveStatus != ReceiveStatus.TryAgain)
{
break;
}

spin.SpinOnce();
}
while (timer.Elapsed <= timeout);

return receiveResult;
}
}
}
1 change: 1 addition & 0 deletions src/ZeroMQ/ZeroMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<Compile Include="Devices\ZmqDeviceException.cs" />
<Compile Include="DuplexSocket.cs" />
<Compile Include="Devices\IDevice.cs" />
<Compile Include="ExecuteExtensions.cs" />
<Compile Include="Interop\ErrorDetails.cs" />
<Compile Include="Interop\MonitorEventData.cs" />
<Compile Include="Interop\LibZmq.Mono.cs" />
Expand Down
41 changes: 5 additions & 36 deletions src/ZeroMQ/ZmqSocket.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
namespace ZeroMQ
{
using System;
using System.Diagnostics;
using System.Threading;

using Interop;

/// <summary>
Expand Down Expand Up @@ -553,7 +550,7 @@ public int Receive(byte[] buffer, TimeSpan timeout)
{
return timeout == TimeSpan.MaxValue
? Receive(buffer)
: ExecuteWithTimeout(() => Receive(buffer, SocketFlags.DontWait), timeout);
: this.WithTimeout(Receive, buffer, SocketFlags.DontWait, timeout);
}

/// <summary>
Expand Down Expand Up @@ -650,9 +647,9 @@ public byte[] Receive(byte[] buffer, TimeSpan timeout, out int size)
return Receive(buffer, out size);
}

int receivedBytes = -1;
int receivedBytes;
byte[] message = this.WithTimeout(Receive, buffer, SocketFlags.DontWait, out receivedBytes, timeout);

byte[] message = ExecuteWithTimeout(() => Receive(buffer, SocketFlags.DontWait, out receivedBytes), timeout);
size = receivedBytes;

return message;
Expand Down Expand Up @@ -784,8 +781,8 @@ public virtual int Send(byte[] buffer, int size, SocketFlags flags)
public int Send(byte[] buffer, int size, SocketFlags flags, TimeSpan timeout)
{
return timeout == TimeSpan.MaxValue
? Send(buffer, size, flags & ~SocketFlags.DontWait)
: ExecuteWithTimeout(() => Send(buffer, size, flags | SocketFlags.DontWait), timeout);
? Send(buffer, size, flags & ~SocketFlags.DontWait)
: this.WithTimeout(Send, buffer, size, flags | SocketFlags.DontWait, timeout);
}

/// <summary>
Expand Down Expand Up @@ -1081,34 +1078,6 @@ private void SetLegacySocketOption<TLegacy>(SocketOption option, int value, TLeg
}
}

private TResult ExecuteWithTimeout<TResult>(Func<TResult> method, TimeSpan timeout)
{
if ((int)timeout.TotalMilliseconds < 1)
{
return method();
}

TResult receiveResult;

var timer = Stopwatch.StartNew();
var spin = new SpinWait();

do
{
receiveResult = method();

if (ReceiveStatus != ReceiveStatus.TryAgain)
{
break;
}

spin.SpinOnce();
}
while (timer.Elapsed <= timeout);

return receiveResult;
}

private void InvokeReceiveReady(PollEvents readyEvents)
{
EventHandler<SocketEventArgs> handler = ReceiveReady;
Expand Down

0 comments on commit 39b4a12

Please sign in to comment.